# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import requests
import time
from collections import defaultdict
from dataclasses import dataclass, field

from ..cqlpy.util import new_test_table, new_test_keyspace, sleep_till_whole_second


@dataclass
class TombstonePurgeStats:
    # Tombstone-purge counters squashed (summed) across all shards' entries.
    attempts: int = 0
    failures_due_to_memtables: int = 0
    failures_due_to_other_sstables: int = 0


@dataclass
class SStablesStats:
    # Input/output sstable descriptors, concatenated across all shards' entries.
    input: list[dict] = field(default_factory=list)
    output: list[dict] = field(default_factory=list)


def waitAndGetCompleteCompactionHistory(rest_api, table):
    '''
    Poll the compaction-history REST endpoint until every shard has reported
    an entry for `table` (given as "ks.cf"), then return the full response.
    '''
    # cql-pytest/run.py::run_scylla_cmd() passes "--smp 2" to scylla, so we
    # use this value to ensure compaction results from all shards arrived
    # to the table.
    SCYLLA_SMP_COUNT = 2
    ks, cf = table.split('.')
    while True:
        response = rest_api.send("GET", "compaction_manager/compaction_history")
        assert response.status_code == requests.codes.ok
        table_entry_count = sum(1 for data in response.json() if data["ks"] == ks and data["cf"] == cf)
        if table_entry_count == SCYLLA_SMP_COUNT:
            return response
        # More entries than shards would mean an unexpected extra compaction ran.
        assert table_entry_count < SCYLLA_SMP_COUNT
        time.sleep(0.2)


def extractTombstonePurgeStatistics(response, ks) -> TombstonePurgeStats:
    '''
    Extract compaction history statistics for a given keyspace ks from the
    response and squash it as there will be as many items as the number of
    shards.
    '''
    stats = TombstonePurgeStats()
    for data in response.json():
        if data["ks"] == ks:
            stats.attempts += data["total_tombstone_purge_attempt"]
            stats.failures_due_to_memtables += data["total_tombstone_purge_failure_due_to_overlapping_with_memtable"]
            stats.failures_due_to_other_sstables += data["total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable"]
    return stats


def extractSStablesStatistics(response, ks) -> SStablesStats:
    '''
    Extract compaction history statistics for a given keyspace ks from the
    response and squash it as there will be as many items as the number of
    shards.
    '''
    stats = SStablesStats()
    for data in response.json():
        if data["ks"] == ks:
            stats.input += data["sstables_in"]
            stats.output += data["sstables_out"]
    return stats


def extractRowsMergedAsSortedList(response, ks):
    '''
    Extract rows_merged statistics for a given keyspace ks from the response
    and squash it as there will be as many items as the number of shards.
    Return a sorted list of the form [{"key": , "value": }...]
    '''
    total = defaultdict(int)
    for data in response.json():
        if data["ks"] == ks:
            for rows in data["rows_merged"]:
                total[rows["key"]] += rows["value"]
    return [{"key": key, "value": value} for key, value in sorted(total.items())]


def populateSomeData(cql, cf: str, pk_range: tuple[int, int], timestamp: int | None = None, step: int = 0):
    '''
    Insert 5 clustering rows (ck = 111, 122, ..., 155) for every pk in
    pk_range (a (start, stop) pair passed to range()). When `timestamp` is
    given, each write carries an explicit write timestamp, advanced by
    `step` (microseconds) per row to simulate a write process over time.
    '''
    # Compare with `is not None` (not truthiness) so that timestamp=0 still
    # adds the USING TIMESTAMP placeholder, keeping the statement's bind
    # count consistent with the `data.append(timestamp)` below.
    stmt = cql.prepare(f"INSERT INTO {cf} (pk, ck, v) VALUES (?, ?, ?) {'USING TIMESTAMP ?' if timestamp is not None else ''}")
    for pk in range(*pk_range):
        for ck in range(1, 6):
            data = [pk, ck*11+100, 0]
            if timestamp is not None:
                timestamp += step
                data.append(timestamp)
            cql.execute(stmt, data)


