repair: Fix sstable_list_to_mark_as_repaired with multishard writer

It was observed:

```
test_repair_disjoint_row_2nodes_diff_shard_count was spuriously failing due to
segfault.

backtrace pointed to a failure when allocating an object from the chain of
freed objects, which indicates memory corruption.

(gdb) bt
    at ./seastar/include/seastar/core/shared_ptr.hh:275
    at ./seastar/include/seastar/core/shared_ptr.hh:430
Usual suspect is use-after-free, so ran the reproducer in the sanitize mode,
which indicated shared ptr was being copied into another cpu through the
multi shard writer:

seastar - shared_ptr accessed on non-owner cpu, at: ...
--------
seastar::smp_message_queue::async_work_item<mutation_writer::multishard_writer::make_shard_writer...

```

The multishard writer itself was fine; the problem was in the streaming consumer
for repair copying a shared ptr. It could work fine with the same smp setting, since
there would be only 1 shard in the consumer path, from the rpc handler all the way
to the consumer. But with mixed smp settings, the ptr would be copied to the
cpus involved, and since the shared ptr is not cpu safe, the refcount changes
could go wrong, causing double free or use-after-free.

To fix, we pass a generic incremental repair handler to the streaming
consumer. The handler is safe to copy across shards. It is a no-op if
incremental repair is not enabled or if it is invoked on a different shard.

A reproducer test is added. The test reproduced the crash
consistently before the fix and passes reliably after it.

Fixes #27666

Closes scylladb/scylladb#27870
This commit is contained in:
Asias He
2025-12-25 15:56:10 +08:00
committed by Avi Kivity
parent 5f48ab3875
commit 0aabf51380
5 changed files with 81 additions and 25 deletions

View File

@@ -556,9 +556,22 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
// handled by normal compaction.
auto off_str = t.uses_tablets() ? sstables::offstrategy(false) : is_offstrategy_supported(_reason);
auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
bool mark_as_repaired = t.uses_tablets() && _repaired_at.has_value();
auto& sst_list = w->get_sstable_list_to_mark_as_repaired();
auto shard = this_shard_id();
auto inc_repair_handler = [&sst_list, mark_as_repaired, shard, sid = _topo_guard] (sstables::shared_sstable sst) mutable {
if (!mark_as_repaired) {
return;
}
if (shard != this_shard_id()) {
return;
}
sst->being_repaired = sid;
sst_list.insert(sst);
};
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, off_str,
_topo_guard, _repaired_at, w->get_sstable_list_to_mark_as_repaired()),
_topo_guard, inc_repair_handler),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
@@ -2154,8 +2167,8 @@ public:
public:
future<> mark_sstable_as_repaired() {
auto sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
if (_incremental_repair_meta.sst_set || sstables) {
auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
if (_incremental_repair_meta.sst_set || !sstables.empty()) {
co_await seastar::async([&] {
auto do_mark_sstable_as_repaired = [&] (const sstables::shared_sstable& sst, const sstring& type) {
auto filename = sst->toc_filename();
@@ -2169,11 +2182,9 @@ public:
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "existing");
});
if (sstables) {
for (auto& sst : *sstables) {
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "repair_produced");
}
for (auto& sst : sstables) {
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "repair_produced");
}
});
}

View File

