Trie-based indexes and older indexes have a difference in metrics, and the test uses the metrics to check for bypass cache. To choose the right metrics, it uses highest_supported_sstable_format, which is inappropriate, because the sstable format chosen for writes by Scylla might be different than highest_supported_sstable_format. Use chosen_sstable_format instead.
312 lines · 15 KiB · Python
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
|
|
import logging
|
|
import random
|
|
import string
|
|
import time
|
|
from collections.abc import Callable
|
|
|
|
import pytest
|
|
|
|
from dtest_class import Tester, create_cf, create_ks, get_ip_from_node, chosen_sstable_format
|
|
from tools.data import create_c1c2_table, insert_c1c2
|
|
from tools.metrics import get_node_metrics
|
|
|
|
logger = logging.getLogger(__file__)
|
|
NUM_OF_QUERY_EXECUTIONS = 100
|
|
|
|
|
|
@pytest.mark.single_node
class TestBypassCache(Tester):
    """
    Verify that SELECT statements with BYPASS CACHE skip the row cache on reads.

    Introduced by commit 2a371c2689d327a10f5888f39414ec66efedb093
    """

    # Named metric validators. Each one maps a (before, after) pair of metric
    # readings to an error string; the empty string means "as expected".
    METRIC_VALIDATORS = {
        "ignore": lambda before, after: "",
        "not_changed": lambda before, after: "should not be changed" if after != before else "",
        "increased_by_1": lambda before, after: "should be increased by 1" if after != before + 1 else "",
        "increased_by_at_least_1": lambda before, after: "should be increased by at least 1" if after <= before else "",
    }
|
|
|
|
def gen_less_than(self, value):
|
|
return lambda before, after: "" if before + value >= after else f"changed by more than {value}"
|
|
|
|
def gen_more_than(self, value):
|
|
return lambda before, after: "" if before + value < after else f"changed by less than {value}"
|
|
|
|
    def prepare( # noqa: PLR0913
        self,
        nodes=1,
        keyspace_name="bypass_cache",
        rf=1,
        options_dict=None,
        table_name="user_events",
        insert_data=True,
        smp=1,
        cache_index_pages=None,
    ):
        """Start a cluster, create the keyspace (optionally with data) and return a CQL session.

        :param nodes: number of nodes to populate the cluster with
        :param keyspace_name: keyspace to create; also stored on self
        :param rf: replication factor for the keyspace
        :param options_dict: extra scylla.yaml options to apply before start, if any
        :param table_name: table name stored on self for later use by tests
        :param insert_data: when True, create the c1c2 table and insert
            NUM_OF_QUERY_EXECUTIONS rows into it
        :param smp: shard count to start Scylla with (--smp)
        :param cache_index_pages: when not None, pass --cache-index-pages 1/0
        :return: CQL session connected to the first node
        """
        self.keyspace_name = keyspace_name
        self.table_name = table_name
        cluster = self.cluster
        if options_dict:
            cluster.set_configuration_options(values=options_dict)
        jvm_args = ["--smp", str(smp)]
        if cache_index_pages is not None:
            # Scylla expects 1/0 rather than true/false for this flag
            jvm_args += ["--cache-index-pages", "1" if cache_index_pages else "0"]
        cluster.populate(nodes).start(jvm_args=jvm_args)
        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)
        # remember whether the cluster uses tablets; affects cache_thresh()
        self.tablets = "tablets" in self.scylla_features
        create_ks(session=session, name=keyspace_name, rf=rf)

        if insert_data:
            create_c1c2_table(session)
            insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, ks=keyspace_name)

        # Use the sstable format Scylla actually chose for writes (it may differ
        # from highest_supported_sstable_format) so the index-cache metric name
        # selected later matches the sstables really on disk.
        self.sstable_format = chosen_sstable_format(node1)

        return session