def alterSomeData(cql, cf: str, timestamp: int | None = None):
    '''
    Delete/update a fixed subset of the rows written by populateSomeData(),
    optionally with an explicit write timestamp.
    '''
    # `is not None` so that an explicit timestamp of 0 is not silently dropped.
    using_timestamp = f"USING TIMESTAMP {timestamp}" if timestamp is not None else ''
    cql.execute(f"DELETE FROM {cf} {using_timestamp} WHERE pk=1 and ck=122")
    cql.execute(f"DELETE FROM {cf} {using_timestamp} WHERE pk=5 and ck=155")
    cql.execute(f"DELETE FROM {cf} {using_timestamp} WHERE pk=3 and ck>111 AND ck<144")
    cql.execute(f"UPDATE {cf} {using_timestamp} SET v=100 WHERE pk=2 AND ck=122")
    cql.execute(f"DELETE FROM {cf} {using_timestamp} WHERE pk=5")


def test_compactionhistory_rows_merged_null_compaction_strategy(cql, rest_api):
    '''Major-compact two overlapping sstables under NullCompactionStrategy and
    verify the rows_merged histogram recorded in compaction history.'''
    with new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        with new_test_table(cql, ks, "pk int, ck int, v int, PRIMARY KEY (pk, ck)",
                            "WITH compaction = {'class': 'NullCompactionStrategy'};") as cf:
            populateSomeData(cql, cf, (1, 6))
            response = rest_api.send("POST", f"storage_service/keyspace_flush/{ks}")
            assert response.status_code == requests.codes.ok
            alterSomeData(cql, cf)
            response = rest_api.send("POST", f"storage_service/keyspace_flush/{ks}")
            assert response.status_code == requests.codes.ok
            response = rest_api.send("POST", f"storage_service/keyspace_compaction/{ks}")
            assert response.status_code == requests.codes.ok
            response = waitAndGetCompleteCompactionHistory(rest_api, cf)
            assert extractRowsMergedAsSortedList(response, ks) == [{"key": 1, "value": 27}, {"key": 2, "value": 10}]


