diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc
index b9740c71ba..33d4cbc7cb 100644
--- a/db/size_estimates_virtual_reader.cc
+++ b/db/size_estimates_virtual_reader.cc
@@ -144,7 +144,7 @@ static std::vector<sstring> get_keyspaces(const schema& s, const replica::databa
 /**
  * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
  */
-static dht::partition_range as_ring_position_range(dht::token_range& r) {
+static dht::partition_range as_ring_position_range(const dht::token_range& r) {
     std::optional<dht::partition_range::bound> start_bound, end_bound;
     if (r.start()) {
         start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
@@ -156,11 +156,14 @@ static dht::partition_range as_ring_position_range(dht::token_range& r) {
 }
 
 /**
- * Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
+ * Add a new range_estimates for the specified range, considering the sstables associated
+ * with the table identified by `cf_id` across all shards.
  */
-static future<system_keyspace::range_estimates> estimate(const replica::column_family& cf, const token_range& r) {
-    int64_t count{0};
-    utils::estimated_histogram hist{0};
+static future<system_keyspace::range_estimates> estimate(replica::database& db, table_id cf_id, schema_ptr schema, const token_range& r) {
+    struct shard_estimate {
+        int64_t count = 0;
+        utils::estimated_histogram hist{0};
+    };
     auto from_bytes = [] (auto& b) {
         return dht::token::from_sstring(utf8_type->to_string(b));
     };
@@ -169,14 +172,35 @@ static future<system_keyspace::range_estimates> estimate(const replica::column_f
             wrapping_interval<dht::token>({{ from_bytes(r.start), false }}, {{ from_bytes(r.end) }}),
             dht::token_comparator(), [&] (auto&& rng) { ranges.push_back(std::move(rng)); });
-    for (auto&& r : ranges) {
-        auto rp_range = as_ring_position_range(r);
-        for (auto&& sstable : cf.select_sstables(rp_range)) {
-            count += co_await sstable->estimated_keys_for_range(r);
-            hist.merge(sstable->get_stats_metadata().estimated_partition_size);
+
+    // Estimate partition count and size distribution from sstables on a single shard.
+    auto estimate_on_shard = [cf_id, ranges] (replica::database& local_db) -> future<shard_estimate> {
+        auto table_ptr = local_db.get_tables_metadata().get_table_if_exists(cf_id);
+        if (!table_ptr) {
+            co_return shard_estimate{};
         }
-    }
-    co_return system_keyspace::range_estimates{cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
+        auto& cf = *table_ptr;
+        shard_estimate result;
+        for (auto&& r : ranges) {
+            auto rp_range = as_ring_position_range(r);
+            for (auto&& sstable : cf.select_sstables(rp_range)) {
+                result.count += co_await sstable->estimated_keys_for_range(r);
+                result.hist.merge(sstable->get_stats_metadata().estimated_partition_size);
+            }
+        }
+        co_return result;
+    };
+
+    // Combine partial results from two shards.
+    auto reduce = [] (shard_estimate a, const shard_estimate& b) {
+        a.count += b.count;
+        a.hist.merge(b.hist);
+        return a;
+    };
+
+    auto aggregate = co_await db.container().map_reduce0(std::move(estimate_on_shard), shard_estimate{}, std::move(reduce));
+    int64_t mean_size = aggregate.count > 0 ? aggregate.hist.mean() : 0;
+    co_return system_keyspace::range_estimates{std::move(schema), r.start, r.end, aggregate.count, mean_size};
 }
 
 /**
@@ -321,7 +345,7 @@ size_estimates_mutation_reader::estimates_for_current_keyspace(std::vector<syste
         auto& cf = _db.find_column_family(keyspace, utf8_type->to_string(r.cf_name));
-        estimates.push_back(co_await estimate(cf, r.tokens));
+        estimates.push_back(co_await estimate(_db, cf.schema()->id(), cf.schema(), r.tokens));
         if (estimates.size() >= _slice.partition_row_limit()) {
             co_return estimates;
         }
diff --git a/test/cqlpy/test_system_tables.py b/test/cqlpy/test_system_tables.py
index 428575b71f..7db683cc12 100644
--- a/test/cqlpy/test_system_tables.py
+++ b/test/cqlpy/test_system_tables.py
@@ -13,10 +13,28 @@
 #############################################################################
 
 from .util import new_test_table
+import json
 import pytest
+from contextlib import contextmanager
 from . import nodetool
 from cassandra.protocol import Unauthorized
 
+# Context manager that temporarily overrides Scylla's sstable_format
+# configuration option via system.config, restoring the previous value on
+# exit. Note that the cqlpy harness reuses a single Scylla node across
+# tests, so restoration is necessary to avoid leaking the change into
+# subsequent tests. The 'value' column is JSON-encoded on read but expects a
+# raw string on write, so we JSON-decode the saved value before writing it
+# back.
+@contextmanager
+def sstable_format(cql, fmt):
+    old = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'sstable_format'").one().value)
+    cql.execute(f"UPDATE system.config SET value = '{fmt}' WHERE name = 'sstable_format'")
+    try:
+        yield
+    finally:
+        cql.execute(f"UPDATE system.config SET value = '{old}' WHERE name = 'sstable_format'")
+
 #############################################################################
 # system.size_estimates.partitions_count
 # Provides an estimate for the number of partitions in a table. A node
@@ -64,11 +82,14 @@ def write_table_and_estimate_partitions(cql, test_keyspace, N):
 # up to 14%. So just to be generous let's allow a 25% inaccuracy for this
 # small test. In issue #9083 we noted that Scylla had much larger errors
 # - reporting as much as 10880 (!) partitions when we have just 1000.
-@pytest.mark.xfail(reason="issue #9083")
 def test_partitions_estimate_simple_small(cql, test_keyspace):
-    N = 1000
-    count = write_table_and_estimate_partitions(cql, test_keyspace, N)
-    assert count > N/1.25 and count < N*1.25
+    # The 25% accuracy threshold below relies on the 'ms' sstable format's
+    # cardinality estimator. The default format ('me' on this branch) has
+    # wider per-sstable error and would not meet this bound.
+    with sstable_format(cql, 'ms'):
+        N = 1000
+        count = write_table_and_estimate_partitions(cql, test_keyspace, N)
+        assert count > N/1.25 and count < N*1.25
 
 # For a larger test, the estimation accuracy should be better:
 # Experimentally, for 10,000 rows, Cassandra's estimation error goes
@@ -76,12 +97,12 @@
 # This is a relatively long test (takes around 2 seconds), and isn't
 # needed to reproduce #9083 (the previous shorter test does it too),
 # so we skip this test.
-@pytest.mark.xfail(reason="issue #9083")
 @pytest.mark.skip(reason="slow test, remove skip to try it anyway")
 def test_partitions_estimate_simple_large(cql, test_keyspace):
-    N = 10000
-    count = write_table_and_estimate_partitions(cql, test_keyspace, N)
-    assert count > N/1.05 and count < N*1.05
+    with sstable_format(cql, 'ms'):
+        N = 10000
+        count = write_table_and_estimate_partitions(cql, test_keyspace, N)
+        assert count > N/1.05 and count < N*1.05
 
 # If we write the *same* 1000 partitions to two sstables (by flushing twice,
 # and assuming that 1000 tiny partitions easily fit a memtable), and check
@@ -95,24 +116,25 @@
 def test_partitions_estimate_full_overlap(cassandra_bug, cql, test_keyspace):
     N = 500
     with new_test_table(cql, test_keyspace, 'k int PRIMARY KEY') as table:
-        write = cql.prepare(f"INSERT INTO {table} (k) VALUES (?)")
-        for i in range(N):
-            cql.execute(write, [i])
-        nodetool.flush(cql, table)
-        # And a second copy of the *same* data will end up in a second sstable:
-        for i in range(N):
-            cql.execute(write, [i])
-        nodetool.flush(cql, table)
-        # TODO: In Scylla we should use NullCompactionStrategy to avoid the two
-        # sstables from immediately being compacted together.
-        nodetool.refreshsizeestimates(cql)
-        table_name = table[len(test_keyspace)+1:]
-        counts = [x.partitions_count for x in cql.execute(
-            f"SELECT partitions_count FROM system.size_estimates WHERE keyspace_name = '{test_keyspace}' AND table_name = '{table_name}'")]
-        count = sum(counts)
-        print(counts)
-        print(count)
-        assert count > N/1.5 and count < N*1.5
+        # Disable autocompaction to prevent the two sstables from being
+        # compacted together before we read the size estimates.
+        with nodetool.no_autocompaction_context(cql, table):
+            write = cql.prepare(f"INSERT INTO {table} (k) VALUES (?)")
+            for i in range(N):
+                cql.execute(write, [i])
+            nodetool.flush(cql, table)
+            # And a second copy of the *same* data will end up in a second sstable:
+            for i in range(N):
+                cql.execute(write, [i])
+            nodetool.flush(cql, table)
+            nodetool.refreshsizeestimates(cql)
+            table_name = table[len(test_keyspace)+1:]
+            counts = [x.partitions_count for x in cql.execute(
+                f"SELECT partitions_count FROM system.size_estimates WHERE keyspace_name = '{test_keyspace}' AND table_name = '{table_name}'")]
+            count = sum(counts)
+            print(counts)
+            print(count)
+            assert count > N/1.5 and count < N*1.5
 
 # Test that deleted partitions should not be counted by the estimated
 # partitions count. Unfortunately, the current state of both Cassandra