diff --git a/repair/row_level.cc b/repair/row_level.cc
index a101aeeb20..4e511678f5 100644
--- a/repair/row_level.cc
+++ b/repair/row_level.cc
@@ -556,9 +556,22 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
     // handled by normal compaction.
     auto off_str = t.uses_tablets() ? sstables::offstrategy(false) : is_offstrategy_supported(_reason);
     auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
+    bool mark_as_repaired = t.uses_tablets() && _repaired_at.has_value();
+    auto& sst_list = w->get_sstable_list_to_mark_as_repaired();
+    auto shard = this_shard_id();
+    auto inc_repair_handler = [&sst_list, mark_as_repaired, shard, sid = _topo_guard] (sstables::shared_sstable sst) mutable {
+        if (!mark_as_repaired) {
+            return;
+        }
+        if (shard != this_shard_id()) {
+            return;
+        }
+        sst->being_repaired = sid;
+        sst_list.insert(sst);
+    };
     _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
         streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, off_str,
-                _topo_guard, _repaired_at, w->get_sstable_list_to_mark_as_repaired()),
+                _topo_guard, inc_repair_handler),
         t.stream_in_progress()).then([w] (uint64_t partitions) {
             rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
                 w->schema()->ks_name(), w->schema()->cf_name(), partitions);
@@ -2154,8 +2167,8 @@ public:
 public:
     future<> mark_sstable_as_repaired() {
-        auto sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
-        if (_incremental_repair_meta.sst_set || sstables) {
+        auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
+        if (_incremental_repair_meta.sst_set || !sstables.empty()) {
             co_await seastar::async([&] {
                 auto do_mark_sstable_as_repaired = [&] (const sstables::shared_sstable& sst, const sstring& type) {
                     auto filename = sst->toc_filename();
@@ -2169,11 +2182,9 @@ public:
                     seastar::thread::maybe_yield();
                     do_mark_sstable_as_repaired(sst, "existing");
                 });
-                if (sstables) {
-                    for (auto& sst : *sstables) {
-                        seastar::thread::maybe_yield();
-                        do_mark_sstable_as_repaired(sst, "repair_produced");
-                    }
+                for (auto& sst : sstables) {
+                    seastar::thread::maybe_yield();
+                    do_mark_sstable_as_repaired(sst, "repair_produced");
                 }
             });
         }
diff --git a/repair/writer.hh b/repair/writer.hh
index 57cd5bcf5b..560648e104 100644
--- a/repair/writer.hh
+++ b/repair/writer.hh
@@ -89,7 +89,7 @@ class repair_writer : public enable_lw_shared_from_this<repair_writer> {
     bool _created_writer = false;
     uint64_t _estimated_partitions = 0;
     // Holds the sstables produced by repair
-    lw_shared_ptr<sstables::sstable_list> _sstables;
+    sstables::sstable_list _sstables;
 public:
     class impl {
     public:
@@ -108,7 +108,6 @@ public:
             std::unique_ptr<impl> impl)
         : _schema(std::move(schema))
         , _permit(std::move(permit))
-        , _sstables(make_lw_shared<sstables::sstable_list>())
         , _impl(std::move(impl))
         , _mq(&_impl->queue()) {}
@@ -143,7 +142,7 @@ public:
         return _impl->queue();
     }
 
-    lw_shared_ptr<sstables::sstable_list>& get_sstable_list_to_mark_as_repaired() {
+    sstables::sstable_list& get_sstable_list_to_mark_as_repaired() {
         return _sstables;
     }
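
The new inc_repair_handler closes over state owned by the shard that created the repair_writer, while distribute_reader_and_consume_on_shards invokes the consumer on whichever shard the data belongs to, so the handler must bail out on foreign shards. Below is a minimal standalone Seastar sketch of that guard pattern, not Scylla code: owner_shard and produced are stand-ins for the writer's shard and its sstable list.

// Sketch: a container owned by one shard, a callback invoked on all shards.
// Only the owning shard may touch the container; other shards return early,
// mirroring the shard check in inc_repair_handler.
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <iostream>
#include <set>

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] () -> seastar::future<> {
        auto owner_shard = seastar::this_shard_id(); // shard that owns the list
        std::set<int> produced;                      // stand-in for repair_writer::_sstables
        co_await seastar::smp::invoke_on_all([owner_shard, &produced] {
            if (seastar::this_shard_id() != owner_shard) {
                return; // foreign shard: never touch the owner's container
            }
            produced.insert(42); // safe: we are on the owning shard
        });
        std::cout << "owner shard recorded " << produced.size() << " sstable(s)\n";
    });
}
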
diff --git a/streaming/consumer.cc b/streaming/consumer.cc
index 37d33896e7..2d012b34b1 100644
--- a/streaming/consumer.cc
+++ b/streaming/consumer.cc
@@ -29,9 +29,8 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
         stream_reason reason,
         sstables::offstrategy offstrategy,
         service::frozen_topology_guard frozen_guard,
-        std::optional<...> repaired_at,
-        lw_shared_ptr<sstables::sstable_list> sstable_list_to_mark_as_repaired) {
-    return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> {
+        std::function<void(sstables::shared_sstable)> on_sstable_written) {
+    return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, on_sstable_written] (mutation_reader reader) -> future<> {
         std::exception_ptr ex;
         try {
             if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
@@ -52,7 +51,7 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
             const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
             mutation_reader_consumer consumer = [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, &vbw, origin = std::move(origin),
-                    offstrategy, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] (mutation_reader reader) {
+                    offstrategy, on_sstable_written] (mutation_reader reader) {
                 sstables::shared_sstable sst;
                 try {
                     sst = use_view_update_path == db::view::sstable_destination_decision::normal_directory ? cf->make_streaming_sstable_for_write() : cf->make_streaming_staging_sstable();
@@ -70,13 +69,10 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
                 return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, cfg, encoding_stats{}).then([sst] {
                     return sst->open_data();
-                }).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] -> future<...> {
-                    auto on_add = [sst, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] (sstables::shared_sstable loading_sst) -> future<> {
-                        if (repaired_at && sstables::repair_origin == origin) {
-                            loading_sst->being_repaired = frozen_guard;
-                            if (sstable_list_to_mark_as_repaired) {
-                                sstable_list_to_mark_as_repaired->insert(loading_sst);
-                            }
+                }).then([cf, sst, offstrategy, origin, on_sstable_written, cfg] -> future<...> {
+                    auto on_add = [sst, origin, on_sstable_written, cfg] (sstables::shared_sstable loading_sst) -> future<> {
+                        if (on_sstable_written) {
+                            on_sstable_written(loading_sst);
                         }
                         if (loading_sst == sst) {
                             co_await loading_sst->seal_sstable(cfg.backup);
diff --git a/streaming/consumer.hh b/streaming/consumer.hh
index 2680286119..6689a4af47 100644
--- a/streaming/consumer.hh
+++ b/streaming/consumer.hh
@@ -34,7 +34,6 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
         stream_reason reason,
         sstables::offstrategy offstrategy,
         service::frozen_topology_guard,
-        std::optional<...> repaired_at = std::nullopt,
-        lw_shared_ptr<sstables::sstable_list> sstable_list_to_mark_as_repaired = {});
+        std::function<void(sstables::shared_sstable)> on_sstable_written = {});
 
 }
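
The signature change collapses the repaired_at/sstable-list pair into a single optional callback, so the streaming layer no longer carries repair-specific state. A self-contained sketch of that default-empty std::function pattern; write_sstable and the sstable types here are stand-ins, not the real Scylla API:

// Sketch: the callback defaults to empty, so non-repair callers omit it,
// and the writer only invokes it when set (an empty std::function is falsy),
// mirroring the "if (on_sstable_written)" test in on_add above.
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct sstable {};                               // stand-in for sstables::sstable
using shared_sstable = std::shared_ptr<sstable>; // stand-in for sstables::shared_sstable

void write_sstable(std::function<void(shared_sstable)> on_sstable_written = {}) {
    auto sst = std::make_shared<sstable>();
    // ... write and open the sstable ...
    if (on_sstable_written) {
        on_sstable_written(sst); // repair hooks in here
    }
}

int main() {
    write_sstable();             // plain streaming path: no callback
    std::vector<shared_sstable> to_mark;
    write_sstable([&to_mark] (shared_sstable sst) {
        to_mark.push_back(sst);  // repair path: collect repair-produced sstables
    });
    std::cout << "collected " << to_mark.size() << " sstable(s)\n";
}
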
diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py
index 69194844b0..10bd096d87 100644
--- a/test/cluster/test_incremental_repair.py
+++ b/test/cluster/test_incremental_repair.py
@@ -9,7 +9,9 @@ from test.cluster.conftest import skip_mode
 from test.pylib.repair import load_tablet_sstables_repaired_at, create_table_insert_data_for_repair
 from test.pylib.tablets import get_all_tablet_replicas
 from test.cluster.tasks.task_manager_client import TaskManagerClient
-from test.cluster.util import find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
+from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
+
+from cassandra.query import ConsistencyLevel
 
 import pytest
 import asyncio
@@ -815,3 +817,52 @@ async def test_incremental_retry_end_repair_stage(manager):
     # Run the second repair after the first is finished
     await manager.api.tablet_repair(servers[0].ip_addr, ks, table, "all", await_completion=True, incremental_mode="incremental")
+
+# Reproducer for https://github.com/scylladb/scylladb/issues/27666.
+# This test covers both tablet and vnode tables. It verifies that the code
+# path that appends sstables produced by repair to a list works correctly
+# with the multishard writer when the nodes have different shard counts.
+@pytest.mark.asyncio
+@pytest.mark.parametrize("use_tablet", [False, True])
+async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_tablet):
+    cmdline0 = [ '--smp', '2']
+    cmdline1 = [ '--smp', '3']
+    servers = await manager.servers_add(1, cmdline=cmdline0, auto_rack_dc="dc1")
+    servers.append(await manager.server_add(cmdline=cmdline1, property_file={'dc': 'dc1', 'rack': 'rack2'}))
+
+    cql = manager.get_cql()
+
+    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND TABLETS = {{ 'enabled': {str(use_tablet).lower()} }} ") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
+
+        async def write_with_cl_one(range_start, range_end):
+            insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
+            insert_stmt.consistency_level = ConsistencyLevel.ONE
+            pks = range(range_start, range_end)
+            await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
+
+        logger.info("Adding data only on first node")
+        await manager.api.flush_keyspace(servers[1].ip_addr, ks)
+        await manager.server_stop(servers[1].server_id)
+        manager.driver_close()
+        cql = await reconnect_driver(manager)
+        await write_with_cl_one(0, 10)
+        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
+
+        logger.info("Adding data only on second node")
+        await manager.server_start(servers[1].server_id)
+        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
+        await manager.server_stop(servers[0].server_id)
+        manager.driver_close()
+        cql = await reconnect_driver(manager)
+        await write_with_cl_one(10, 20)
+
+        await manager.server_start(servers[0].server_id)
+        await manager.servers_see_each_other(servers)
+
+        if use_tablet:
+            logger.info("Starting tablet repair")
+            await manager.api.tablet_repair(servers[1].ip_addr, ks, "test", token='all', incremental_mode="incremental")
+        else:
+            logger.info("Starting vnode repair")
+            await manager.api.repair(servers[1].ip_addr, ks, "test")
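
The test pins the two nodes to different --smp values because which shard owns a given token depends on the node's shard count; with mismatched counts, the receiving node's multishard writer must hand rows to a different shard than the one running the repair, which is what exercised the broken cross-shard path. A toy illustration of that shard-count dependence, using simple modulo sharding rather than Scylla's real token-hash sharder (shard_of is a made-up stand-in):

// Sketch: the same token maps to different shards under 2 vs. 3 shards.
// Wherever the two columns differ, data read on one shard is written on
// another, so any callback carried along runs on that other shard.
#include <cstdint>
#include <iostream>

static unsigned shard_of(uint64_t token, unsigned shard_count) {
    return token % shard_count; // simplified stand-in for the real sharder
}

int main() {
    for (uint64_t token : {10, 11, 12, 13, 14, 15}) {
        std::cout << "token " << token
                  << ": shard " << shard_of(token, 2) << " of 2"
                  << ", shard " << shard_of(token, 3) << " of 3\n";
    }
}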