def test_compactionhistory_rows_merged_time_window_compaction_strategy(cql, rest_api):
    '''Same rows_merged check as above, but with TWCS and explicit write
    timestamps placing the initial writes and the later updates/deletes in
    two adjacent one-minute windows.'''
    with new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        compaction_opt = "{'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'MINUTES', 'compaction_window_size': 1}"
        with new_test_table(cql, ks, "pk int, ck int, v int, PRIMARY KEY (pk, ck)",
                            f"WITH compaction = {compaction_opt};") as cf:
            now = int(time.time() * 1e6)  # microseconds (CQL write-timestamp resolution)
            window_size = int(6e7)  # 1 minute in microseconds
            step = int(1e6)  # 1 second in microseconds
            # Spread data across 2 windows by simulating a write process. `USING TIMESTAMP` is
            # provided to distribute the writes in the first one-minute window while updates and
            # deletes are propagated into the second 1-minute window.
            #
            # To assign a timestamp to a window in TWCS, we just divide it with the respective
            # duration and use the result as the window id (discarding the remainder).
            start = (now // window_size - 1) * window_size
            populateSomeData(cql, cf, (1, 6), start, step)
            response = rest_api.send("POST", f"storage_service/keyspace_flush/{ks}")
            assert response.status_code == requests.codes.ok
            alterSomeData(cql, cf, start + window_size)
            response = rest_api.send("POST", f"storage_service/keyspace_flush/{ks}")
            assert response.status_code == requests.codes.ok
            response = rest_api.send("POST", f"storage_service/keyspace_compaction/{ks}")
            assert response.status_code == requests.codes.ok
            response = waitAndGetCompleteCompactionHistory(rest_api, cf)
            assert extractRowsMergedAsSortedList(response, ks) == [{"key": 1, "value": 27}, {"key": 2, "value": 10}]


def test_compactionhistory_tombstone_purge_statistics(cql, rest_api):
    '''With immediate tombstone_gc and nothing overlapping, all tombstone
    purge attempts during a major compaction should succeed.'''
    with new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        with new_test_table(cql, ks, "pk int, ck int, v int, PRIMARY KEY (pk, ck)",
                            "WITH compaction = {'class': 'NullCompactionStrategy'} AND tombstone_gc = {'mode': 'immediate'};") as cf:
            timestamp = int(time.time())
            populateSomeData(cql, cf, (1, 11), timestamp - 10)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            populateSomeData(cql, cf, (11, 21), timestamp - 5)
            alterSomeData(cql, cf, timestamp - 5)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            # Sleep a second to let the commitlog minimum gc time, in seconds, be greater than the tombstone deletion time
            sleep_till_whole_second(1)
            response = rest_api.send("POST", f"storage_service/keyspace_compaction/{ks}")
            assert response.status_code == requests.codes.ok
            response = waitAndGetCompleteCompactionHistory(rest_api, cf)
            stats = extractTombstonePurgeStatistics(response, ks)
            assert stats == TombstonePurgeStats(4, 0, 0)
            stats = extractSStablesStatistics(response, ks)
            assert len(stats.input) == 4 and len(stats.output) == 2


def test_compactionhistory_tombstone_purge_statistics_overlapping_with_memtable(cql, rest_api):
    '''Data kept in the memtable (compaction run with flush_memtables=false)
    should block exactly one tombstone purge.'''
    with new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        with new_test_table(cql, ks, "pk int, ck int, v int, PRIMARY KEY (pk, ck)",
                            "WITH compaction = {'class': 'NullCompactionStrategy'} AND tombstone_gc = {'mode': 'immediate'};") as cf:
            timestamp = int(time.time())
            populateSomeData(cql, cf, (11, 21), timestamp - 5)
            alterSomeData(cql, cf, timestamp - 5)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            # Sleep a second to let the commitlog minimum gc time, in seconds, be greater than the tombstone deletion time
            sleep_till_whole_second(1)
            populateSomeData(cql, cf, (1, 11), timestamp - 10)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            # Do not flush to keep it in memtable
            cql.execute(f"UPDATE {cf} USING TIMESTAMP {timestamp - 7} SET v=100 WHERE pk=1 AND ck=122")
            cql.execute(f"UPDATE {cf} USING TIMESTAMP {timestamp - 7} SET v=100 WHERE pk=7 AND ck=122")
            response = rest_api.send("POST", f"storage_service/keyspace_compaction/{ks}", {"flush_memtables": "false"})
            assert response.status_code == requests.codes.ok
            response = waitAndGetCompleteCompactionHistory(rest_api, cf)
            stats = extractTombstonePurgeStatistics(response, ks)
            assert stats == TombstonePurgeStats(4, 1, 0)


def test_compactionhistory_tombstone_purge_statistics_overlapping_with_other_sstables(cql, rest_api):
    '''An sstable left out of the compaction (different STCS bucket) that
    overlaps the compacted data should block exactly one tombstone purge.'''
    with new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        compaction_opt = "{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'min_sstable_size': 0}"
        with new_test_table(cql, ks, "pk int, ck int, v int, PRIMARY KEY (pk, ck)",
                            f"WITH compaction = {compaction_opt} AND tombstone_gc = {{'mode': 'immediate'}};") as cf:
            timestamp = int(time.time())
            cql.execute(f"UPDATE {cf} USING TIMESTAMP {timestamp - 7} SET v=100 WHERE pk=1 AND ck=122")
            cql.execute(f"UPDATE {cf} USING TIMESTAMP {timestamp - 7} SET v=100 WHERE pk=7 AND ck=122")
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            # Now produce two additional sstables that will get into the same bucket
            # and hence be compacted together but not with the sstable from above.
            populateSomeData(cql, cf, (11, 21), timestamp - 5)
            alterSomeData(cql, cf, timestamp - 5)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            # Sleep a second to let the commitlog minimum gc time, in seconds, be greater than the tombstone deletion time
            sleep_till_whole_second(1)
            populateSomeData(cql, cf, (1, 11), timestamp - 10)
            response = rest_api.send("POST", "storage_service/flush")
            assert response.status_code == requests.codes.ok
            response = waitAndGetCompleteCompactionHistory(rest_api, cf)
            stats = extractTombstonePurgeStatistics(response, ks)
            assert stats == TombstonePurgeStats(4, 0, 1)
            stats = extractSStablesStatistics(response, ks)
            assert len(stats.input) == 4 and len(stats.output) == 2