diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc
index 9bda0afd1d..8bdd6086e3 100644
--- a/mutation/mutation_partition.cc
+++ b/mutation/mutation_partition.cc
@@ -2200,6 +2200,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key
     _static_row_is_alive = false;
     _live_rows = 0;
     _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed));
+    _used_at_entry = _memory_accounter.used_memory();
 }
 
 void reconcilable_result_builder::consume(tombstone t) {
@@ -2220,7 +2221,7 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom
     }
     _live_rows += is_alive;
     auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema));
-    if (is_alive) {
+    if (is_alive || _slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
         // We are considering finishing current read only after consuming a
         // live clustering row. While sending a single live row is enough to
         // guarantee progress, not ending the result on a live row would
@@ -2261,6 +2262,17 @@ stop_iteration reconcilable_result_builder::consume_end_of_partition() {
     }
     _total_live_rows += _live_rows;
     _result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() });
+
+    auto accounted = _memory_accounter.used_memory() - _used_at_entry;
+    auto actually_used = sizeof(partition) + _result.back().mut().representation().size();
+    if (actually_used > accounted) {
+        _memory_accounter.update(actually_used - accounted);
+    }
+
+    if (_slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
+        _stop = _stop || _memory_accounter.check();
+    }
+
     return _stop;
 }
diff --git a/mutation_query.hh b/mutation_query.hh
index 619bf46aca..01a77cbfbe 100644
--- a/mutation_query.hh
+++ b/mutation_query.hh
@@ -133,6 +133,7 @@ class reconcilable_result_builder {
     uint64_t _live_rows{};
+    size_t _used_at_entry;
     // make this the last member so it is destroyed first. #7240
     utils::chunked_vector<partition> _result;
 
 private:
     stop_iteration consume(range_tombstone&& rt);
diff --git a/query-request.hh b/query-request.hh
index 5281746c25..d4e008c50b 100644
--- a/query-request.hh
+++ b/query-request.hh
@@ -177,6 +177,11 @@ public:
         // directly, bypassing the intermediate reconcilable_result format used
         // in pre 4.5 range scans.
         range_scan_data_variant,
+        // When set, a mutation query can end a page even if there is no live row in
+        // the final reconcilable_result. This prevents exchanging large pages when
+        // there are a lot of dead rows. The flag is needed during rolling upgrades,
+        // to support old coordinators which do not tolerate pages with no live rows.
+        allow_mutation_read_page_without_live_row,
     };
     using option_set = enum_set<super_enum<option,
-            option::range_scan_data_variant>>;
+            option::range_scan_data_variant,
+            option::allow_mutation_read_page_without_live_row>>;
 
     clustering_row_ranges _row_ranges;
 public:
     column_id_vector static_columns; // TODO: consider using bitmap
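Not part of the patch: a minimal, self-contained Python model of the paging rule the new option changes, following the comments in `consume()` above. All names are illustrative, not Scylla's actual API.

```python
# Hedged model of the page-termination rule: a page may only end on a live row,
# unless allow_page_without_live_row is set, in which case a tombstone may end it.
def build_page(rows, memory_limit, allow_page_without_live_row):
    """rows: iterable of (is_live, size_in_bytes) tuples."""
    page, used = [], 0
    for is_live, size in rows:
        page.append((is_live, size))
        used += size
        over_limit = used >= memory_limit
        # Old behavior: a long run of tombstones keeps growing the page unboundedly.
        # New behavior: with the option set, a dead row may end the page too.
        if over_limit and (is_live or allow_page_without_live_row):
            break
    return page

# A scan over 10000 dead rows followed by one live row:
rows = [(False, 100)] * 10_000 + [(True, 100)]
print(len(build_page(rows, memory_limit=1_000, allow_page_without_live_row=False)))  # 10001
print(len(build_page(rows, memory_limit=1_000, allow_page_without_live_row=True)))   # 10
```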
diff --git a/replica/mutation_dump.cc b/replica/mutation_dump.cc
index a43fecb53e..7d3b6b5f53 100644
--- a/replica/mutation_dump.cc
+++ b/replica/mutation_dump.cc
@@ -604,7 +604,11 @@ future<lw_shared_ptr<query::result>> dump_mutations(
             std::rethrow_exception(std::move(ex));
         }
 
-        dk_opt = co_await partition_key_generator();
+        if (compaction_state->are_limits_reached() || qs.builder.is_short_read()) {
+            dk_opt = {};
+        } else {
+            dk_opt = co_await partition_key_generator();
+        }
     }
 
     co_return make_lw_shared(qs.builder.build(compaction_state->current_full_position()));
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index b3dfd74a67..596bfcc102 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -4980,6 +4980,8 @@ protected:
         data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
         auto exec = shared_from_this();
 
+        cmd->slice.options.set<query::partition_slice::option::allow_mutation_read_page_without_live_row>();
+
         // Waited on indirectly.
         make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout);
 
@@ -5003,13 +5005,18 @@ protected:
                 // We generate a retry if at least one node reply with count live columns but after merge we have less
                 // than the total number of column we are interested in (which may be < count on a retry).
-                // So in particular, if no host returned count live columns, we know it's not a short read.
-                bool can_send_short_read = rr_opt && rr_opt->is_short_read() && rr_opt->row_count() > 0;
-                if (rr_opt && (can_send_short_read || data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit()
+                // So in particular, if no host returned count live columns, we know it's not a short read due to
+                // row or partition limits being exhausted, and a retry is not needed.
+                if (rr_opt && (rr_opt->is_short_read()
+                                || data_resolver->all_reached_end()
+                                || rr_opt->row_count() >= original_row_limit()
                                 || data_resolver->live_partition_count() >= original_partition_limit())
                         && !data_resolver->any_partition_short_read()) {
+                    tracing::trace(_trace_state, "Read stage is done for read-repair");
+                    mlogger.trace("reconciled: {}", rr_opt->pretty_printer(_schema));
                     auto result = ::make_foreign(::make_lw_shared(
                             co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit)));
+                    qlogger.trace("reconciled: {}", result->pretty_printer(_schema, _cmd->slice));
                     // wait for write to complete before returning result to prevent multiple concurrent read requests to
                     // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
                     // another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
@@ -5032,6 +5039,7 @@ protected:
                         on_read_resolved();
                     });
                 } else {
+                    tracing::trace(_trace_state, "Not enough data, need a retry for read-repair");
                     _proxy->get_stats().read_retries++;
                     _retry_cmd = make_lw_shared(*cmd);
                     // We asked t (= cmd->get_row_limit()) live columns and got l (=data_resolver->total_live_count) ones.
@@ -5138,6 +5146,7 @@ public:
                         exec->_targets.erase(i, exec->_targets.end());
                     }
                 }
+                tracing::trace(exec->_trace_state, "digest mismatch, starting read repair");
                 exec->reconcile(exec->_cl, timeout);
                 exec->_proxy->get_stats().read_repair_repaired_blocking++;
             }
@@ -5343,6 +5352,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
     // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
     if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
             || (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
+        tracing::trace(trace_state, "Creating never_speculating_read_executor - speculative retry is disabled or there are no extra replicas to speculate with");
        return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
     }
 
@@ -5350,6 +5360,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
     // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
     // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
     // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
+    tracing::trace(trace_state, "Creating always_speculating_read_executor (all targets)");
     return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
 }
 
@@ -5358,16 +5369,20 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
     auto local_dc_filter = erm->get_topology().get_local_dc_filter();
     if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
         slogger.trace("read executor no extra target to speculate");
+        tracing::trace(trace_state, "Creating never_speculating_read_executor - there are no extra replicas to speculate with");
         return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
     } else {
         target_replicas.push_back(*extra_replica);
         slogger.trace("creating read executor with extra target {}", *extra_replica);
+        tracing::trace(trace_state, "Added extra target {} for speculative read", *extra_replica);
     }
 
     if (retry_type == speculative_retry::type::ALWAYS) {
+        tracing::trace(trace_state, "Creating always_speculating_read_executor");
         return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
     } else { // PERCENTILE or CUSTOM.
+        tracing::trace(trace_state, "Creating speculating_read_executor");
         return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
     }
 }
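Not part of the patch: the reconciliation condition above is easier to see in isolation. Below is a hedged Python restatement of the decision, mirroring the boolean structure of the C++ (illustrative names only).

```python
# Illustrative restatement of the reconcile() decision above; not Scylla code.
def result_is_complete(is_short_read: bool, all_replicas_reached_end: bool,
                       row_count: int, row_limit: int,
                       live_partitions: int, partition_limit: int,
                       any_partition_short_read: bool) -> bool:
    """True if the merged result can be returned; False means a retry with
    increased limits is needed."""
    enough = (is_short_read                       # a replica ended the page early on purpose
              or all_replicas_reached_end         # no replica has more data
              or row_count >= row_limit           # the page is full anyway
              or live_partitions >= partition_limit)
    # A partition cut short during the merge always forces a retry.
    return enough and not any_partition_short_read

# A merge that lost rows relative to the limit, with replicas still holding data:
assert not result_is_complete(False, False, 10, 100, 1, 1000, False)  # retry
# A page that is short because the new option let it end on a dead row:
assert result_is_complete(True, False, 0, 100, 0, 1000, False)        # no retry
```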
diff --git a/test/cql-pytest/test_select_from_mutation_fragments.py b/test/cql-pytest/test_select_from_mutation_fragments.py
index 2b7d71eea6..c795f6e868 100644
--- a/test/cql-pytest/test_select_from_mutation_fragments.py
+++ b/test/cql-pytest/test_select_from_mutation_fragments.py
@@ -422,3 +422,30 @@ def test_ck_in_query(cql, test_table):
         for col_name, expected_value in zip(columns, expected_row):
             assert hasattr(row, col_name)
             assert getattr(row, col_name) == expected_value
+
+
+def test_many_partitions(cql, test_keyspace, scylla_only):
+    num_partitions = 5000
+    with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
+        delete_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ?")
+        for pk in range(num_partitions):
+            cql.execute(delete_id, (pk,))
+
+        res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({table})"))
+        pks = set()
+        partition_starts = 0
+        partition_ends = 0
+        for row in res:
+            assert 0 <= row.pk < num_partitions
+            if row.mutation_fragment_kind == "partition start":
+                partition_starts += 1
+                pks.add(row.pk)
+            elif row.mutation_fragment_kind == "partition end":
+                partition_ends += 1
+                assert row.pk in pks
+            else:
+                pytest.fail(f"Unexpected mutation fragment kind: {row.mutation_fragment_kind}")
+
+        assert partition_starts == num_partitions
+        assert partition_ends == num_partitions
+        assert len(pks) == num_partitions
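Not part of the patch: the invariant this test relies on, each "partition end" following its partition's "partition start", can be factored into a small self-contained checker. A hedged sketch (the `Row` namedtuple stands in for driver result rows):

```python
from collections import namedtuple

Row = namedtuple("Row", ["pk", "mutation_fragment_kind"])

def check_fragment_pairing(rows) -> set[int]:
    """Every 'partition end' must follow its partition's 'partition start'."""
    started: set[int] = set()
    for row in rows:
        if row.mutation_fragment_kind == "partition start":
            assert row.pk not in started, f"duplicate start for pk={row.pk}"
            started.add(row.pk)
        elif row.mutation_fragment_kind == "partition end":
            assert row.pk in started, f"end without start for pk={row.pk}"
    return started

# Two well-formed partitions:
rows = [Row(0, "partition start"), Row(0, "partition end"),
        Row(1, "partition start"), Row(1, "partition end")]
assert check_fragment_pairing(rows) == {0, 1}
```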
diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py
index 877c62f489..34e59afe4e 100644
--- a/test/pylib/manager_client.py
+++ b/test/pylib/manager_client.py
@@ -310,3 +310,9 @@ class ManagerClient():
         logger.debug("ManagerClient getting log filename for %s", server_id)
         log_filename = await self.client.get_text(f"/cluster/server/{server_id}/get_log_filename")
         return ScyllaLogFile(self.thread_pool, log_filename)
+
+    async def server_get_workdir(self, server_id: ServerNum) -> str:
+        return await self.client.get_text(f"/cluster/server/{server_id}/workdir")
+
+    async def server_get_exe(self, server_id: ServerNum) -> str:
+        return await self.client.get_text(f"/cluster/server/{server_id}/exe")
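Not part of the patch: a hedged usage sketch of the two new `ManagerClient` getters. The test later in this patch uses them exactly this way to locate a table's upload directory; the helper name and the `tbl-<uuid>` glob are taken from that test.

```python
import glob
import os

# Hypothetical helper showing how a test can combine the new getters.
async def find_upload_dir(manager, server, keyspace: str, table: str) -> tuple[str, str]:
    scylla_exe = await manager.server_get_exe(server.server_id)
    workdir = await manager.server_get_workdir(server.server_id)
    # Table directories are named <table>-<uuid>, so glob over the suffix.
    upload_dir = glob.glob(os.path.join(workdir, "data", keyspace, f"{table}-*", "upload"))[0]
    return scylla_exe, upload_dir
```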
diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py
index 77cfe1e646..511c4a3009 100644
--- a/test/pylib/rest_client.py
+++ b/test/pylib/rest_client.py
@@ -230,6 +230,24 @@ class ScyllaRESTAPIClient():
         assert level in ["debug", "info", "warning", "trace"]
         await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip)
 
+    async def load_new_sstables(self, node_ip: str, keyspace: str, table: str) -> None:
+        """Load sstables from the upload directory"""
+        await self.client.post(f"/storage_service/sstables/{keyspace}?cf={table}", host=node_ip)
+
+    async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
+        """Flush the specified table, or all tables in the keyspace"""
+        url = f"/storage_service/keyspace_flush/{keyspace}"
+        if table is not None:
+            url += f"?cf={table}"
+        await self.client.post(url, host=node_ip)
+
+    async def keyspace_compaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
+        """Compact the specified table, or all tables in the keyspace"""
+        url = f"/storage_service/keyspace_compaction/{keyspace}"
+        if table is not None:
+            url += f"?cf={table}"
+        await self.client.post(url, host=node_ip)
+
 
 class ScyllaMetrics:
     def __init__(self, lines: list[str]):
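Not part of the patch: these helpers are coroutines, so callers must await them (the test below flushes before compacting so the just-written data is in sstables). A short hedged usage sketch; `manager` and `node_ip` are assumed to come from the test harness.

```python
# Hypothetical usage of the new REST helpers.
async def flush_and_compact(manager, node_ip: str, keyspace: str) -> None:
    # Flush memtables to sstables first, then compact the whole keyspace.
    await manager.api.keyspace_flush(node_ip, keyspace)
    await manager.api.keyspace_compaction(node_ip, keyspace)
```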
diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py
index d286e348d1..7bd6ee9112 100644
--- a/test/pylib/scylla_cluster.py
+++ b/test/pylib/scylla_cluster.py
@@ -1069,6 +1069,8 @@ class ScyllaClusterManager:
         add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
         add_put('/cluster/server/{server_id}/change_ip', self._server_change_ip)
         add_get('/cluster/server/{server_id}/get_log_filename', self._server_get_log_filename)
+        add_get('/cluster/server/{server_id}/workdir', self._server_get_workdir)
+        add_get('/cluster/server/{server_id}/exe', self._server_get_exe)
 
     async def _manager_up(self, _request) -> aiohttp.web.Response:
         return aiohttp.web.Response(text=f"{self.is_running}")
@@ -1279,13 +1281,27 @@ class ScyllaClusterManager:
         ip_addr = await self.cluster.change_ip(server_id)
         return aiohttp.web.json_response({"ip_addr": ip_addr})
 
-    async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+    async def _server_get_attribute(self, request: aiohttp.web.Request, attribute: str) -> aiohttp.web.Response:
+        """Generic request handler which gets a particular attribute of a ScyllaServer instance
+
+        To be used to implement concrete handlers, not for direct use.
+        """
         assert self.cluster
         server_id = ServerNum(int(request.match_info["server_id"]))
         server = self.cluster.servers[server_id]
         if not server:
            return aiohttp.web.Response(status=404, text=f"Server {server_id} unknown")
-        return aiohttp.web.Response(text=f"{server.log_filename}")
+        return aiohttp.web.Response(text=f"{getattr(server, attribute)}")
+
+    async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+        return await self._server_get_attribute(request, "log_filename")
+
+    async def _server_get_workdir(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+        return await self._server_get_attribute(request, "workdir")
+
+    async def _server_get_exe(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+        return await self._server_get_attribute(request, "exe")
 
 
 @asynccontextmanager
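Not part of the patch: the refactor above reduces each endpoint to a two-line delegation onto a generic attribute getter. A self-contained, hedged sketch of the same pattern outside the harness (all names here are invented for illustration):

```python
import aiohttp.web

class FakeServer:
    workdir = "/tmp/scylla-0"
    exe = "/usr/bin/scylla"

servers = {0: FakeServer()}

async def get_attribute(request: aiohttp.web.Request, attribute: str) -> aiohttp.web.Response:
    # Mirrors _server_get_attribute(): look up the server, 404 if unknown,
    # otherwise return the named attribute as text.
    server = servers.get(int(request.match_info["server_id"]))
    if server is None:
        return aiohttp.web.Response(status=404, text="unknown server")
    return aiohttp.web.Response(text=f"{getattr(server, attribute)}")

async def get_workdir(request):
    return await get_attribute(request, "workdir")

async def get_exe(request):
    return await get_attribute(request, "exe")

app = aiohttp.web.Application()
app.add_routes([
    aiohttp.web.get("/server/{server_id}/workdir", get_workdir),
    aiohttp.web.get("/server/{server_id}/exe", get_exe),
])

if __name__ == "__main__":
    aiohttp.web.run_app(app)
```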
diff --git a/test/topology_custom/test_read_repair.py b/test/topology_custom/test_read_repair.py
new file mode 100644
index 0000000000..86c3cc3d2d
--- /dev/null
+++ b/test/topology_custom/test_read_repair.py
@@ -0,0 +1,315 @@
+#
+# Copyright (C) 2023-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+
+import datetime
+import glob
+import json
+import logging
+import os
+import pytest
+import random
+import struct
+import subprocess
+import tempfile
+import time
+from typing import TypeAlias, Any
+
+from cassandra.cluster import ConsistencyLevel, Session  # type: ignore
+from cassandra.query import SimpleStatement  # type: ignore
+from cassandra.pool import Host  # type: ignore
+from cassandra.murmur3 import murmur3  # type: ignore
+
+from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.internal_types import ServerInfo
+
+
+logger = logging.getLogger(__name__)
+
+
+def serialize_int(i: int) -> str:
+    """Serialize an int32 value to big-endian hex."""
+    return struct.pack(">l", i).hex()
+
+
+def serialize_key(i: int) -> str:
+    """Serialize a single-component int32 key: 2-byte length prefix, then the value."""
+    return struct.pack(">hl", 4, i).hex()
+
+
+class row_tombstone_data:
+    pk = 0
+    v = 1
+
+    column_spec = "pk int, ck int, v int, PRIMARY KEY (pk, ck)"
+    select_query = f"SELECT * FROM ks.tbl WHERE pk = {pk}"
+    unique_key = 'ck'
+
+    @classmethod
+    def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int,
+                         deletion_time: datetime.datetime):
+        rows = []
+        formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S")
+        serialized_value = serialize_int(cls.v)
+        for ck in range(total_rows):
+            row = {
+                "type": "clustering-row",
+                "key": {"raw": serialize_key(ck)},
+            }
+            if ck in live_rows:
+                row["marker"] = {"timestamp": live_timestamp}
+                row["columns"] = {"v": {
+                    "is_live": True,
+                    "type": "regular",
+                    "timestamp": live_timestamp,
+                    "value": serialized_value,
+                }}
+            else:
+                row["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time}
+            rows.append(row)
+
+        assert len(rows) == total_rows
+
+        return [
+            {
+                "key": {"raw": serialize_key(cls.pk)},
+                "clustering_elements": rows,
+            },
+        ]
+
+    @classmethod
+    def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None:
+        assert row.pk == cls.pk
+        if row.partition_region != 2:
+            return None
+        is_live = "tombstone" not in json.loads(row.metadata)
+        cols = json.loads(row.value)
+        assert cols["v"] == str(cls.v)
+        return row.ck, is_live
+
+    @classmethod
+    def check_result_row(cls, i: int, row) -> None:
+        assert row.pk == cls.pk
+        assert row.ck == i
+        assert row.v == cls.v
+
+
+class partition_tombstone_data:
+    v = 1
+
+    column_spec = "pk int PRIMARY KEY, v int"
+    select_query = "SELECT * FROM ks.tbl"
+    unique_key = 'pk'
+
+    partition_tombstone_timestamp = None
+    partition_live = False
+
+    class PartitionKey:
+        def __init__(self, value):
+            self.value = value
+            self.raw = serialize_key(self.value)
+            self.token = murmur3(struct.pack(">l", self.value))
+
+        def __lt__(self, o):
+            return self.token < o.token
+
+    @classmethod
+    def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int,
+                         deletion_time: datetime.datetime):
+        partitions = []
+        formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S")
+        serialized_value = serialize_int(cls.v)
+        pks = sorted([cls.PartitionKey(pk) for pk in range(total_rows)])
+        for pk in pks:
+            partition: dict[str, Any] = {
+                "key": {"raw": pk.raw, "token": pk.token},
+            }
+            if pk.value in live_rows:
+                partition["clustering_elements"] = [
+                    {
+                        "type": "clustering-row",
+                        "key": {"raw": ""},
+                        "marker": {"timestamp": live_timestamp},
+                        "columns": {"v": {
+                            "is_live": True,
+                            "type": "regular",
+                            "timestamp": live_timestamp,
+                            "value": serialized_value,
+                        }},
+                    },
+                ]
+            else:
+                partition["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time}
+
+            partitions.append(partition)
+
+        assert len(partitions) == total_rows
+        return partitions
+
+    @classmethod
+    def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None:
+        logger.info(f"check_mutation_row(): kind={row.mutation_fragment_kind}, pk={row.pk}, metadata={row.metadata},"
+                    f" value={row.value}")
+        if row.partition_region == 0:
+            cls.partition_tombstone_timestamp = json.loads(row.metadata)["tombstone"].get("timestamp")
+            return None
+        elif row.partition_region == 3:
+            is_live = cls.partition_live
+            cls.partition_tombstone_timestamp = None
+            cls.partition_live = False
+            return row.pk, is_live
+        elif row.partition_region != 2:
+            return None
+
+        metadata = json.loads(row.metadata)
+        try:
+            v = metadata["columns"]["v"]
+            cls.partition_live = v["is_live"]
+            assert cls.partition_tombstone_timestamp is None or v["timestamp"] > cls.partition_tombstone_timestamp
+            cols = json.loads(row.value)
+            assert cols["v"] == str(cls.v)
+        except KeyError:
+            cls.partition_live = False
+        return None
+
+    @classmethod
+    def check_result_row(cls, i: int, row) -> None:
+        assert row.pk == i
+        assert row.v == cls.v
+
+
+incremental_repair_test_data = [pytest.param(row_tombstone_data, id="row-tombstone"),
+                                pytest.param(partition_tombstone_data, id="partition-tombstone")]
+
+
+@pytest.fixture(scope="function")
+def workdir():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        yield tmp_dir
+
+
+@pytest.mark.parametrize("data_class", incremental_repair_test_data)
+@pytest.mark.asyncio
+async def test_incremental_read_repair(data_class, workdir, manager):
+    """Stress the incremental read repair logic
+
+    Write a long stream of row tombstones, with a live row before and after.
+    """
+    seed = int(time.time())
+    logger.info(f"random-seed: {seed}")
+    random.seed(seed)
+    cmdline = ["--hinted-handoff-enabled", "0", "--query-tombstone-page-limit", "1000"]
+    node1 = await manager.server_add(cmdline=cmdline)
+    node2 = await manager.server_add(cmdline=cmdline)
+
+    cql = manager.get_cql()
+
+    host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)
+
+    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
+    table_schema = f"CREATE TABLE ks.tbl ({data_class.column_spec}) WITH speculative_retry = 'NONE'"
+    cql.execute(table_schema)
+
+    schema_file_path = os.path.join(workdir, "schema.cql")
+    with open(schema_file_path, "w") as schema_file:
+        schema_file.write(table_schema)
+
+    dead_timestamp = int(time.time())
+    live_timestamp = dead_timestamp + 1
+
+    total_rows = 20000
+    max_live_rows = 32
+    deletion_time = datetime.datetime.now()
+
+    row_set: TypeAlias = set[int]
+
+    async def generate_and_upload_sstable(node: ServerInfo, node_row: int) -> row_set:
+        live_rows = {random.randint(0, total_rows - 1) for _ in range(random.randint(0, max_live_rows))}
+        live_rows.add(node_row)
+
+        sstable = data_class.generate_sstable(total_rows, live_rows, dead_timestamp, live_timestamp, deletion_time)
+        scylla_exe = await manager.server_get_exe(node.server_id)
+        node_workdir = await manager.server_get_workdir(node.server_id)
+        table_upload_dir = glob.glob(os.path.join(node_workdir, "data", "ks", "tbl-*", "upload"))[0]
+
+        input_file_path = os.path.join(workdir, f"node{node.server_id}.sstable.json")
+        with open(input_file_path, "w") as f:
+            json.dump(sstable, f, indent=4)
+
+        subprocess.check_call([
+            scylla_exe, "sstable", "write",
+            "--schema-file", schema_file_path,
+            "--input-file", input_file_path,
+            "--output-dir", table_upload_dir,
+            "--generation", "1"])
+
+        await manager.api.load_new_sstables(node.ip_addr, "ks", "tbl")
+
+        return live_rows
+
+    node1_rows = await generate_and_upload_sstable(node1, 0)
+    node2_rows = await generate_and_upload_sstable(node2, total_rows - 1)
+    all_rows = node1_rows | node2_rows
+    assert len(all_rows) >= 2
+
+    logger.info(f"node1_rows: {len(node1_rows)} rows, row ids: {node1_rows}")
+    logger.info(f"node2_rows: {len(node2_rows)} rows, row ids: {node2_rows}")
+    logger.info(f"all_rows: {len(all_rows)} rows, row ids: {all_rows}")
+
+    def check_rows(cql: Session, host: Host, expected_live_rows: row_set) -> None:
+        actual_live_rows = set()
+        actual_dead_rows = set()
+        for row in cql.execute("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl)", host=host):
+            res = data_class.check_mutation_row(row, expected_live_rows)
+            if res is None:
+                continue
+            row_id, is_live = res
+            if is_live:
+                actual_live_rows.add(row_id)
+            else:
+                actual_dead_rows.add(row_id)
+
+        # Count rows that have both a tombstone and live data only once, as live.
+        actual_dead_rows -= actual_live_rows
+
+        assert actual_live_rows == expected_live_rows
+        assert len(actual_live_rows) + len(actual_dead_rows) == total_rows
+
+    logger.info("Check rows with CL=ONE before read-repair")
+    check_rows(cql, host1, node1_rows)
+    check_rows(cql, host2, node2_rows)
+
+    logger.info("Run read-repair")
+    res = cql.execute(SimpleStatement(data_class.select_query, consistency_level=ConsistencyLevel.ALL))
+    res_rows = []
+    pages = []
+    while True:
+        res_rows.extend(list(res.current_rows))
+        pages.append(list(res.current_rows))
+        if res.has_more_pages:
+            res.fetch_next_page()
+        else:
+            break
+
+    logger.debug(f"repair: {len(pages)} pages: {pages}")
+    assert len(pages) > 1
+    assert len(res_rows) == len(all_rows)
+    actual_row_ids = set()
+    for res_row in res_rows:
+        row_id = getattr(res_row, data_class.unique_key)
+        actual_row_ids.add(row_id)
+        assert row_id in all_rows
+        data_class.check_result_row(row_id, res_row)
+    assert actual_row_ids == all_rows
+
+    for node in (node1, node2):
+        await manager.api.keyspace_flush(node.ip_addr, "ks")
+        await manager.api.keyspace_compaction(node.ip_addr, "ks")
+
+    logger.info("Check rows with CL=ONE after read-repair")
+    check_rows(cql, host1, all_rows)
+    check_rows(cql, host2, all_rows)
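Not part of the patch: for reference, a minimal instance of the JSON document that `generate_sstable()` above produces and `scylla sstable write` consumes, one partition with one live and one dead row. The field layout is taken from the code above; the timestamps and deletion time are illustrative values, and the raw keys are the hex output of `serialize_key()`/`serialize_int()`.

```python
# One-partition example of the structure generate_sstable() emits
# (row-tombstone variant); serialize_key(0) == "000400000000", etc.
example_sstable = [{
    "key": {"raw": "000400000000"},            # pk = 0
    "clustering_elements": [
        {   # live row: has a row marker and a live column value
            "type": "clustering-row",
            "key": {"raw": "000400000001"},    # ck = 1
            "marker": {"timestamp": 1694170156},
            "columns": {"v": {"is_live": True, "type": "regular",
                              "timestamp": 1694170156, "value": "00000001"}},
        },
        {   # dead row: a row tombstone instead of marker/columns
            "type": "clustering-row",
            "key": {"raw": "000400000002"},    # ck = 2
            "tombstone": {"timestamp": 1694170155,
                          "deletion_time": "2023-09-08 10:49:15"},
        },
    ],
}]
```

Note also that in the partition-tombstone variant, partitions must be written in murmur3 token order rather than key order, which is why `PartitionKey` sorts by token.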
diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc
index c1b93efa71..4ffa43c7f8 100644
--- a/tools/scylla-sstable.cc
+++ b/tools/scylla-sstable.cc
@@ -2349,6 +2349,7 @@ class json_mutation_stream_parser {
     };
 
 private:
+    struct parsing_aborted : public std::exception { };
     class impl {
         queue<mutation_fragment_v2_opt> _queue;
         stream _stream;
@@ -2364,7 +2365,12 @@ private:
             , _thread([this] { _reader.Parse(_stream, _handler); })
         { }
         ~impl() {
-            _thread.join().get();
+            _queue.abort(std::make_exception_ptr(parsing_aborted{}));
+            try {
+                _thread.join().get();
+            } catch (...) {
+                sst_log.warn("json_mutation_stream_parser: parser thread exited with exception: {}", std::current_exception());
+            }
         }
         future<mutation_fragment_v2_opt> operator()() {
             return _queue.pop_eventually().handle_exception([this] (std::exception_ptr e) -> mutation_fragment_v2_opt {
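Not part of the patch: the destructor fix above follows a general shutdown pattern, a producer blocked on a bounded queue will deadlock `join()` unless the queue is aborted first. A hedged, self-contained Python analogue of the same pattern (all names invented for illustration):

```python
# A producer blocked pushing into a bounded queue deadlocks join() unless the
# queue is "aborted" first - the Python equivalent of _queue.abort() above.
import queue
import threading

class Parser:
    def __init__(self, items):
        self._queue = queue.Queue(maxsize=1)
        self._abort = threading.Event()
        self._thread = threading.Thread(target=self._produce, args=(items,))
        self._thread.start()

    def _produce(self, items):
        for item in items:
            while not self._abort.is_set():
                try:
                    self._queue.put(item, timeout=0.1)
                    break
                except queue.Full:
                    continue
            if self._abort.is_set():
                return  # queue aborted: stop producing so join() can succeed

    def close(self):
        # Wake the blocked producer before joining, so joining cannot hang.
        self._abort.set()
        self._thread.join()

p = Parser(range(1000))
p.close()  # returns promptly even though the consumer never drained the queue
```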