db: fix system.size_estimates to aggregate sstable estimates across all shards

The estimate() function in the size_estimates virtual reader only
considered sstables local to the shard that happened to own the
keyspace's partition key token. Since sstables are distributed across
shards, this caused partition count estimates to be approximately
1/smp_count of the actual value.

This bug has been present since the virtual reader was introduced in
225648780d.

Use db.container().map_reduce0() to aggregate sstable estimates
across all shards. Each shard contributes its local count and
estimated_histogram, which are then merged to produce the correct
total.

Also fix the `test_partitions_estimate_full_overlap` test, which became
flaky (xpassing in ~1% of runs) because autocompaction could merge the
two overlapping sstables before the size estimates were read. Wrap the
test body in nodetool.no_autocompaction_context to prevent this race.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1179
Refs https://github.com/scylladb/scylladb/issues/9083

Closes scylladb/scylladb#29286

(cherry picked from commit 6f364fd3b7)

Closes scylladb/scylladb#29381
Author: Łukasz Paszkowski (2026-03-31 10:49:11 +02:00)
Committed by: Botond Dénes
commit 342a7bfce1
parent 8a52602ec9
2 changed files with 85 additions and 39 deletions


@@ -144,7 +144,7 @@ static std::vector<sstring> get_keyspaces(const schema& s, const replica::databa
 /**
  * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
  */
-static dht::partition_range as_ring_position_range(dht::token_range& r) {
+static dht::partition_range as_ring_position_range(const dht::token_range& r) {
     std::optional<wrapping_interval<dht::ring_position>::bound> start_bound, end_bound;
     if (r.start()) {
         start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
@@ -156,11 +156,14 @@ static dht::partition_range as_ring_position_range(dht::token_range& r) {
 }
 
 /**
- * Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
+ * Add a new range_estimates for the specified range, considering the sstables associated
+ * with the table identified by `cf_id` across all shards.
  */
-static future<system_keyspace::range_estimates> estimate(const replica::column_family& cf, const token_range& r) {
-    int64_t count{0};
-    utils::estimated_histogram hist{0};
+static future<system_keyspace::range_estimates> estimate(replica::database& db, table_id cf_id, schema_ptr schema, const token_range& r) {
+    struct shard_estimate {
+        int64_t count = 0;
+        utils::estimated_histogram hist{0};
+    };
     auto from_bytes = [] (auto& b) {
         return dht::token::from_sstring(utf8_type->to_string(b));
     };
@@ -169,14 +172,35 @@ static future<system_keyspace::range_estimates> estimate(const replica::column_f
         wrapping_interval<dht::token>({{ from_bytes(r.start), false }}, {{ from_bytes(r.end) }}),
         dht::token_comparator(),
         [&] (auto&& rng) { ranges.push_back(std::move(rng)); });
-    for (auto&& r : ranges) {
-        auto rp_range = as_ring_position_range(r);
-        for (auto&& sstable : cf.select_sstables(rp_range)) {
-            count += co_await sstable->estimated_keys_for_range(r);
-            hist.merge(sstable->get_stats_metadata().estimated_partition_size);
+
+    // Estimate partition count and size distribution from sstables on a single shard.
+    auto estimate_on_shard = [cf_id, ranges] (replica::database& local_db) -> future<shard_estimate> {
+        auto table_ptr = local_db.get_tables_metadata().get_table_if_exists(cf_id);
+        if (!table_ptr) {
+            co_return shard_estimate{};
         }
-    }
-    co_return system_keyspace::range_estimates{cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
+        auto& cf = *table_ptr;
+        shard_estimate result;
+        for (auto&& r : ranges) {
+            auto rp_range = as_ring_position_range(r);
+            for (auto&& sstable : cf.select_sstables(rp_range)) {
+                result.count += co_await sstable->estimated_keys_for_range(r);
+                result.hist.merge(sstable->get_stats_metadata().estimated_partition_size);
+            }
+        }
+        co_return result;
+    };
+
+    // Combine partial results from two shards.
+    auto reduce = [] (shard_estimate a, const shard_estimate& b) {
+        a.count += b.count;
+        a.hist.merge(b.hist);
+        return a;
+    };
+
+    auto aggregate = co_await db.container().map_reduce0(std::move(estimate_on_shard), shard_estimate{}, std::move(reduce));
+    int64_t mean_size = aggregate.count > 0 ? aggregate.hist.mean() : 0;
+    co_return system_keyspace::range_estimates{std::move(schema), r.start, r.end, aggregate.count, mean_size};
 }
 
 /**
@@ -321,7 +345,7 @@ size_estimates_mutation_reader::estimates_for_current_keyspace(std::vector<token
     auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
     for (auto&& r : rows_to_estimate) {
         auto& cf = _db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
-        estimates.push_back(co_await estimate(cf, r.tokens));
+        estimates.push_back(co_await estimate(_db, cf.schema()->id(), cf.schema(), r.tokens));
         if (estimates.size() >= _slice.partition_row_limit()) {
             co_return estimates;
         }


@@ -13,10 +13,28 @@
 #############################################################################
 from .util import new_test_table
+import json
 import pytest
+from contextlib import contextmanager
 from . import nodetool
 from cassandra.protocol import Unauthorized
 
+# Context manager that temporarily overrides Scylla's sstable_format
+# configuration option via system.config, restoring the previous value on
+# exit. Note that the cqlpy harness reuses a single Scylla node across
+# tests, so restoration is necessary to avoid leaking the change into
+# subsequent tests. The 'value' column is JSON-encoded on read but expects a
+# raw string on write, so we JSON-decode the saved value before writing it
+# back.
+@contextmanager
+def sstable_format(cql, fmt):
+    old = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'sstable_format'").one().value)
+    cql.execute(f"UPDATE system.config SET value = '{fmt}' WHERE name = 'sstable_format'")
+    try:
+        yield
+    finally:
+        cql.execute(f"UPDATE system.config SET value = '{old}' WHERE name = 'sstable_format'")
+
 #############################################################################
 # system.size_estimates.partitions_count
 # Provides an estimate for the number of partitions in a table. A node
@@ -64,11 +82,14 @@ def write_table_and_estimate_partitions(cql, test_keyspace, N):
 # up to 14%. So just to be generous let's allow a 25% inaccuracy for this
 # small test. In issue #9083 we noted that Scylla had much larger errors -
 # reporting as much as 10880 (!) partitions when we have just 1000.
-@pytest.mark.xfail(reason="issue #9083")
 def test_partitions_estimate_simple_small(cql, test_keyspace):
-    N = 1000
-    count = write_table_and_estimate_partitions(cql, test_keyspace, N)
-    assert count > N/1.25 and count < N*1.25
+    # The 25% accuracy threshold below relies on the 'ms' sstable format's
+    # cardinality estimator. The default format ('me' on this branch) has
+    # wider per-sstable error and would not meet this bound.
+    with sstable_format(cql, 'ms'):
+        N = 1000
+        count = write_table_and_estimate_partitions(cql, test_keyspace, N)
+        assert count > N/1.25 and count < N*1.25
 
 # For a larger test, the estimation accuracy should be better:
 # Experimentally, for 10,000 rows, Cassandra's estimation error goes
@@ -76,12 +97,12 @@ def test_partitions_estimate_simple_small(cql, test_keyspace):
 # This is a relatively long test (takes around 2 seconds), and isn't
 # needed to reproduce #9083 (the previous shorter test does it too),
 # so we skip this test.
-@pytest.mark.xfail(reason="issue #9083")
 @pytest.mark.skip(reason="slow test, remove skip to try it anyway")
 def test_partitions_estimate_simple_large(cql, test_keyspace):
-    N = 10000
-    count = write_table_and_estimate_partitions(cql, test_keyspace, N)
-    assert count > N/1.05 and count < N*1.05
+    with sstable_format(cql, 'ms'):
+        N = 10000
+        count = write_table_and_estimate_partitions(cql, test_keyspace, N)
+        assert count > N/1.05 and count < N*1.05
 
 # If we write the *same* 1000 partitions to two sstables (by flushing twice,
 # and assuming that 1000 tiny partitions easily fit a memtable), and check
@@ -95,24 +116,25 @@ def test_partitions_estimate_simple_large(cql, test_keyspace):
 def test_partitions_estimate_full_overlap(cassandra_bug, cql, test_keyspace):
     N = 500
     with new_test_table(cql, test_keyspace, 'k int PRIMARY KEY') as table:
-        write = cql.prepare(f"INSERT INTO {table} (k) VALUES (?)")
-        for i in range(N):
-            cql.execute(write, [i])
-        nodetool.flush(cql, table)
-        # And a second copy of the *same* data will end up in a second sstable:
-        for i in range(N):
-            cql.execute(write, [i])
-        nodetool.flush(cql, table)
-        # TODO: In Scylla we should use NullCompactionStrategy to avoid the two
-        # sstables from immediately being compacted together.
-        nodetool.refreshsizeestimates(cql)
-        table_name = table[len(test_keyspace)+1:]
-        counts = [x.partitions_count for x in cql.execute(
-            f"SELECT partitions_count FROM system.size_estimates WHERE keyspace_name = '{test_keyspace}' AND table_name = '{table_name}'")]
-        count = sum(counts)
-        print(counts)
-        print(count)
-        assert count > N/1.5 and count < N*1.5
+        # Disable autocompaction to prevent the two sstables from being
+        # compacted together before we read the size estimates.
+        with nodetool.no_autocompaction_context(cql, table):
+            write = cql.prepare(f"INSERT INTO {table} (k) VALUES (?)")
+            for i in range(N):
+                cql.execute(write, [i])
+            nodetool.flush(cql, table)
+            # And a second copy of the *same* data will end up in a second sstable:
+            for i in range(N):
+                cql.execute(write, [i])
+            nodetool.flush(cql, table)
+            nodetool.refreshsizeestimates(cql)
+            table_name = table[len(test_keyspace)+1:]
+            counts = [x.partitions_count for x in cql.execute(
+                f"SELECT partitions_count FROM system.size_estimates WHERE keyspace_name = '{test_keyspace}' AND table_name = '{table_name}'")]
+            count = sum(counts)
+            print(counts)
+            print(count)
+            assert count > N/1.5 and count < N*1.5
 
 # Test that deleted partitions should not be counted by the estimated
 # partitions count. Unfortunately, the current state of both Cassandra