mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'database, storage_proxy: Reconcile pages with dead rows and partitions incrementally' from Botond Dénes
Currently, mutation query on replica side will not respond with a result which doesn't have at least one live row. This causes problems if there is a lot of dead rows or partitions before we reach a live row, which stem from the fact that resulting reconcilable_result will be large: 1. Large allocations. Serialization of reconcilable_result causes large allocations for storing result rows in std::deque 2. Reactor stalls. Serialization of reconcilable_result on the replica side and on the coordinator side causes reactor stalls. This impacts not only the query at hand. For 1M dead rows, freezing takes 130ms, unfreezing takes 500ms. Coordinator does multiple freezes and unfreezes. The reactor stall on the coordinator side is >5s 3. Too large repair mutations. If reconciliation works on large pages, repair may fail due to too large mutation size. 1M dead rows is already too much: Refs https://github.com/scylladb/scylladb/issues/9111. This patch fixes all of the above by making mutation reads respect the memory accounter's limit for the page size, even for dead rows. This patch also addresses the problem of client-side timeouts during paging. Reconciling queries processing long strings of tombstones will now properly page tombstones,like regular queries do. My testing shows that this solution even increases efficiency. I tested with a cluster of 2 nodes, and a table of RF=2. The data layout was as follows (1 partition): * Node1: 1 live row, 1M dead rows * Node2: 1M dead rows, 1 live row This was designed to trigger reconciliation right from the very start of the query. Before: ``` Running query (node2, CL=ONE, cold cache) Query done, duration: 140.0633503ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (node2, CL=ONE, hot cache) Query done, duration: 66.7195275ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (all-nodes, CL=ALL, reconcile, cold-cache) Query done, duration: 873.5400742ms, pages: 2, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)] ``` After: ``` Running query (node2, CL=ONE, cold cache) Query done, duration: 136.9035122ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (node2, CL=ONE, hot cache) Query done, duration: 69.5286021ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)] Running query (all-nodes, CL=ALL, reconcile, cold-cache) Query done, duration: 162.6239498ms, pages: 100, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)] ``` Non-reconciling queries have almost identical duration (1 few ms changes can be observed between runs). Note how in the after case, the reconciling read also produces 100 pages, vs. just 2 pages in the before case, leading to a much lower duration (less than 1/4 of the before). Refs https://github.com/scylladb/scylladb/issues/7929 Refs https://github.com/scylladb/scylladb/issues/3672 Refs https://github.com/scylladb/scylladb/issues/7933 Fixes https://github.com/scylladb/scylladb/issues/9111 Closes #14923 * github.com:scylladb/scylladb: test/topology_custom: add test_read_repair.py replica/mutation_dump: detect end-of-page in range-scans tools/scylla-sstable: write: abort parser thread if writing fails test/pylib: add REST methods to get node exe and workdir paths test/pylib/rest_client: add load_new_sstables, keyspace_{flush,compaction} service/storage_proxy: add trace points for the actual read executor type service/storage_proxy: add trace points for read-repair storage_proxy: Add more trace-level logging to read-repair database: Fix accounting of small partitions in mutation query database, storage_proxy: Reconcile pages with no live rows incrementally
This commit is contained in:
@@ -2200,6 +2200,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key
|
||||
_static_row_is_alive = false;
|
||||
_live_rows = 0;
|
||||
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed));
|
||||
_used_at_entry = _memory_accounter.used_memory();
|
||||
}
|
||||
|
||||
void reconcilable_result_builder::consume(tombstone t) {
|
||||
@@ -2220,7 +2221,7 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom
|
||||
}
|
||||
_live_rows += is_alive;
|
||||
auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema));
|
||||
if (is_alive) {
|
||||
if (is_alive || _slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
|
||||
// We are considering finishing current read only after consuming a
|
||||
// live clustering row. While sending a single live row is enough to
|
||||
// guarantee progress, not ending the result on a live row would
|
||||
@@ -2261,6 +2262,17 @@ stop_iteration reconcilable_result_builder::consume_end_of_partition() {
|
||||
}
|
||||
_total_live_rows += _live_rows;
|
||||
_result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() });
|
||||
|
||||
auto accounted = _memory_accounter.used_memory() - _used_at_entry;
|
||||
auto actually_used = sizeof(partition) + _result.back().mut().representation().size();
|
||||
if (actually_used > accounted) {
|
||||
_memory_accounter.update(actually_used - accounted);
|
||||
}
|
||||
|
||||
if (_slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
|
||||
_stop = _stop || _memory_accounter.check();
|
||||
}
|
||||
|
||||
return _stop;
|
||||
}
|
||||
|
||||
|
||||
@@ -133,6 +133,7 @@ class reconcilable_result_builder {
|
||||
uint64_t _live_rows{};
|
||||
// make this the last member so it is destroyed first. #7240
|
||||
utils::chunked_vector<partition> _result;
|
||||
size_t _used_at_entry;
|
||||
|
||||
private:
|
||||
stop_iteration consume(range_tombstone&& rt);
|
||||
|
||||
@@ -177,6 +177,11 @@ public:
|
||||
// directly, bypassing the intermediate reconcilable_result format used
|
||||
// in pre 4.5 range scans.
|
||||
range_scan_data_variant,
|
||||
// When set, mutation query can end a page even if there is no live row in the
|
||||
// final reconcilable_result. This prevents exchanging large pages when there
|
||||
// is a lot of dead rows. This flag is needed during rolling upgrades to support
|
||||
// old coordinators which do not tolerate pages with no live rows.
|
||||
allow_mutation_read_page_without_live_row,
|
||||
};
|
||||
using option_set = enum_set<super_enum<option,
|
||||
option::send_clustering_key,
|
||||
@@ -191,7 +196,8 @@ public:
|
||||
option::with_digest,
|
||||
option::bypass_cache,
|
||||
option::always_return_static_content,
|
||||
option::range_scan_data_variant>>;
|
||||
option::range_scan_data_variant,
|
||||
option::allow_mutation_read_page_without_live_row>>;
|
||||
clustering_row_ranges _row_ranges;
|
||||
public:
|
||||
column_id_vector static_columns; // TODO: consider using bitmap
|
||||
|
||||
@@ -604,7 +604,11 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
|
||||
dk_opt = co_await partition_key_generator();
|
||||
if (compaction_state->are_limits_reached() || qs.builder.is_short_read()) {
|
||||
dk_opt = {};
|
||||
} else {
|
||||
dk_opt = co_await partition_key_generator();
|
||||
}
|
||||
}
|
||||
|
||||
co_return make_lw_shared<query::result>(qs.builder.build(compaction_state->current_full_position()));
|
||||
|
||||
@@ -4980,6 +4980,8 @@ protected:
|
||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
cmd->slice.options.set<query::partition_slice::option::allow_mutation_read_page_without_live_row>();
|
||||
|
||||
// Waited on indirectly.
|
||||
make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout);
|
||||
|
||||
@@ -5003,13 +5005,18 @@ protected:
|
||||
|
||||
// We generate a retry if at least one node reply with count live columns but after merge we have less
|
||||
// than the total number of column we are interested in (which may be < count on a retry).
|
||||
// So in particular, if no host returned count live columns, we know it's not a short read.
|
||||
bool can_send_short_read = rr_opt && rr_opt->is_short_read() && rr_opt->row_count() > 0;
|
||||
if (rr_opt && (can_send_short_read || data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit()
|
||||
// So in particular, if no host returned count live columns, we know it's not a short read due to
|
||||
// row or partition limits being exhausted and retry is not needed.
|
||||
if (rr_opt && (rr_opt->is_short_read()
|
||||
|| data_resolver->all_reached_end()
|
||||
|| rr_opt->row_count() >= original_row_limit()
|
||||
|| data_resolver->live_partition_count() >= original_partition_limit())
|
||||
&& !data_resolver->any_partition_short_read()) {
|
||||
tracing::trace(_trace_state, "Read stage is done for read-repair");
|
||||
mlogger.trace("reconciled: {}", rr_opt->pretty_printer(_schema));
|
||||
auto result = ::make_foreign(::make_lw_shared<query::result>(
|
||||
co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit)));
|
||||
qlogger.trace("reconciled: {}", result->pretty_printer(_schema, _cmd->slice));
|
||||
// wait for write to complete before returning result to prevent multiple concurrent read requests to
|
||||
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
|
||||
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
|
||||
@@ -5032,6 +5039,7 @@ protected:
|
||||
on_read_resolved();
|
||||
});
|
||||
} else {
|
||||
tracing::trace(_trace_state, "Not enough data, need a retry for read-repair");
|
||||
_proxy->get_stats().read_retries++;
|
||||
_retry_cmd = make_lw_shared<query::read_command>(*cmd);
|
||||
// We asked t (= cmd->get_row_limit()) live columns and got l (=data_resolver->total_live_count) ones.
|
||||
@@ -5138,6 +5146,7 @@ public:
|
||||
exec->_targets.erase(i, exec->_targets.end());
|
||||
}
|
||||
}
|
||||
tracing::trace(exec->_trace_state, "digest mismatch, starting read repair");
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_proxy->get_stats().read_repair_repaired_blocking++;
|
||||
}
|
||||
@@ -5343,6 +5352,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
|
||||
if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
|
||||
|| (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
|
||||
tracing::trace(trace_state, "Creating never_speculating_read_executor - speculative retry is disabled or there are no extra replicas to speculate with");
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
|
||||
@@ -5350,6 +5360,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
|
||||
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
|
||||
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
|
||||
tracing::trace(trace_state, "always_speculating_read_executor (all targets)");
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
|
||||
@@ -5358,16 +5369,20 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
|
||||
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
|
||||
slogger.trace("read executor no extra target to speculate");
|
||||
tracing::trace(trace_state, "Creating never_speculating_read_executor - there are no extra replicas to speculate with");
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
} else {
|
||||
target_replicas.push_back(*extra_replica);
|
||||
slogger.trace("creating read executor with extra target {}", *extra_replica);
|
||||
tracing::trace(trace_state, "Added extra target {} for speculative read", *extra_replica);
|
||||
}
|
||||
}
|
||||
|
||||
if (retry_type == speculative_retry::type::ALWAYS) {
|
||||
tracing::trace(trace_state, "Creating always_speculating_read_executor");
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
} else {// PERCENTILE or CUSTOM.
|
||||
tracing::trace(trace_state, "Creating speculating_read_executor");
|
||||
return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -422,3 +422,30 @@ def test_ck_in_query(cql, test_table):
|
||||
for col_name, expected_value in zip(columns, expected_row):
|
||||
assert hasattr(row, col_name)
|
||||
assert getattr(row, col_name) == expected_value
|
||||
|
||||
|
||||
def test_many_partitions(cql, test_keyspace, scylla_only):
|
||||
num_partitions = 5000
|
||||
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
|
||||
delete_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ?")
|
||||
for pk in range(num_partitions):
|
||||
cql.execute(delete_id, (pk,))
|
||||
|
||||
res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({table})"))
|
||||
pks = set()
|
||||
partition_starts = 0
|
||||
partition_ends = 0
|
||||
for row in res:
|
||||
assert row.pk >= 0 and row.pk < num_partitions
|
||||
if row.mutation_fragment_kind == "partition start":
|
||||
partition_starts += 1
|
||||
pks.add(row.pk)
|
||||
elif row.mutation_fragment_kind == "partition end":
|
||||
partition_ends += 1
|
||||
assert row.pk in pks
|
||||
else:
|
||||
pytest.fail(f"Unexpected mutation fragment kind: {row.mutation_fragment_kind}")
|
||||
|
||||
assert partition_starts == num_partitions
|
||||
assert partition_ends == num_partitions
|
||||
assert len(pks) == num_partitions
|
||||
|
||||
@@ -310,3 +310,9 @@ class ManagerClient():
|
||||
logger.debug("ManagerClient getting log filename for %s", server_id)
|
||||
log_filename = await self.client.get_text(f"/cluster/server/{server_id}/get_log_filename")
|
||||
return ScyllaLogFile(self.thread_pool, log_filename)
|
||||
|
||||
async def server_get_workdir(self, server_id: ServerNum) -> str:
|
||||
return await self.client.get_text(f"/cluster/server/{server_id}/workdir")
|
||||
|
||||
async def server_get_exe(self, server_id: ServerNum) -> str:
|
||||
return await self.client.get_text(f"/cluster/server/{server_id}/exe")
|
||||
|
||||
@@ -230,6 +230,24 @@ class ScyllaRESTAPIClient():
|
||||
assert level in ["debug", "info", "warning", "trace"]
|
||||
await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip)
|
||||
|
||||
async def load_new_sstables(self, node_ip: str, keyspace: str, table: str) -> None:
|
||||
"""Load sstables from upload directory"""
|
||||
await self.client.post(f"/storage_service/sstables/{keyspace}?cf={table}", host=node_ip)
|
||||
|
||||
async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
|
||||
"""Flush the specified or all tables in the keyspace"""
|
||||
url = f"/storage_service/keyspace_flush/{keyspace}"
|
||||
if table is not None:
|
||||
url += "?cf={table}"
|
||||
await requests.post(url, host=node_ip)
|
||||
|
||||
async def keyspace_compaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
|
||||
"""Compact the specified or all tables in the keyspace"""
|
||||
url = f"/storage_service/keyspace_compaction/{keyspace}"
|
||||
if table is not None:
|
||||
url += "?cf={table}"
|
||||
await requests.post(url, host=node_ip)
|
||||
|
||||
|
||||
class ScyllaMetrics:
|
||||
def __init__(self, lines: list[str]):
|
||||
|
||||
@@ -1069,6 +1069,8 @@ class ScyllaClusterManager:
|
||||
add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
|
||||
add_put('/cluster/server/{server_id}/change_ip', self._server_change_ip)
|
||||
add_get('/cluster/server/{server_id}/get_log_filename', self._server_get_log_filename)
|
||||
add_get('/cluster/server/{server_id}/workdir', self._server_get_workdir)
|
||||
add_get('/cluster/server/{server_id}/exe', self._server_get_exe)
|
||||
|
||||
async def _manager_up(self, _request) -> aiohttp.web.Response:
|
||||
return aiohttp.web.Response(text=f"{self.is_running}")
|
||||
@@ -1279,13 +1281,27 @@ class ScyllaClusterManager:
|
||||
ip_addr = await self.cluster.change_ip(server_id)
|
||||
return aiohttp.web.json_response({"ip_addr": ip_addr})
|
||||
|
||||
async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
|
||||
async def _server_get_attribute(self, request: aiohttp.web.Request, attribute: str) -> aiohttp.web.Response:
|
||||
"""Generic request handler which gets a particular attribute of a ScyllaServer instance
|
||||
|
||||
To be used to implement concrete handlers, not for direct use.
|
||||
"""
|
||||
assert self.cluster
|
||||
server_id = ServerNum(int(request.match_info["server_id"]))
|
||||
server = self.cluster.servers[server_id]
|
||||
if not server:
|
||||
return aiohttp.web.Response(status=404, text=f"Server {server_id} unknown")
|
||||
return aiohttp.web.Response(text=f"{server.log_filename}")
|
||||
return aiohttp.web.Response(text=f"{getattr(server, attribute)}")
|
||||
|
||||
async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
|
||||
return await self._server_get_attribute(request, "log_filename")
|
||||
|
||||
async def _server_get_workdir(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
|
||||
return await self._server_get_attribute(request, "workdir")
|
||||
|
||||
async def _server_get_exe(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
|
||||
return await self._server_get_attribute(request, "exe")
|
||||
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
315
test/topology_custom/test_read_repair.py
Normal file
315
test/topology_custom/test_read_repair.py
Normal file
@@ -0,0 +1,315 @@
|
||||
#
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import datetime
|
||||
import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
import random
|
||||
import struct
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
from typing import TypeAlias, Any
|
||||
|
||||
from cassandra.cluster import ConsistencyLevel, Session # type: ignore
|
||||
from cassandra.query import SimpleStatement # type: ignore
|
||||
from cassandra.pool import Host # type: ignore
|
||||
from cassandra.murmur3 import murmur3 # type: ignore
|
||||
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def serialize_int(i: int) -> str:
|
||||
return struct.pack(">l", i).hex()
|
||||
|
||||
|
||||
def serialize_key(i: int) -> str:
|
||||
return struct.pack(">hl", 4, i).hex()
|
||||
|
||||
|
||||
class row_tombstone_data:
|
||||
pk = 0
|
||||
v = 1
|
||||
|
||||
column_spec = "pk int, ck int, v int, PRIMARY KEY (pk, ck)"
|
||||
select_query = f"SELECT * FROM ks.tbl WHERE pk = {pk}"
|
||||
unique_key = 'ck'
|
||||
|
||||
@classmethod
|
||||
def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int,
|
||||
deletion_time: datetime.datetime):
|
||||
rows = []
|
||||
formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
serialized_value = serialize_int(cls.v)
|
||||
for ck in range(total_rows):
|
||||
row = {
|
||||
"type": "clustering-row",
|
||||
"key": {"raw": serialize_key(ck)},
|
||||
}
|
||||
if ck in live_rows:
|
||||
row["marker"] = {"timestamp": live_timestamp}
|
||||
row["columns"] = {"v": {
|
||||
"is_live": True,
|
||||
"type": "regular",
|
||||
"timestamp": live_timestamp,
|
||||
"value": serialized_value,
|
||||
}}
|
||||
else:
|
||||
row["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time}
|
||||
rows.append(row)
|
||||
|
||||
assert len(rows) == total_rows
|
||||
|
||||
return [
|
||||
{
|
||||
"key": {"raw": serialize_key(cls.pk)},
|
||||
"clustering_elements": rows,
|
||||
},
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None:
|
||||
assert row.pk == cls.pk
|
||||
if row.partition_region != 2:
|
||||
return None
|
||||
if "tombstone" in json.loads(row.metadata):
|
||||
is_live = False
|
||||
else:
|
||||
is_live = True
|
||||
cols = json.loads(row.value)
|
||||
assert cols["v"] == str(cls.v)
|
||||
return row.ck, is_live
|
||||
|
||||
@classmethod
|
||||
def check_result_row(cls, i: int, row) -> None:
|
||||
assert row.pk == cls.pk
|
||||
assert row.ck == i
|
||||
assert row.v == cls.v
|
||||
|
||||
|
||||
class partition_tombstone_data:
|
||||
v = 1
|
||||
|
||||
column_spec = "pk int PRIMARY KEY, v int"
|
||||
select_query = "SELECT * FROM ks.tbl"
|
||||
unique_key = 'pk'
|
||||
|
||||
partition_tombstone_timestamp = None
|
||||
partition_live = False
|
||||
|
||||
class PartitionKey:
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
self.raw = serialize_key(self.value)
|
||||
self.token = murmur3(struct.pack(">l", self.value))
|
||||
|
||||
def __lt__(self, o):
|
||||
return self.token < o.token
|
||||
|
||||
@classmethod
|
||||
def generate_sstable(cls, total_rows: int, live_rows: set[int], dead_timestamp: int, live_timestamp: int,
|
||||
deletion_time: datetime.datetime):
|
||||
partitions = []
|
||||
formatted_deletion_time = deletion_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
serialized_value = serialize_int(cls.v)
|
||||
pks = sorted([cls.PartitionKey(pk) for pk in range(total_rows)])
|
||||
for pk in pks:
|
||||
partition: dict[str, Any] = {
|
||||
"key": {"raw": pk.raw, "token": pk.token},
|
||||
}
|
||||
if pk.value in live_rows:
|
||||
partition["clustering_elements"] = [
|
||||
{
|
||||
"type": "clustering-row",
|
||||
"key": {"raw": ""},
|
||||
"marker": {"timestamp": live_timestamp},
|
||||
"columns": {"v": {
|
||||
"is_live": True,
|
||||
"type": "regular",
|
||||
"timestamp": live_timestamp,
|
||||
"value": serialized_value,
|
||||
}},
|
||||
},
|
||||
]
|
||||
else:
|
||||
partition["tombstone"] = {"timestamp": dead_timestamp, "deletion_time": formatted_deletion_time}
|
||||
|
||||
partitions.append(partition)
|
||||
|
||||
assert len(partitions) == total_rows
|
||||
return partitions
|
||||
|
||||
@classmethod
|
||||
def check_mutation_row(cls, row, expected_live_rows: set[int]) -> tuple | None:
|
||||
logger.info(f"check_mutation_row(): kind={row.mutation_fragment_kind}, pk={row.pk}, metadata={row.metadata},"
|
||||
" value={row.value}")
|
||||
if row.partition_region == 0:
|
||||
cls.partition_tombstone_timestamp = json.loads(row.metadata)["tombstone"].get("timestamp")
|
||||
return None
|
||||
elif row.partition_region == 3:
|
||||
is_live = cls.partition_live
|
||||
cls.partition_tombstone_timestamp = None
|
||||
cls.partition_live = False
|
||||
return row.pk, is_live
|
||||
elif row.partition_region != 2:
|
||||
return None
|
||||
|
||||
metadata = json.loads(row.metadata)
|
||||
try:
|
||||
v = metadata["columns"]["v"]
|
||||
cls.partition_live = v["is_live"]
|
||||
assert cls.partition_tombstone_timestamp is None or v["timestamp"] > cls.partition_tombstone_timestamp
|
||||
cols = json.loads(row.value)
|
||||
assert cols["v"] == str(cls.v)
|
||||
except KeyError:
|
||||
cls.partition_live = False
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def check_result_row(cls, i: int, row) -> None:
|
||||
assert row.pk == i
|
||||
assert row.v == cls.v
|
||||
|
||||
|
||||
incremental_repair_test_data = [pytest.param(row_tombstone_data, id="row-tombstone"),
|
||||
pytest.param(partition_tombstone_data, id="partition-tombstone")]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def workdir():
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
yield tmp_dir
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_class", incremental_repair_test_data)
|
||||
@pytest.mark.asyncio
|
||||
async def test_incremental_read_repair(data_class, workdir, manager):
|
||||
"""Stress the incremental read repair logic
|
||||
|
||||
Write a long stream of row tombstones, with a live row before and after.
|
||||
"""
|
||||
seed = int(time.time())
|
||||
seed = 1694170155
|
||||
logger.info(f"random-seed: {seed}")
|
||||
random.seed(seed)
|
||||
cmdline = ["--hinted-handoff-enabled", "0", "--query-tombstone-page-limit", "1000"]
|
||||
node1 = await manager.server_add(cmdline=cmdline)
|
||||
node2 = await manager.server_add(cmdline=cmdline)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)
|
||||
|
||||
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
|
||||
table_schema = f"CREATE TABLE ks.tbl ({data_class.column_spec}) WITH speculative_retry = 'NONE'"
|
||||
cql.execute(table_schema)
|
||||
|
||||
schema_file_path = os.path.join(workdir, "schema.cql")
|
||||
with open(schema_file_path, "w") as schema_file:
|
||||
schema_file.write(table_schema)
|
||||
|
||||
dead_timestamp = int(time.time())
|
||||
live_timestamp = dead_timestamp + 1
|
||||
|
||||
total_rows = 20000
|
||||
max_live_rows = 32
|
||||
deletion_time = datetime.datetime.now()
|
||||
|
||||
row_set: TypeAlias = set[int]
|
||||
|
||||
async def generate_and_upload_sstable(node: ServerInfo, node_row: int) -> row_set:
|
||||
live_rows = {random.randint(0, total_rows - 1) for _ in range(random.randint(0, max_live_rows))}
|
||||
live_rows.add(node_row)
|
||||
|
||||
sstable = data_class.generate_sstable(total_rows, live_rows, dead_timestamp, live_timestamp, deletion_time)
|
||||
scylla_exe = await manager.server_get_exe(node.server_id)
|
||||
node_workdir = await manager.server_get_workdir(node.server_id)
|
||||
table_upload_dir = glob.glob(os.path.join(node_workdir, "data", "ks", "tbl-*", "upload"))[0]
|
||||
|
||||
input_file_path = os.path.join(workdir, f"node{node.server_id}.sstable.json")
|
||||
with open(input_file_path, "w") as f:
|
||||
json.dump(sstable, f, indent=4)
|
||||
|
||||
subprocess.check_call([
|
||||
scylla_exe, "sstable", "write",
|
||||
"--schema-file", schema_file_path,
|
||||
"--input-file", input_file_path,
|
||||
"--output-dir", table_upload_dir,
|
||||
"--generation", "1"])
|
||||
|
||||
await manager.api.load_new_sstables(node.ip_addr, "ks", "tbl")
|
||||
|
||||
return live_rows
|
||||
|
||||
node1_rows = await generate_and_upload_sstable(node1, 0)
|
||||
node2_rows = await generate_and_upload_sstable(node2, total_rows - 1)
|
||||
all_rows = node1_rows | node2_rows
|
||||
assert len(all_rows) >= 2
|
||||
|
||||
logger.info(f"node1_rows: {len(node1_rows)} rows, row ids: {node1_rows}")
|
||||
logger.info(f"node2_rows: {len(node2_rows)} rows, row ids: {node2_rows}")
|
||||
logger.info(f"all_rows: {len(all_rows)} rows, row ids: {all_rows}")
|
||||
|
||||
def check_rows(cql: Session, host: Host, expected_live_rows: row_set) -> None:
|
||||
actual_live_rows = set()
|
||||
actual_dead_rows = set()
|
||||
for row in cql.execute("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl)", host=host):
|
||||
res = data_class.check_mutation_row(row, expected_live_rows)
|
||||
if res is None:
|
||||
continue
|
||||
row_id, is_live = res
|
||||
if is_live:
|
||||
actual_live_rows.add(row_id)
|
||||
else:
|
||||
actual_dead_rows.add(row_id)
|
||||
|
||||
# Account rows that have a tombstone but are live only once.
|
||||
actual_dead_rows -= actual_live_rows
|
||||
|
||||
assert actual_live_rows == expected_live_rows
|
||||
assert len(actual_live_rows) + len(actual_dead_rows) == total_rows
|
||||
|
||||
logger.info("Check rows with CL=ONE before read-repair")
|
||||
check_rows(cql, host1, node1_rows)
|
||||
check_rows(cql, host2, node2_rows)
|
||||
|
||||
logger.info("Run read-repair")
|
||||
res = cql.execute(SimpleStatement(data_class.select_query, consistency_level=ConsistencyLevel.ALL))
|
||||
res_rows = []
|
||||
pages = []
|
||||
while True:
|
||||
res_rows.extend(list(res.current_rows))
|
||||
pages.append(list(res.current_rows))
|
||||
if res.has_more_pages:
|
||||
res.fetch_next_page()
|
||||
else:
|
||||
break
|
||||
|
||||
logger.debug(f"repair: {len(pages)} pages: {pages}")
|
||||
assert len(pages) > 1
|
||||
assert len(res_rows) == len(all_rows)
|
||||
actual_row_ids = set()
|
||||
for res_row in res_rows:
|
||||
row_id = getattr(res_row, data_class.unique_key)
|
||||
actual_row_ids.add(row_id)
|
||||
assert row_id in all_rows
|
||||
data_class.check_result_row(row_id, res_row)
|
||||
assert actual_row_ids == all_rows
|
||||
|
||||
for node in (node1, node2):
|
||||
manager.api.keyspace_flush(node.ip_addr, "ks")
|
||||
manager.api.keyspace_compaction(node.ip_addr, "ks")
|
||||
|
||||
logger.info("Check rows with CL=ONE after read-repair")
|
||||
check_rows(cql, host1, all_rows)
|
||||
check_rows(cql, host2, all_rows)
|
||||
@@ -2349,6 +2349,7 @@ class json_mutation_stream_parser {
|
||||
};
|
||||
|
||||
private:
|
||||
struct parsing_aborted : public std::exception { };
|
||||
class impl {
|
||||
queue<mutation_fragment_v2_opt> _queue;
|
||||
stream _stream;
|
||||
@@ -2364,7 +2365,12 @@ private:
|
||||
, _thread([this] { _reader.Parse(_stream, _handler); })
|
||||
{ }
|
||||
~impl() {
|
||||
_thread.join().get();
|
||||
_queue.abort(std::make_exception_ptr(parsing_aborted{}));
|
||||
try {
|
||||
_thread.join().get();
|
||||
} catch (...) {
|
||||
sst_log.warn("json_mutation_stream_parser: parser thread exited with exception: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
future<mutation_fragment_v2_opt> operator()() {
|
||||
return _queue.pop_eventually().handle_exception([this] (std::exception_ptr e) -> mutation_fragment_v2_opt {
|
||||
|
||||
Reference in New Issue
Block a user