The test looks at metrics to confirm whether queries hit the row cache, the index cache or the disk, depending on various settings. BIG index readers use a two-level, read-through index cache, where the higher layer stores parsed "index pages" of Index.db, while the lower layer is a cache of raw 4 kiB file pages of Index.db. Therefore, if we want to count index cache hits, the appropriate metric to check in this case is `scylla_sstables_index_page_hits`, which counts hits in the higher layer. This is done by the test. However, BTI index readers don't have an equivalent of the higher cache layer. Their cache only stores the raw 4 kiB pages, and the hits are counted in `scylla_sstables_index_page_cache_hits`. (The same metric is incremented by the lower layer of the BIG index cache.) Before this commit, the test would fail with `ms` sstables, because their reads don't increment `scylla_sstables_index_page_hits`. In this commit we adapt the test so that it instead checks `scylla_sstables_index_page_cache_hits` for `ms` sstables.
312 lines
15 KiB
Python
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
|
import logging
|
|
import random
|
|
import string
|
|
import time
|
|
from collections.abc import Callable
|
|
|
|
import pytest
|
|
|
|
from dtest_class import Tester, create_cf, create_ks, get_ip_from_node, highest_supported_sstable_format
|
|
from tools.data import create_c1c2_table, insert_c1c2
|
|
from tools.metrics import get_node_metrics
|
|
|
|
logger = logging.getLogger(__file__)
|
|
NUM_OF_QUERY_EXECUTIONS = 100
|
|
|
|
|
|
@pytest.mark.single_node
class TestBypassCache(Tester):
    """
    Test that will verify if the select statement will skip cache during its read
    Introduced by commit 2a371c2689d327a10f5888f39414ec66efedb093
    """

    # Named metric validators: each maps a (before, after) pair of metric
    # readings to "" on success, or a short error description on failure.
    METRIC_VALIDATORS = {
        "ignore": lambda before, after: "",
        "not_changed": lambda before, after: "" if before == after else "should not be changed",
        "increased_by_1": lambda before, after: "" if before + 1 == after else "should be increased by 1",
        "increased_by_at_least_1": lambda before, after: "" if before < after else "should be increased by at least 1",
    }

    def gen_less_than(self, value):
        """Return a validator that passes when the metric grew by at most `value`."""
        return lambda before, after: "" if before + value >= after else f"changed by more than {value}"

    def gen_more_than(self, value):
        """Return a validator that passes when the metric grew by strictly more than `value`."""
        # The old message said "changed by less than {value}", which was wrong
        # when the change was exactly equal to `value`.
        return lambda before, after: "" if before + value < after else f"did not change by more than {value}"

    def prepare(  # noqa: PLR0913
        self,
        nodes=1,
        keyspace_name="bypass_cache",
        rf=1,
        options_dict=None,
        table_name="user_events",
        insert_data=True,
        smp=1,
        cache_index_pages=None,
    ):
        """
        Start a cluster and create a keyspace (and, optionally, the c1c2 table
        filled with NUM_OF_QUERY_EXECUTIONS rows).

        :param nodes: number of nodes to populate the cluster with
        :param keyspace_name: keyspace to create (stored on self)
        :param rf: replication factor of the keyspace
        :param options_dict: extra configuration options for the cluster
        :param table_name: table name stored on self for later use
        :param insert_data: when True, create and populate the c1c2 table
        :param smp: number of shards each node is started with
        :param cache_index_pages: when not None, pass --cache-index-pages 1/0
        :return: a CQL session connected to the first node
        """
        self.keyspace_name = keyspace_name
        self.table_name = table_name
        cluster = self.cluster
        if options_dict:
            cluster.set_configuration_options(values=options_dict)
        jvm_args = ["--smp", str(smp)]
        if cache_index_pages is not None:
            jvm_args += ["--cache-index-pages", "1" if cache_index_pages else "0"]
        cluster.populate(nodes).start(jvm_args=jvm_args)
        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)
        self.tablets = "tablets" in self.scylla_features
        create_ks(session=session, name=keyspace_name, rf=rf)

        if insert_data:
            create_c1c2_table(session)
            insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, ks=keyspace_name)

        # Needed by metric_name_for_index_cache_hits(): BIG ("me") and
        # BTI ("ms") sstables report index-cache hits in different metrics.
        self.sstable_format = highest_supported_sstable_format(node1)

        return session

    def get_scylla_cache_reads_metrics(self, node, metrics):
        """Fetch the given metrics from the node's metrics endpoint."""
        return get_node_metrics(get_ip_from_node(node), metrics=metrics)

    def run_query_and_check_metrics(self, node, session, query, metrics_validators: dict[str, str | Callable], num_runs: int = NUM_OF_QUERY_EXECUTIONS):
        """
        Run `query` `num_runs` times and validate how each metric changed.

        :param metrics_validators: maps a metric name to either the name of an
            entry in METRIC_VALIDATORS or a callable (before, after) -> str
            returning "" on success and an error description otherwise.
        :return: a list of human-readable validation errors (empty if all pass)
        :raises ValueError: on an unknown validator name or unsupported type
        """
        metrics = list(metrics_validators)
        metric_data_before = self.get_scylla_cache_reads_metrics(node=node, metrics=metrics)
        logger.info("Metrics before query:\n%s", "\n".join(f"{metric}: {value}" for metric, value in metric_data_before.items()))
        logger.info('Running query "%s" %s times', query, num_runs)
        for _ in range(num_runs):
            session.execute(query)
        metric_data_after = self.get_scylla_cache_reads_metrics(node=node, metrics=metrics)
        logger.info("Metrics after query:\n%s", "\n".join(f"{metric}: {value}" for metric, value in metric_data_after.items()))
        metric_errors = []
        for metric, validator in metrics_validators.items():
            if isinstance(validator, str):
                validator_fn = self.METRIC_VALIDATORS.get(validator)
                if not validator_fn:
                    # Was ValueError("... %s", validator): printf-style args are
                    # never interpolated in exceptions, so the message was wrong.
                    raise ValueError(f"Can't find validator with name {validator}")
            elif callable(validator):
                validator_fn = validator
            else:
                raise ValueError("Only str or callable is acceptable")
            # A metric that was never bumped may be absent; treat it as 0.
            before = metric_data_before.get(metric, 0)
            after = metric_data_after.get(metric, 0)
            error = validator_fn(before, after)
            if error:
                metric_errors.append(f"Metric {metric} {error}, before:{before}, after:{after}")
        return metric_errors

    def cache_thresh(self):
        """Cache-reads delta threshold separating "from cache" from "from disk" reads."""
        # Presumably tablets-enabled clusters produce fewer cache reads per
        # query, hence the lower threshold — TODO confirm against metrics.
        return 150 if self.tablets else 800

    def metric_name_for_index_cache_hits(self):
        """
        `scylla_sstables_index_page_cache_*` are cache stats for the `cached_file` objects used by index readers.
        `scylla_sstables_index_page_hits` is the name for hits in the "structured" cache (with parsed and materialized
        "index pages" of Index.db)

        BIG-index sstables (like "me") use both types of caches.
        BTI-index sstables (like "ms") use only the former.
        """
        return "scylla_sstables_index_page_cache_hits" if self.sstable_format == "ms" else "scylla_sstables_index_page_hits"

    def verify_read_was_from_disk(self, node, query, session, index_cache_involved: bool = False):
        """Assert that `query` is served from disk (few row-cache reads)."""
        # TODO: After https://github.com/scylladb/scylla/issues/9968 remove index_cache_involved, assume that it is false
        errors = self.run_query_and_check_metrics(
            node,
            session,
            query,
            metrics_validators={
                "scylla_cache_reads": self.gen_less_than(self.cache_thresh()),
                # Internal reads can also use the sstable index page cache, so we
                # cannot make assumptions about the index metrics not changing
                self.metric_name_for_index_cache_hits(): self.gen_more_than(self.cache_thresh()) if index_cache_involved else "ignore",
                "scylla_sstables_index_page_cache_misses": "increased_by_at_least_1" if index_cache_involved else "ignore",
                "scylla_sstables_index_page_cache_populations": "increased_by_at_least_1" if index_cache_involved else "ignore",
            },
        )
        assert not errors, "Running query that is supposed to read from disk, the following errors were found:\n" + "\n".join(errors)

    def verify_read_was_from_cache(self, node, query, session):
        """Assert that `query` is served from the row cache (many cache reads)."""
        errors = self.run_query_and_check_metrics(
            node,
            session,
            query,
            metrics_validators={
                "scylla_cache_reads": self.gen_more_than(self.cache_thresh()),
                self.metric_name_for_index_cache_hits(): "ignore",
                "scylla_sstables_index_page_cache_misses": "ignore",
                "scylla_sstables_index_page_cache_populations": "ignore",
            },
        )
        assert not errors, "Running query that is supposed to read from cache, the following errors were found:\n" + "\n".join(errors)

    def test_simple_bypass_cache(self):
        """A BYPASS CACHE scan must be served from disk."""
        session = self.prepare()
        node = self.cluster.nodelist()[0]
        query = "SELECT * FROM cf BYPASS CACHE"

        self.verify_read_was_from_disk(node=node, query=query, session=session)

    def test_multiple_bypass_cache(self):
        """Repeated BYPASS CACHE scans must keep being served from disk."""
        session = self.prepare()
        node = self.cluster.nodelist()[0]
        # Give the node a moment to settle before the repeated scans —
        # NOTE(review): presumably lets background activity quiesce; confirm.
        time.sleep(3)
        for _ in range(20):
            query = "SELECT * FROM cf BYPASS CACHE"
            self.verify_read_was_from_disk(node=node, query=query, session=session)

    def test_read_from_cache_and_then_bypass_cache(self):
        """A plain scan populates/uses the cache; BYPASS CACHE still goes to disk."""
        session = self.prepare()
        node = self.cluster.nodelist()[0]

        no_bypass_query = "SELECT * FROM cf"
        self.verify_read_was_from_cache(node=node, query=no_bypass_query, session=session)

        bypass_query = "SELECT * FROM cf BYPASS CACHE"
        self.verify_read_was_from_disk(node=node, query=bypass_query, session=session)

    def insert_data_for_scan_range(self):
        """Create a cf table with int keys and random varchar columns; return the session."""
        session = self.prepare(insert_data=False)
        # create a table
        create_cf(session=session, name="cf", key_type="int")
        # populate table with k (int) and values (anything)
        query = "INSERT INTO cf (key, c, v) VALUES ({}, '{}', '{}')"
        for idx in range(NUM_OF_QUERY_EXECUTIONS):
            varchar_c = "".join(random.choice(string.ascii_lowercase) for _ in range(20))
            varchar_v = "".join(random.choice(string.ascii_lowercase) for _ in range(20))
            session.execute(query.format(idx, varchar_c, varchar_v))
        return session

    @pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/6045")
    def test_range_scan_bypass_cache(self):
        """Range scans must bump select_partition_range_scan_no_bypass_cache only without BYPASS CACHE."""
        session = self.insert_data_for_scan_range()
        node = self.cluster.nodelist()[0]
        errors = self.run_query_and_check_metrics(
            node,
            session,
            "SELECT * FROM cf WHERE key > 10 and key < 20 ALLOW FILTERING",
            metrics_validators={
                "select_partition_range_scan": "increased_by_1",
                "select_partition_range_scan_no_bypass_cache": "not_changed",
            },
            num_runs=1,
        )
        assert not errors, "Running range query that is supposed to read from cache, the following errors were found:\n" + "\n".join(errors)

        errors = self.run_query_and_check_metrics(
            node,
            session,
            "SELECT * FROM cf WHERE key > 10 and key < 20 ALLOW FILTERING BYPASS CACHE",
            metrics_validators={
                "select_partition_range_scan": "increased_by_1",
                "select_partition_range_scan_no_bypass_cache": "increased_by_1",
            },
            num_runs=1,
        )
        assert not errors, "Running range query that is supposed to read from disk, the following errors were found:\n" + "\n".join(errors)

    def test_full_scan_bypass_cache(self):
        """Full scans bump the no_bypass_cache counter only when BYPASS CACHE is absent."""
        session = self.prepare()
        node = self.cluster.nodelist()[0]
        # BUG FIX: the returned error lists were previously discarded, so the
        # validators below were never actually enforced.
        errors = self.run_query_and_check_metrics(
            node,
            session,
            query="SELECT * FROM cf",
            num_runs=1,
            metrics_validators={
                "select_partition_range_scan": "increased_by_1",
                "select_partition_range_scan_no_bypass_cache": "increased_by_1",
            },
        )
        assert not errors, "Running full scan without BYPASS CACHE, the following errors were found:\n" + "\n".join(errors)
        errors = self.run_query_and_check_metrics(
            node,
            session,
            query="SELECT * FROM cf BYPASS CACHE",
            num_runs=1,
            metrics_validators={
                "select_partition_range_scan": "increased_by_1",
                "select_partition_range_scan_no_bypass_cache": "not_changed",
            },
        )
        assert not errors, "Running full scan with BYPASS CACHE, the following errors were found:\n" + "\n".join(errors)

    @pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
    def test_create_table_caching_disabled(self, cache_index_pages: bool):
        """A table created with caching disabled must be read from disk."""
        session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
        node = self.cluster.nodelist()[0]
        create_c1c2_table(session, cf=self.table_name, caching=False)
        insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
        node.flush()
        query = f"select * from {self.table_name}"
        # TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
        self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)

    @pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
    def test_alter_table_caching_disable(self, cache_index_pages: bool):
        """Disabling caching via ALTER TABLE must switch reads from cache to disk."""
        session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
        node = self.cluster.nodelist()[0]
        create_c1c2_table(session, cf=self.table_name)
        insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
        node.flush()
        query = f"select * from {self.table_name}"
        self.verify_read_was_from_cache(node=node, query=query, session=session)
        # disabling caching for table and checking read comes from disk
        session.execute(f"ALTER TABLE {self.table_name} WITH caching = {{'enabled':false}}")
        # TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
        self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)

    @pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
    def test_alter_table_caching_enable(self, cache_index_pages: bool):
        """Enabling caching via ALTER TABLE must switch reads from disk to cache."""
        session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
        node = self.cluster.nodelist()[0]
        create_c1c2_table(session, cf=self.table_name, caching=False)
        insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
        node.flush()
        query = f"select * from {self.table_name}"
        # TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
        self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)
        # enabling caching for table and checking read comes from cache
        session.execute(f"ALTER TABLE {self.table_name} WITH caching = {{'enabled':true}}")
        self.verify_read_was_from_cache(node=node, query=query, session=session)

    def verify_used_memory_grow(self, node, session):
        """
        Repeatedly write and flush, counting rounds in which
        scylla_cache_bytes_used grew.

        Requires self.first_key to be set by the caller; each round re-inserts
        keys [0, first_key + 10) (the last 10 being new) and advances first_key.

        :return: True when cache usage grew in more than half of the rounds,
            i.e. writes appear to go through the cache.
        """
        grew = 0
        metric = ["scylla_cache_bytes_used"]
        for _ in range(NUM_OF_QUERY_EXECUTIONS):
            cache_bytes_used_before_write = self.get_scylla_cache_reads_metrics(node=node, metrics=metric)[metric[0]]
            insert_c1c2(session, keys=list(range(self.first_key + 10)), cf=self.table_name, ks=self.keyspace_name)
            self.cluster.nodetool(f"flush -- {self.keyspace_name} {self.table_name}")
            cache_bytes_used_after_write = self.get_scylla_cache_reads_metrics(node=node, metrics=metric)[metric[0]]
            if cache_bytes_used_before_write < cache_bytes_used_after_write:
                grew += 1
            self.first_key += 10
        return grew > NUM_OF_QUERY_EXECUTIONS / 2

    def test_writes_caching_disabled(self):
        """Writes to a caching-disabled table bypass the cache until caching is re-enabled."""
        session = self.prepare(insert_data=False)
        node = self.cluster.nodelist()[0]
        create_c1c2_table(session, cf=self.table_name, caching=False)
        insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
        self.first_key = 0
        assert not self.verify_used_memory_grow(node=node, session=session), "expected to have writes without cache"
        alter_cmd = f"ALTER TABLE {self.keyspace_name}.{self.table_name} WITH CACHING = {{'enabled': 'true'}}"
        session.execute(alter_cmd)
        assert self.verify_used_memory_grow(node=node, session=session), "expected to have writes through cache"

    def test_writes_caching_enabled(self):
        """Writes to a caching-enabled table go through the cache until caching is disabled."""
        session = self.prepare(insert_data=False)
        node = self.cluster.nodelist()[0]
        create_c1c2_table(session, cf=self.table_name)
        insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
        self.first_key = 0
        assert self.verify_used_memory_grow(node=node, session=session), "expected to have writes through cache"
        alter_cmd = f"ALTER TABLE {self.keyspace_name}.{self.table_name} WITH CACHING = {{'enabled': 'false'}}"
        session.execute(alter_cmd)
        assert not self.verify_used_memory_grow(node=node, session=session), "expected to have writes without cache"