@@ -89,7 +89,7 @@ class repair_writer : public enable_lw_shared_from_this<repair_writer> {
bool _created_writer = false;
uint64_t _estimated_partitions = 0;
// Holds the sstables produced by repair
lw_shared_ptr<sstables::sstable_list> _sstables;
sstables::sstable_list _sstables;
public:
class impl {
public:
@@ -108,7 +108,6 @@ public:
std::unique_ptr<impl> impl)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _sstables(make_lw_shared<sstables::sstable_list>())
, _impl(std::move(impl))
, _mq(&_impl->queue())
{}
@@ -143,7 +142,7 @@ public:
return _impl->queue();
}
lw_shared_ptr<sstables::sstable_list>& get_sstable_list_to_mark_as_repaired() {
sstables::sstable_list& get_sstable_list_to_mark_as_repaired() {
return _sstables;
}

View File

@@ -29,9 +29,8 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
stream_reason reason,
sstables::offstrategy offstrategy,
service::frozen_topology_guard frozen_guard,
std::optional<int64_t> repaired_at,
lw_shared_ptr<sstables::sstable_list> sstable_list_to_mark_as_repaired) {
return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> {
std::function<void (sstables::shared_sstable sst)> on_sstable_written) {
return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, on_sstable_written] (mutation_reader reader) -> future<> {
std::exception_ptr ex;
try {
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
@@ -52,7 +51,7 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
mutation_reader_consumer consumer =
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, &vbw, origin = std::move(origin),
offstrategy, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] (mutation_reader reader) {
offstrategy, on_sstable_written] (mutation_reader reader) {
sstables::shared_sstable sst;
try {
sst = use_view_update_path == db::view::sstable_destination_decision::normal_directory ? cf->make_streaming_sstable_for_write() : cf->make_streaming_staging_sstable();
@@ -70,13 +69,10 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cfg, encoding_stats{}).then([sst] {
return sst->open_data();
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] -> future<std::vector<sstables::shared_sstable>> {
auto on_add = [sst, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] (sstables::shared_sstable loading_sst) -> future<> {
if (repaired_at && sstables::repair_origin == origin) {
loading_sst->being_repaired = frozen_guard;
if (sstable_list_to_mark_as_repaired) {
sstable_list_to_mark_as_repaired->insert(loading_sst);
}
}).then([cf, sst, offstrategy, origin, on_sstable_written, cfg] -> future<std::vector<sstables::shared_sstable>> {
auto on_add = [sst, origin, on_sstable_written, cfg] (sstables::shared_sstable loading_sst) -> future<> {
if (on_sstable_written) {
on_sstable_written(loading_sst);
}
if (loading_sst == sst) {
co_await loading_sst->seal_sstable(cfg.backup);

View File

@@ -34,7 +34,6 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
stream_reason reason,
sstables::offstrategy offstrategy,
service::frozen_topology_guard,
std::optional<int64_t> repaired_at = std::nullopt,
lw_shared_ptr<sstables::sstable_list> sstable_list_to_mark_as_repaired = {});
std::function<void (sstables::shared_sstable sst)> on_sstable_written = {});
}

View File

@@ -9,7 +9,9 @@ from test.cluster.conftest import skip_mode
from test.pylib.repair import load_tablet_sstables_repaired_at, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from cassandra.query import ConsistencyLevel
import pytest
import asyncio
@@ -815,3 +817,52 @@ async def test_incremental_retry_end_repair_stage(manager):
# Run the second repair after the first is finished
await manager.api.tablet_repair(servers[0].ip_addr, ks, table, "all", await_completion=True, incremental_mode="incremental")
# Reproducer for https://github.com/scylladb/scylladb/issues/27666.
# This test checks both tablet and vnode table. It tests the code path dealing
# with appending sstables produced by repair to a list work correctly with
# multishard writer when the shard count is different.
@pytest.mark.asyncio
@pytest.mark.parametrize("use_tablet", [False, True])
async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_tablet):
    # Two nodes with deliberately different shard counts (--smp 2 vs --smp 3),
    # so repair streaming must cross shard boundaries on the receiving side.
    first_cmdline = ['--smp', '2']
    second_cmdline = ['--smp', '3']
    servers = await manager.servers_add(1, cmdline=first_cmdline, auto_rack_dc="dc1")
    servers.append(await manager.server_add(cmdline=second_cmdline, property_file={'dc': 'dc1', 'rack': 'rack2'}))
    cql = manager.get_cql()
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND TABLETS = {{ 'enabled': {str(use_tablet).lower()} }} ") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Insert [range_start, range_end) at CL=ONE so writes land only on the
        # node that is currently up, creating divergent replicas to repair.
        async def write_with_cl_one(range_start, range_end):
            insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
            insert_stmt.consistency_level = ConsistencyLevel.ONE
            pks = range(range_start, range_end)
            await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])

        logger.info("Adding data only on first node")
        await manager.api.flush_keyspace(servers[1].ip_addr, ks)
        await manager.server_stop(servers[1].server_id)
        manager.driver_close()
        cql = await reconnect_driver(manager)
        await write_with_cl_one(0, 10)
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        logger.info("Adding data only on second node")
        await manager.server_start(servers[1].server_id)
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
        await manager.server_stop(servers[0].server_id)
        manager.driver_close()
        cql = await reconnect_driver(manager)
        await write_with_cl_one(10, 20)
        await manager.server_start(servers[0].server_id)
        await manager.servers_see_each_other(servers)

        # Repair now streams rows between nodes with mismatched shard counts,
        # exercising the multishard-writer path that used to segfault.
        if use_tablet:
            logger.info("Starting tablet repair")
            await manager.api.tablet_repair(servers[1].ip_addr, ks, "test", token='all', incremental_mode="incremental")
        else:
            logger.info("Starting vnode repair")
            await manager.api.repair(servers[1].ip_addr, ks, "test")