|
|
|
|
def get_scylla_cache_reads_metrics(self, node, metrics):
|
|
return get_node_metrics(get_ip_from_node(node), metrics=metrics)
|
|
|
|
def run_query_and_check_metrics(self, node, session, query, metrics_validators: dict[str, str | Callable], num_runs: int = NUM_OF_QUERY_EXECUTIONS):
|
|
metrics = [*metrics_validators.keys()]
|
|
metric_data_before = self.get_scylla_cache_reads_metrics(node=node, metrics=metrics)
|
|
logger.info("Metrics before query:\n%s", "\n".join(f"{metric}: {value}" for metric, value in metric_data_before.items()))
|
|
logger.info('Running query "%s" %s times', query, num_runs)
|
|
for _ in range(num_runs):
|
|
session.execute(query)
|
|
metric_data_after = self.get_scylla_cache_reads_metrics(node=node, metrics=metrics)
|
|
logger.info("Metrics after query:\n%s", "\n".join(f"{metric}: {value}" for metric, value in metric_data_after.items()))
|
|
metric_errors = []
|
|
for metric, validator in metrics_validators.items():
|
|
if isinstance(validator, str):
|
|
validator_fn = self.METRIC_VALIDATORS.get(validator)
|
|
if not validator_fn:
|
|
raise ValueError("Can't find validator with name %s", validator)
|
|
elif callable(validator):
|
|
validator_fn = validator
|
|
else:
|
|
raise ValueError("Only str or callable is acceptable")
|
|
before = metric_data_before.get(metric, 0)
|
|
after = metric_data_after.get(metric, 0)
|
|
error = validator_fn(before, after)
|
|
if error:
|
|
metric_errors.append(f"Metric {metric} {error}, before:{before}, after:{after}")
|
|
return metric_errors
|
|
|
|
def cache_thresh(self):
|
|
return 300 if not self.tablets else 150
|
|
|
|
def metric_name_for_index_cache_hits(self):
|
|
"""
|
|
`scylla_sstables_index_page_cache_*` are cache stats for the `cached_file` objects used by index readers.
|
|
`scylla_sstables_index_page_hits` is the name for hits in the "structured" cache (with parsed and materialized
|
|
"index pages" of Index.db)
|
|
|
|
BIG-index sstables (like "me") use both types of caches.
|
|
BTI-index sstables (like "ms") use only the former.
|
|
"""
|
|
return "scylla_sstables_index_page_cache_hits" if self.sstable_format == "ms" else "scylla_sstables_index_page_hits"
|
|
|
|
def verify_read_was_from_disk(self, node, query, session, index_cache_involved: bool = False):
|
|
# TODO: After https://github.com/scylladb/scylla/issues/9968 remove index_cache_involved, assume that it is false
|
|
errors = self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
query,
|
|
metrics_validators={
|
|
"scylla_cache_reads": self.gen_less_than(self.cache_thresh()),
|
|
# Internal reads can also use the sstable index page cache, so we
|
|
# cannot make assumptions about the index metrics not changing
|
|
self.metric_name_for_index_cache_hits(): self.gen_more_than(self.cache_thresh()) if index_cache_involved else "ignore",
|
|
"scylla_sstables_index_page_cache_misses": "increased_by_at_least_1" if index_cache_involved else "ignore",
|
|
"scylla_sstables_index_page_cache_populations": "increased_by_at_least_1" if index_cache_involved else "ignore",
|
|
},
|
|
)
|
|
assert not errors, "Running query that is suppose to read from disk following errors found:\n" + "\n".join(errors)
|
|
|
|
def verify_read_was_from_cache(self, node, query, session):
|
|
errors = self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
query,
|
|
metrics_validators={
|
|
"scylla_cache_reads": self.gen_more_than(self.cache_thresh()),
|
|
self.metric_name_for_index_cache_hits(): "ignore",
|
|
"scylla_sstables_index_page_cache_misses": "ignore",
|
|
"scylla_sstables_index_page_cache_populations": "ignore",
|
|
},
|
|
)
|
|
assert not errors, "Running query that is suppose to read from cache following errors found:\n" + "\n".join(errors)
|
|
|
|
def test_simple_bypass_cache(self):
|
|
session = self.prepare()
|
|
node = self.cluster.nodelist()[0]
|
|
query = "SELECT * FROM cf BYPASS CACHE"
|
|
|
|
self.verify_read_was_from_disk(node=node, query=query, session=session)
|
|
|
|
def test_multiple_bypass_cache(self):
|
|
session = self.prepare()
|
|
node = self.cluster.nodelist()[0]
|
|
time.sleep(3)
|
|
for _ in range(20):
|
|
query = "SELECT * FROM cf BYPASS CACHE"
|
|
self.verify_read_was_from_disk(node=node, query=query, session=session)
|
|
|
|
def test_read_from_cache_and_then_bypass_cache(self):
|
|
session = self.prepare()
|
|
node = self.cluster.nodelist()[0]
|
|
|
|
no_bypass_query = "SELECT * FROM cf"
|
|
self.verify_read_was_from_cache(node=node, query=no_bypass_query, session=session)
|
|
|
|
bypass_query = "SELECT * FROM cf BYPASS CACHE"
|
|
self.verify_read_was_from_disk(node=node, query=bypass_query, session=session)
|
|
|
|
def insert_data_for_scan_range(self):
|
|
session = self.prepare(insert_data=False)
|
|
# create a table
|
|
create_cf(session=session, name="cf", key_type="int")
|
|
# populate table with k (int) and values (anything)
|
|
query = "INSERT INTO cf (key, c, v) VALUES ({}, '{}', '{}')"
|
|
for idx in range(NUM_OF_QUERY_EXECUTIONS):
|
|
varchar_c = "".join(random.choice(string.ascii_lowercase) for x in range(20))
|
|
varchar_v = "".join(random.choice(string.ascii_lowercase) for x in range(20))
|
|
session.execute(query.format(idx, varchar_c, varchar_v))
|
|
return session
|
|
|
|
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/6045")
|
|
def test_range_scan_bypass_cache(self):
|
|
session = self.insert_data_for_scan_range()
|
|
node = self.cluster.nodelist()[0]
|
|
errors = self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
"SELECT * FROM cf WHERE key > 10 and key < 20 ALLOW FILTERING",
|
|
metrics_validators={
|
|
"select_partition_range_scan": "increased_by_1",
|
|
"select_partition_range_scan_no_bypass_cache": "not_changed",
|
|
},
|
|
num_runs=1,
|
|
)
|
|
assert not errors, "Running range query that is suppose to read from cache following errors found:\n" + "\n".join(errors)
|
|
|
|
errors = self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
"SELECT * FROM cf WHERE key > 10 and key < 20 ALLOW FILTERING BYPASS CACHE",
|
|
metrics_validators={
|
|
"select_partition_range_scan": "increased_by_1",
|
|
"select_partition_range_scan_no_bypass_cache": "increased_by_1",
|
|
},
|
|
num_runs=1,
|
|
)
|
|
assert not errors, "Running range query that is suppose to read from disk following errors found:\n" + "\n".join(errors)
|
|
|
|
def test_full_scan_bypass_cache(self):
|
|
session = self.prepare()
|
|
node = self.cluster.nodelist()[0]
|
|
self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
query="SELECT * FROM cf",
|
|
num_runs=1,
|
|
metrics_validators={
|
|
"select_partition_range_scan": "increased_by_1",
|
|
"select_partition_range_scan_no_bypass_cache": "increased_by_1",
|
|
},
|
|
)
|
|
self.run_query_and_check_metrics(
|
|
node,
|
|
session,
|
|
query="SELECT * FROM cf BYPASS CACHE",
|
|
num_runs=1,
|
|
metrics_validators={
|
|
"select_partition_range_scan": "increased_by_1",
|
|
"select_partition_range_scan_no_bypass_cache": "not_changed",
|
|
},
|
|
)
|
|
|
|
@pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
|
|
def test_create_table_caching_disabled(self, cache_index_pages: bool):
|
|
session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
|
|
node = self.cluster.nodelist()[0]
|
|
create_c1c2_table(session, cf=self.table_name, caching=False)
|
|
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
|
node.flush()
|
|
query = f"select * from {self.table_name}"
|
|
# TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
|
|
self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)
|
|
|
|
@pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
|
|
def test_alter_table_caching_disable(self, cache_index_pages: bool):
|
|
session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
|
|
node = self.cluster.nodelist()[0]
|
|
create_c1c2_table(session, cf=self.table_name)
|
|
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
|
node.flush()
|
|
query = f"select * from {self.table_name}"
|
|
self.verify_read_was_from_cache(node=node, query=query, session=session)
|
|
# disabling caching for table and checking read comes from disk
|
|
session.execute(f"ALTER TABLE {self.table_name} WITH caching = {{'enabled':false}}")
|
|
# TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
|
|
self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)
|
|
|
|
@pytest.mark.parametrize("cache_index_pages", [True, False], ids=["cache_index_pages", "no_cache_index_pages"])
|
|
def test_alter_table_caching_enable(self, cache_index_pages: bool):
|
|
session = self.prepare(insert_data=False, cache_index_pages=cache_index_pages)
|
|
node = self.cluster.nodelist()[0]
|
|
create_c1c2_table(session, cf=self.table_name, caching=False)
|
|
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
|
node.flush()
|
|
query = f"select * from {self.table_name}"
|
|
# TODO: After https://github.com/scylladb/scylla/issues/9968 is solved, remove index_cache_involved=True
|
|
self.verify_read_was_from_disk(node=node, query=query, session=session, index_cache_involved=cache_index_pages)
|
|
# enabling caching for table and checking read comes from cache
|
|
session.execute(f"ALTER TABLE {self.table_name} WITH caching = {{'enabled':true}}")
|
|
self.verify_read_was_from_cache(node=node, query=query, session=session)
|
|
|
|
def verify_used_memory_grow(self, node, session):
|
|
grew = 0
|
|
metric = ["scylla_cache_bytes_used"]
|
|
for _ in range(NUM_OF_QUERY_EXECUTIONS):
|
|
cache_bytes_used_before_write = self.get_scylla_cache_reads_metrics(node=node, metrics=metric)[metric[0]]
|
|
insert_c1c2(session, keys=list(range(self.first_key + 10)), cf=self.table_name, ks=self.keyspace_name)
|
|
self.cluster.nodetool(f"flush -- {self.keyspace_name} {self.table_name}")
|
|
cache_bytes_used_after_write = self.get_scylla_cache_reads_metrics(node=node, metrics=metric)[metric[0]]
|
|
if cache_bytes_used_before_write < cache_bytes_used_after_write:
|
|
grew += 1
|
|
self.first_key += 10
|
|
return grew > NUM_OF_QUERY_EXECUTIONS / 2
|
|
|
|
def test_writes_caching_disabled(self):
|
|
session = self.prepare(insert_data=False)
|
|
node = self.cluster.nodelist()[0]
|
|
create_c1c2_table(session, cf=self.table_name, caching=False)
|
|
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
|
self.first_key = 0
|
|
assert not self.verify_used_memory_grow(node=node, session=session), "expected to have writes without cache"
|
|
alter_cmd = f"ALTER TABLE {self.keyspace_name}.{self.table_name} WITH CACHING = {{'enabled': 'true'}}"
|
|
session.execute(alter_cmd)
|
|
assert self.verify_used_memory_grow(node=node, session=session), "expected to have writes through cache"
|
|
|
|
def test_writes_caching_enabled(self):
|
|
session = self.prepare(insert_data=False)
|
|
node = self.cluster.nodelist()[0]
|
|
create_c1c2_table(session, cf=self.table_name)
|
|
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
|
self.first_key = 0
|
|
assert self.verify_used_memory_grow(node=node, session=session), "expected to have writes through cache"
|
|
alter_cmd = f"ALTER TABLE {self.keyspace_name}.{self.table_name} WITH CACHING = {{'enabled': 'false'}}"
|
|
session.execute(alter_cmd)
|
|
assert not self.verify_used_memory_grow(node=node, session=session), "expected to have writes without cache"
|