From 2c8a0e417594547fd3df0797915ff506e9fc374d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 3 Aug 2021 01:18:07 +0200 Subject: [PATCH 01/10] database, storage_proxy: Reconcile pages with no live rows incrementally Currently, mutation query on replica side will not respond with a result which doesn't have at least one live row. This causes problems if there is a lot of dead rows or partitions before we reach a live row, which stems from the fact that resulting reconcilable_result will be large: * Large allocations. Serialization of reconcilable_result causes large allocations for storing result rows in std::deque * Reactor stalls. Serialization of reconcilable_result on the replica side and on the coordinator side causes reactor stalls. This impacts not only the query at hand. For 1M dead rows, freezing takes 130ms, unfreezing takes 500ms. Coordinator does multiple freezes and unfreezes. The reactor stall on the coordinator side is >5s. * Large repair mutations. If reconciliation works on large pages, repair may fail due to too large mutation size. 1M dead rows is already too much: Refs #9111. This patch fixes all of the above by making mutation reads respect the memory accounter's limit for the page size, even for dead rows. This patch also addresses the problem of client-side timeouts during paging. Reconciling queries processing long strings of tombstones will now properly page tombstones,like regular queries do. My testing shows that this solution even increases efficiency. I tested with a cluster of 2 nodes, and a table of RF=2. The data layout was as follows (1 partition): Node1: 1 live row, 1M dead rows Node2: 1M dead rows, 1 live row This was designed to trigger reconciliation right from the very start of the query. Before: Running query (node2, CL=ONE, cold cache) Query done, duration: 140.0633503ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (node2, CL=ONE, hot cache) Query done, duration: 66.7195275ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (all-nodes, CL=ALL, reconcile, cold-cache) Query done, duration: 873.5400742ms, pages: 2, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)] After: Running query (node2, CL=ONE, cold cache) Query done, duration: 136.9035122ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (node2, CL=ONE, hot cache) Query done, duration: 69.5286021ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (all-nodes, CL=ALL, reconcile, cold-cache) Query done, duration: 162.6239498ms, pages: 100, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)] Non-reconciling queries have almost identical duration (1 few ms changes can be observed between runs). Note how in the after case, the reconciling read also produces 100 pages, vs. just 2 pages in the before case, leading to a much lower duration (less than 1/4 of the before). Refs #7929 Refs #3672 Refs #7933 Fixes #9111 --- mutation/mutation_partition.cc | 2 +- query-request.hh | 8 +++++++- service/storage_proxy.cc | 10 +++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc index 9bda0afd1d..384b6a1b7d 100644 --- a/mutation/mutation_partition.cc +++ b/mutation/mutation_partition.cc @@ -2220,7 +2220,7 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom } _live_rows += is_alive; auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema)); - if (is_alive) { + if (is_alive || _slice.options.contains()) { // We are considering finishing current read only after consuming a // live clustering row. While sending a single live row is enough to // guarantee progress, not ending the result on a live row would diff --git a/query-request.hh b/query-request.hh index 5281746c25..d4e008c50b 100644 --- a/query-request.hh +++ b/query-request.hh @@ -177,6 +177,11 @@ public: // directly, bypassing the intermediate reconcilable_result format used // in pre 4.5 range scans. range_scan_data_variant, + // When set, mutation query can end a page even if there is no live row in the + // final reconcilable_result. This prevents exchanging large pages when there + // is a lot of dead rows. This flag is needed during rolling upgrades to support + // old coordinators which do not tolerate pages with no live rows. + allow_mutation_read_page_without_live_row, }; using option_set = enum_set>; + option::range_scan_data_variant, + option::allow_mutation_read_page_without_live_row>>; clustering_row_ranges _row_ranges; public: column_id_vector static_columns; // TODO: consider using bitmap diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b3dfd74a67..5001970a30 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4980,6 +4980,8 @@ protected: data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); + cmd->slice.options.set(); + // Waited on indirectly. make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout); @@ -5003,9 +5005,11 @@ protected: // We generate a retry if at least one node reply with count live columns but after merge we have less // than the total number of column we are interested in (which may be < count on a retry). - // So in particular, if no host returned count live columns, we know it's not a short read. - bool can_send_short_read = rr_opt && rr_opt->is_short_read() && rr_opt->row_count() > 0; - if (rr_opt && (can_send_short_read || data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit() + // So in particular, if no host returned count live columns, we know it's not a short read due to + // row or partition limits being exhausted and retry is not needed. + if (rr_opt && (rr_opt->is_short_read() + || data_resolver->all_reached_end() + || rr_opt->row_count() >= original_row_limit() || data_resolver->live_partition_count() >= original_partition_limit()) && !data_resolver->any_partition_short_read()) { auto result = ::make_foreign(::make_lw_shared( From 0d773c9f9fc23e1258729160e039378be4408c09 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 18 Aug 2021 01:28:45 +0200 Subject: [PATCH 02/10] database: Fix accounting of small partitions in mutation query The partition key size was ignored by the accounter, as well as the partition tombstone. As a result, a sequence of partitions with just tombstones would be accounted as taking no memory and page size limitter to not kick in. Fix by accounting the real size of accumulated frozen_mutation. Also, break pages across partitions even if there are no live rows. The coordinator can handle it now. Refs #7933 --- mutation/mutation_partition.cc | 12 ++++++++++++ mutation_query.hh | 1 + 2 files changed, 13 insertions(+) diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc index 384b6a1b7d..8bdd6086e3 100644 --- a/mutation/mutation_partition.cc +++ b/mutation/mutation_partition.cc @@ -2200,6 +2200,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key _static_row_is_alive = false; _live_rows = 0; _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed)); + _used_at_entry = _memory_accounter.used_memory(); } void reconcilable_result_builder::consume(tombstone t) { @@ -2261,6 +2262,17 @@ stop_iteration reconcilable_result_builder::consume_end_of_partition() { } _total_live_rows += _live_rows; _result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() }); + + auto accounted = _memory_accounter.used_memory() - _used_at_entry; + auto actually_used = sizeof(partition) + _result.back().mut().representation().size(); + if (actually_used > accounted) { + _memory_accounter.update(actually_used - accounted); + } + + if (_slice.options.contains()) { + _stop = _stop || _memory_accounter.check(); + } + return _stop; } diff --git a/mutation_query.hh b/mutation_query.hh index 619bf46aca..01a77cbfbe 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -133,6 +133,7 @@ class reconcilable_result_builder { uint64_t _live_rows{}; // make this the last member so it is destroyed first. #7240 utils::chunked_vector _result; + size_t _used_at_entry; private: stop_iteration consume(range_tombstone&& rt); From f76f5f6bfe281092767839b26191d7b4043864ef Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 17 Aug 2021 20:10:00 +0200 Subject: [PATCH 03/10] storage_proxy: Add more trace-level logging to read-repair Extremely helpful in debugging. --- service/storage_proxy.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 5001970a30..53895f8565 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5012,8 +5012,10 @@ protected: || rr_opt->row_count() >= original_row_limit() || data_resolver->live_partition_count() >= original_partition_limit()) && !data_resolver->any_partition_short_read()) { + mlogger.trace("reconciled: {}", rr_opt->pretty_printer(_schema)); auto result = ::make_foreign(::make_lw_shared( co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit))); + qlogger.trace("reconciled: {}", result->pretty_printer(_schema, _cmd->slice)); // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) From 727e519c3ab0fd3b049be32a565dec7a312c6958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 31 Aug 2023 04:23:19 -0400 Subject: [PATCH 04/10] service/storage_proxy: add trace points for read-repair Currently the fact that read-repair was triggered can only be inferred from seeing mutation reads in the trace. This patch adds an explicit trace point for when read repair is triggered and also when it is finished or retried. --- service/storage_proxy.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 53895f8565..9dec7f5e99 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5012,6 +5012,7 @@ protected: || rr_opt->row_count() >= original_row_limit() || data_resolver->live_partition_count() >= original_partition_limit()) && !data_resolver->any_partition_short_read()) { + tracing::trace(_trace_state, "Read stage is done for read-repair"); mlogger.trace("reconciled: {}", rr_opt->pretty_printer(_schema)); auto result = ::make_foreign(::make_lw_shared( co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit))); @@ -5038,6 +5039,7 @@ protected: on_read_resolved(); }); } else { + tracing::trace(_trace_state, "Not enough data, need a retry for read-repair"); _proxy->get_stats().read_retries++; _retry_cmd = make_lw_shared(*cmd); // We asked t (= cmd->get_row_limit()) live columns and got l (=data_resolver->total_live_count) ones. @@ -5144,6 +5146,7 @@ public: exec->_targets.erase(i, exec->_targets.end()); } } + tracing::trace(exec->_trace_state, "digest mismatch, starting read repair"); exec->reconcile(exec->_cl, timeout); exec->_proxy->get_stats().read_repair_repaired_blocking++; } From ff29f430600dd5c370525a0b622a1d4f956ddc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 31 Aug 2023 04:24:26 -0400 Subject: [PATCH 05/10] service/storage_proxy: add trace points for the actual read executor type There is currently a trace point for when the read executor is created, but this only contains the initial replica set and doesn't mention which read executor is created in the end. This patch adds trace points for each different return path, so it is clear from the trace whether speculative read can happen or not. --- service/storage_proxy.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9dec7f5e99..596bfcc102 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5352,6 +5352,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size() || (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) { + tracing::trace(trace_state, "Creating never_speculating_read_executor - speculative retry is disabled or there are no extra replicas to speculate with"); return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } @@ -5359,6 +5360,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). + tracing::trace(trace_state, "always_speculating_read_executor (all targets)"); return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } @@ -5367,16 +5369,20 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw auto local_dc_filter = erm->get_topology().get_local_dc_filter(); if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) { slogger.trace("read executor no extra target to speculate"); + tracing::trace(trace_state, "Creating never_speculating_read_executor - there are no extra replicas to speculate with"); return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } else { target_replicas.push_back(*extra_replica); slogger.trace("creating read executor with extra target {}", *extra_replica); + tracing::trace(trace_state, "Added extra target {} for speculative read", *extra_replica); } } if (retry_type == speculative_retry::type::ALWAYS) { + tracing::trace(trace_state, "Creating always_speculating_read_executor"); return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } else {// PERCENTILE or CUSTOM. + tracing::trace(trace_state, "Creating speculating_read_executor"); return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } } From dc269cb6bd24810057249db09e7adec5da8030f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 7 Sep 2023 11:45:47 -0400 Subject: [PATCH 06/10] test/pylib/rest_client: add load_new_sstables, keyspace_{flush,compaction} To support the equivalent (roughly) of the following nodetool commands: * nodetool refresh * nodetool flush * nodetool compact --- test/pylib/rest_client.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 77cfe1e646..511c4a3009 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -230,6 +230,24 @@ class ScyllaRESTAPIClient(): assert level in ["debug", "info", "warning", "trace"] await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip) + async def load_new_sstables(self, node_ip: str, keyspace: str, table: str) -> None: + """Load sstables from upload directory""" + await self.client.post(f"/storage_service/sstables/{keyspace}?cf={table}", host=node_ip) + + async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None: + """Flush the specified or all tables in the keyspace""" + url = f"/storage_service/keyspace_flush/{keyspace}" + if table is not None: + url += "?cf={table}" + await requests.post(url, host=node_ip) + + async def keyspace_compaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None: + """Compact the specified or all tables in the keyspace""" + url = f"/storage_service/keyspace_compaction/{keyspace}" + if table is not None: + url += "?cf={table}" + await requests.post(url, host=node_ip) + class ScyllaMetrics: def __init__(self, lines: list[str]): From 46e37436d0b7ec1229489b1e05ae94929236eb53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 7 Sep 2023 11:46:36 -0400 Subject: [PATCH 07/10] test/pylib: add REST methods to get node exe and workdir paths --- test/pylib/manager_client.py | 6 ++++++ test/pylib/scylla_cluster.py | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 877c62f489..34e59afe4e 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -310,3 +310,9 @@ class ManagerClient(): logger.debug("ManagerClient getting log filename for %s", server_id) log_filename = await self.client.get_text(f"/cluster/server/{server_id}/get_log_filename") return ScyllaLogFile(self.thread_pool, log_filename) + + async def server_get_workdir(self, server_id: ServerNum) -> str: + return await self.client.get_text(f"/cluster/server/{server_id}/workdir") + + async def server_get_exe(self, server_id: ServerNum) -> str: + return await self.client.get_text(f"/cluster/server/{server_id}/exe") diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index d286e348d1..7bd6ee9112 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -1069,6 +1069,8 @@ class ScyllaClusterManager: add_put('/cluster/server/{server_id}/update_config', self._server_update_config) add_put('/cluster/server/{server_id}/change_ip', self._server_change_ip) add_get('/cluster/server/{server_id}/get_log_filename', self._server_get_log_filename) + add_get('/cluster/server/{server_id}/workdir', self._server_get_workdir) + add_get('/cluster/server/{server_id}/exe', self._server_get_exe) async def _manager_up(self, _request) -> aiohttp.web.Response: return aiohttp.web.Response(text=f"{self.is_running}") @@ -1279,13 +1281,27 @@ class ScyllaClusterManager: ip_addr = await self.cluster.change_ip(server_id) return aiohttp.web.json_response({"ip_addr": ip_addr}) - async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + async def _server_get_attribute(self, request: aiohttp.web.Request, attribute: str) -> aiohttp.web.Response: + """Generic request handler which gets a particular attribute of a ScyllaServer instance + + To be used to implement concrete handlers, not for direct use. + """ assert self.cluster server_id = ServerNum(int(request.match_info["server_id"])) server = self.cluster.servers[server_id] if not server: return aiohttp.web.Response(status=404, text=f"Server {server_id} unknown") - return aiohttp.web.Response(text=f"{server.log_filename}") + return aiohttp.web.Response(text=f"{getattr(server, attribute)}") + + async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + return await self._server_get_attribute(request, "log_filename") + + async def _server_get_workdir(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + return await self._server_get_attribute(request, "workdir") + + async def _server_get_exe(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + return await self._server_get_attribute(request, "exe") + @asynccontextmanager From 82f45637572d50159899f490478252a5e9c58c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 8 Sep 2023 02:47:47 -0400 Subject: [PATCH 08/10] tools/scylla-sstable: write: abort parser thread if writing fails Currently if writing the sstable fails, e.g. because the input data is out-of-order, the json parser thread hangs because its output is no longer consumed. This results in the entire application just freezing. Fix this by aborting the parsing thread explicitely in the json_mutation_stream_parser destructor. If the parser thread existed successfully, this will be a no-op, but on the error-path, this will ensure that the parser thread doesn't hang. --- tools/scylla-sstable.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index c1b93efa71..4ffa43c7f8 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -2349,6 +2349,7 @@ class json_mutation_stream_parser { }; private: + struct parsing_aborted : public std::exception { }; class impl { queue _queue; stream _stream; @@ -2364,7 +2365,12 @@ private: , _thread([this] { _reader.Parse(_stream, _handler); }) { } ~impl() { - _thread.join().get(); + _queue.abort(std::make_exception_ptr(parsing_aborted{})); + try { + _thread.join().get(); + } catch (...) { + sst_log.warn("json_mutation_stream_parser: parser thread exited with exception: {}", std::current_exception()); + } } future operator()() { return _queue.pop_eventually().handle_exception([this] (std::exception_ptr e) -> mutation_fragment_v2_opt { From b55cead5cde278ba7ec4537e5d38983117c59d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 11 Sep 2023 06:57:44 -0400 Subject: [PATCH 09/10] replica/mutation_dump: detect end-of-page in range-scans The current read-loop fails to detect end-of-page and if the query result buider cuts the page, it will just proceed to the next partition. This will result in distorted query results, as the result builder will request for the consumption to stop after each clustering row. To fix, check if the page was cut before moving on to the next partition. A unit test reproducing the bug was also added. --- replica/mutation_dump.cc | 6 ++++- .../test_select_from_mutation_fragments.py | 27 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/replica/mutation_dump.cc b/replica/mutation_dump.cc index a43fecb53e..7d3b6b5f53 100644 --- a/replica/mutation_dump.cc +++ b/replica/mutation_dump.cc @@ -604,7 +604,11 @@ future>> dump_mutations( std::rethrow_exception(std::move(ex)); } - dk_opt = co_await partition_key_generator(); + if (compaction_state->are_limits_reached() || qs.builder.is_short_read()) { + dk_opt = {}; + } else { + dk_opt = co_await partition_key_generator(); + } } co_return make_lw_shared(qs.builder.build(compaction_state->current_full_position())); diff --git a/test/cql-pytest/test_select_from_mutation_fragments.py b/test/cql-pytest/test_select_from_mutation_fragments.py index 2b7d71eea6..c795f6e868 100644 --- a/test/cql-pytest/test_select_from_mutation_fragments.py +++ b/test/cql-pytest/test_select_from_mutation_fragments.py @@ -422,3 +422,30 @@ def test_ck_in_query(cql, test_table): for col_name, expected_value in zip(columns, expected_row): assert hasattr(row, col_name) assert getattr(row, col_name) == expected_value + + +def test_many_partitions(cql, test_keyspace, scylla_only): + num_partitions = 5000 + with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table: + delete_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ?") + for pk in range(num_partitions): + cql.execute(delete_id, (pk,)) + + res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({table})")) + pks = set() + partition_starts = 0 + partition_ends = 0 + for row in res: + assert row.pk >= 0 and row.pk < num_partitions + if row.mutation_fragment_kind == "partition start": + partition_starts += 1 + pks.add(row.pk) + elif row.mutation_fragment_kind == "partition end": + partition_ends += 1 + assert row.pk in pks + else: + pytest.fail(f"Unexpected mutation fragment kind: {row.mutation_fragment_kind}") + + assert partition_starts == num_partitions + assert partition_ends == num_partitions + assert len(pks) == num_partitions From f770ff7a2bed6ec8623fcb973ee7e3795172d8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 4 Sep 2023 07:21:21 -0400 Subject: [PATCH 10/10] test/topology_custom: add test_read_repair.py --- test/topology_custom/test_read_repair.py | 315 +++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 test/topology_custom/test_read_repair.py diff --git a/test/topology_custom/test_read_repair.py b/test/topology_custom/test_read_repair.py new file mode 100644 index 0000000000..86c3cc3d2d --- /dev/null +++ b/test/topology_custom/test_read_repair.py @@ -0,0 +1,315 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# + +import datetime +import glob +import json +import logging +import os +import pytest +import random +import struct +import subprocess +import tempfile +import time +from typing import TypeAlias, Any + +from cassandra.cluster import ConsistencyLevel, Session # type: ignore +from cassandra.query import SimpleStatement # type: ignore +from cassandra.pool import Host # type: ignore +from cassandra.murmur3 import murmur3 # type: ignore + +from test.pylib.util import wait_for_cql_and_get_hosts +from test.pylib.internal_types import ServerInfo + + +logger = logging.getLogger(__name__) + + +def serialize_int(i: int) -> str: + return struct.pack(">l", i).hex() + + +def serialize_key(i: int) -> str: + return struct.pack(">hl", 4, i).hex() + + +class row_tombstone_data: + pk = 0 + v = 1 + + column_spec = "pk int, ck int, v int, PRIMARY KEY (pk, ck)" + select_query = f"SELECT * FROM ks.tbl WHERE pk = {pk}" + unique_key = 'ck' + + @classmethod + def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int, + deletion_time: datetime.datetime): + rows = [] + formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S") + serialized_value = serialize_int(cls.v) + for ck in range(total_rows): + row = { + "type": "clustering-row", + "key": {"raw": serialize_key(ck)}, + } + if ck in live_rows: + row["marker"] = {"timestamp": live_timestamp} + row["columns"] = {"v": { + "is_live": True, + "type": "regular", + "timestamp": live_timestamp, + "value": serialized_value, + }} + else: + row["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time} + rows.append(row) + + assert len(rows) == total_rows + + return [ + { + "key": {"raw": serialize_key(cls.pk)}, + "clustering_elements": rows, + }, + ] + + @classmethod + def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None: + assert row.pk == cls.pk + if row.partition_region != 2: + return None + if "tombstone" in json.loads(row.metadata): + is_live = False + else: + is_live = True + cols = json.loads(row.value) + assert cols["v"] == str(cls.v) + return row.ck, is_live + + @classmethod + def check_result_row(cls, i: int, row) -> None: + assert row.pk == cls.pk + assert row.ck == i + assert row.v == cls.v + + +class partition_tombstone_data: + v = 1 + + column_spec = "pk int PRIMARY KEY, v int" + select_query = "SELECT * FROM ks.tbl" + unique_key = 'pk' + + partition_tombstone_timestamp = None + partition_live = False + + class PartitionKey: + def __init__(self, value): + self.value = value + self.raw = serialize_key(self.value) + self.token = murmur3(struct.pack(">l", self.value)) + + def __lt__(self, o): + return self.token < o.token + + @classmethod + def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int, + deletion_time: datetime.datetime): + partitions = [] + formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S") + serialized_value = serialize_int(cls.v) + pks = sorted([cls.PartitionKey(pk) for pk in range(total_rows)]) + for pk in pks: + partition: dict[str, Any] = { + "key": {"raw": pk.raw, "token": pk.token}, + } + if pk.value in live_rows: + partition["clustering_elements"] = [ + { + "type": "clustering-row", + "key": {"raw": ""}, + "marker": {"timestamp": live_timestamp}, + "columns": {"v": { + "is_live": True, + "type": "regular", + "timestamp": live_timestamp, + "value": serialized_value, + }}, + }, + ] + else: + partition["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time} + + partitions.append(partition) + + assert len(partitions) == total_rows + return partitions + + @classmethod + def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None: + logger.info(f"check_mutation_row(): kind={row.mutation_fragment_kind}, pk={row.pk}, metadata={row.metadata}," + " value={row.value}") + if row.partition_region == 0: + cls.partition_tombstone_timestamp = json.loads(row.metadata)["tombstone"].get("timestamp") + return None + elif row.partition_region == 3: + is_live = cls.partition_live + cls.partition_tombstone_timestamp = None + cls.partition_live = False + return row.pk, is_live + elif row.partition_region != 2: + return None + + metadata = json.loads(row.metadata) + try: + v = metadata["columns"]["v"] + cls.partition_live = v["is_live"] + assert cls.partition_tombstone_timestamp is None or v["timestamp"] > cls.partition_tombstone_timestamp + cols = json.loads(row.value) + assert cols["v"] == str(cls.v) + except KeyError: + cls.partition_live = False + return None + + @classmethod + def check_result_row(cls, i: int, row) -> None: + assert row.pk == i + assert row.v == cls.v + + +incremental_repair_test_data = [pytest.param(row_tombstone_data, id="row-tombstone"), + pytest.param(partition_tombstone_data, id="partition-tombstone")] + + +@pytest.fixture(scope="function") +def workdir(): + with tempfile.TemporaryDirectory() as tmp_dir: + yield tmp_dir + + +@pytest.mark.parametrize("data_class", incremental_repair_test_data) +@pytest.mark.asyncio +async def test_incremental_read_repair(data_class, workdir, manager): + """Stress the incremental read repair logic + + Write a long stream of row tombstones, with a live row before and after. + """ + seed = int(time.time()) + seed = 1694170155 + logger.info(f"random-seed: {seed}") + random.seed(seed) + cmdline = ["--hinted-handoff-enabled", "0", "--query-tombstone-page-limit", "1000"] + node1 = await manager.server_add(cmdline=cmdline) + node2 = await manager.server_add(cmdline=cmdline) + + cql = manager.get_cql() + + host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30) + + cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") + table_schema = f"CREATE TABLE ks.tbl ({data_class.column_spec}) WITH speculative_retry = 'NONE'" + cql.execute(table_schema) + + schema_file_path = os.path.join(workdir, "schema.cql") + with open(schema_file_path, "w") as schema_file: + schema_file.write(table_schema) + + dead_timestamp = int(time.time()) + live_timestamp = dead_timestamp + 1 + + total_rows = 20000 + max_live_rows = 32 + deletion_time = datetime.datetime.now() + + row_set: TypeAlias = set[int] + + async def generate_and_upload_sstable(node: ServerInfo, node_row: int) -> row_set: + live_rows = {random.randint(0, total_rows - 1) for _ in range(random.randint(0, max_live_rows))} + live_rows.add(node_row) + + sstable = data_class.generate_sstable(total_rows, live_rows, dead_timestamp, live_timestamp, deletion_time) + scylla_exe = await manager.server_get_exe(node.server_id) + node_workdir = await manager.server_get_workdir(node.server_id) + table_upload_dir = glob.glob(os.path.join(node_workdir, "data", "ks", "tbl-*", "upload"))[0] + + input_file_path = os.path.join(workdir, f"node{node.server_id}.sstable.json") + with open(input_file_path, "w") as f: + json.dump(sstable, f, indent=4) + + subprocess.check_call([ + scylla_exe, "sstable", "write", + "--schema-file", schema_file_path, + "--input-file", input_file_path, + "--output-dir", table_upload_dir, + "--generation", "1"]) + + await manager.api.load_new_sstables(node.ip_addr, "ks", "tbl") + + return live_rows + + node1_rows = await generate_and_upload_sstable(node1, 0) + node2_rows = await generate_and_upload_sstable(node2, total_rows - 1) + all_rows = node1_rows | node2_rows + assert len(all_rows) >= 2 + + logger.info(f"node1_rows: {len(node1_rows)} rows, row ids: {node1_rows}") + logger.info(f"node2_rows: {len(node2_rows)} rows, row ids: {node2_rows}") + logger.info(f"all_rows: {len(all_rows)} rows, row ids: {all_rows}") + + def check_rows(cql: Session, host: Host, expected_live_rows: row_set) -> None: + actual_live_rows = set() + actual_dead_rows = set() + for row in cql.execute("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl)", host=host): + res = data_class.check_mutation_row(row, expected_live_rows) + if res is None: + continue + row_id, is_live = res + if is_live: + actual_live_rows.add(row_id) + else: + actual_dead_rows.add(row_id) + + # Account rows that have a tombstone but are live only once. + actual_dead_rows -= actual_live_rows + + assert actual_live_rows == expected_live_rows + assert len(actual_live_rows) + len(actual_dead_rows) == total_rows + + logger.info("Check rows with CL=ONE before read-repair") + check_rows(cql, host1, node1_rows) + check_rows(cql, host2, node2_rows) + + logger.info("Run read-repair") + res = cql.execute(SimpleStatement(data_class.select_query, consistency_level=ConsistencyLevel.ALL)) + res_rows = [] + pages = [] + while True: + res_rows.extend(list(res.current_rows)) + pages.append(list(res.current_rows)) + if res.has_more_pages: + res.fetch_next_page() + else: + break + + logger.debug(f"repair: {len(pages)} pages: {pages}") + assert len(pages) > 1 + assert len(res_rows) == len(all_rows) + actual_row_ids = set() + for res_row in res_rows: + row_id = getattr(res_row, data_class.unique_key) + actual_row_ids.add(row_id) + assert row_id in all_rows + data_class.check_result_row(row_id, res_row) + assert actual_row_ids == all_rows + + for node in (node1, node2): + manager.api.keyspace_flush(node.ip_addr, "ks") + manager.api.keyspace_compaction(node.ip_addr, "ks") + + logger.info("Check rows with CL=ONE after read-repair") + check_rows(cql, host1, all_rows) + check_rows(cql, host2, all_rows)