Files
scylladb/db/corrupt_data_handler.cc
Lakshmi Narayanan Sreethar 46dfe09e64 db/corrupt_data_handler: guard stop() against null _fragment_semaphore
The `system_table_corrupt_data_handler::_fragment_semaphore` member is
initialized only when the `system_keyspace` sharded service is
initialized by `main`. If the server shuts down before that due to an
unrelated reason, `_fragment_semaphore` remains default-initialized to
`nullptr`. When the shutdown process later attempts to call `stop()` on
`system_table_corrupt_data_handler`, it tries to call `stop()` on
`_fragment_semaphore`, leading to a segfault.

Fix this by checking if `_fragment_semaphore` is null before invoking
`stop()` on it.

Although `corrupt_data_handler` was backported to 2025.1, this issue
does not occur in 2025.2 and master. Those versions include #23113,
which changes how the system keyspace is stopped; PR #24492, which
originally introduced `corrupt_data_handler`, builds on that change to
ensure `_fragment_semaphore` is stopped only if it has been created.

Fixes #24920

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>

Closes scylladb/scylladb#24931
2025-07-14 12:06:29 +03:00

140 lines
6.1 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/corrupt_data_handler.hh"
#include "reader_concurrency_semaphore.hh"
#include "replica/database.hh"
#include "utils/UUID_gen.hh"
// File-local logger registered under the name "corrupt_data".
// NOTE(review): nothing in the visible part of this file logs through it — confirm whether it is still needed.
static logging::logger corrupt_data_logger("corrupt_data");
// Short alias for the seastar metrics namespace, used when registering the counter in the constructor below.
namespace sm = seastar::metrics;
namespace db {
// Constructs the base handler.
//
// When `rm` is set, registers the "corrupt_data" metrics group, which exposes
// a single `entries_reported` counter backed by `_stats.corrupt_data_reported`.
// When `rm` is not set, no metrics are registered.
corrupt_data_handler::corrupt_data_handler(register_metrics rm) {
    if (!rm) {
        // Metrics registration was not requested; nothing else to set up.
        return;
    }

    _metrics.add_group("corrupt_data", {
        sm::make_counter("entries_reported", _stats.corrupt_data_reported,
                sm::description("Counts the number of corrupt data instances reported to the corrupt data handler. "
                                "A non-zero value indicates that the database suffered data corruption."))
    });
}
// Public entry point for reporting a corrupt clustering row.
//
// Bumps the "reported" counters up front, delegates the actual recording to
// do_record_corrupt_clustering_row(), and bumps the "recorded" counters only
// if the implementation handed back a non-null entry id.
//
// Returns the entry id produced by the implementation, unchanged.
future<corrupt_data_handler::entry_id> corrupt_data_handler::record_corrupt_clustering_row(const schema& s, const partition_key& pk,
        clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
    ++_stats.corrupt_data_reported;
    ++_stats.corrupt_clustering_rows_reported;

    auto recorded = do_record_corrupt_clustering_row(s, pk, std::move(cr), std::move(origin), std::move(sstable_name));

    return recorded.then([this] (entry_id id) {
        if (!id) {
            // A null id means the row was not recorded; leave the "recorded" counters alone.
            return id;
        }
        ++_stats.corrupt_data_recorded;
        ++_stats.corrupt_clustering_rows_recorded;
        return id;
    });
}
// Constructs the system-table backed handler: forwards `rm` to the base class
// and takes the TTL applied to recorded entries from `cfg.entry_ttl`.
// The system keyspace is not attached here; that happens in plug_system_keyspace().
system_table_corrupt_data_handler::system_table_corrupt_data_handler(config cfg, register_metrics rm)
: corrupt_data_handler(rm)
, _entry_ttl(cfg.entry_ttl)
{
}
// Destructor is defined out of line in this translation unit; it performs no
// work beyond destroying the members.
system_table_corrupt_data_handler::~system_table_corrupt_data_handler() = default;
// Creates a tracking-only reader permit on `_fragment_semaphore` for schema `s`,
// with no timeout.
//
// Dereferences `_fragment_semaphore` unconditionally, so it must only be called
// after plug_system_keyspace() has created the semaphore.
reader_permit system_table_corrupt_data_handler::make_fragment_permit(const schema& s) {
    auto schema_handle = s.shared_from_this();
    return _fragment_semaphore->make_tracking_only_permit(std::move(schema_handle),
            "system_table_corrupt_data_handler::make_fragment_permit", db::no_timeout, {});
}
// Builds one row for the system corrupt-data table describing a corrupt
// mutation fragment and applies it through the system keyspace.
//
// Row layout, as written below:
//   - partition key: (keyspace name, table name) of the user table
//   - clustering key: a freshly generated time-UUID, which is also the entry id
//   - cells: partition_key, clustering_key, mutation_fragment_kind,
//     frozen_mutation_fragment, origin and (when supplied) sstable_name
// The row marker and every cell are written with `_entry_ttl`.
//
// Parameters:
//   permit            - gate holder taken by the caller; kept alive until the write resolves
//   user_table_schema - schema of the table the corrupt fragment belongs to
//   pk, ck            - keys of the corrupt fragment, stored in their serialized representation
//   kind              - fragment kind, stored as its formatted string
//   fmf               - the frozen fragment itself
//   origin            - free-form description of where the corruption was detected
//   sstable_name      - name of the sstable the fragment came from, if known
//
// Returns the generated entry id once the mutation has been applied.
// Requires `_sys_ks` to be set; the caller checks this before calling.
future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_record_corrupt_mutation_fragment(
        gate::holder permit,
        const schema& user_table_schema,
        const partition_key& pk,
        const clustering_key& ck,
        mutation_fragment_v2::kind kind,
        frozen_mutation_fragment_v2 fmf,
        sstring origin,
        std::optional<sstring> sstable_name) {
    const corrupt_data_handler::entry_id id{utils::UUID_gen::get_time_UUID()};

    const auto corrupt_data_schema = _sys_ks->local_db().find_column_family(system_keyspace::NAME, system_keyspace::CORRUPT_DATA).schema();

    // Using the lower-level mutation API to avoid large allocation warnings when linearizing the frozen mutation fragment.
    mutation entry_mutation(corrupt_data_schema, partition_key::from_exploded(*corrupt_data_schema, {serialized(user_table_schema.ks_name()), serialized(user_table_schema.cf_name())}));
    auto& entry_row = entry_mutation.partition().clustered_row(*corrupt_data_schema, clustering_key::from_single_value(*corrupt_data_schema, serialized(timeuuid_native_type{id.uuid()})));

    // One timestamp is shared by the row marker and all cells of this entry.
    const auto timestamp = api::new_timestamp();

    // Writes an already-serialized value into the named column.
    auto set_cell_raw = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, managed_bytes cell_value) {
        auto cdef = corrupt_data_schema->get_column_definition(cell_name);
        SCYLLA_ASSERT(cdef);

        entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value, _entry_ttl));
    };

    // Serializes a typed value and writes it into the named column.
    // The value is serialized with serialize_nonnull(), so callers must pass an actual value.
    auto set_cell = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, data_value cell_value) {
        auto cdef = corrupt_data_schema->get_column_definition(cell_name);
        SCYLLA_ASSERT(cdef);

        entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value.serialize_nonnull(), _entry_ttl));
    };

    entry_row.apply(row_marker(timestamp, _entry_ttl, gc_clock::now() + _entry_ttl));

    set_cell("partition_key", data_value(to_bytes(pk.representation())));
    set_cell("clustering_key", data_value(to_bytes(ck.representation())));
    set_cell("mutation_fragment_kind", fmt::to_string(kind));
    // FIXME: Exposing knowledge here that bytes are serialized by just storing the raw value.
    // Need to replace with a fragmented-buffer serialize API call, which we don't have yet.
    set_cell_raw("frozen_mutation_fragment", std::move(fmf).representation().to_managed_bytes());
    set_cell("origin", origin);
    // The sstable name is optional. Only write the cell when a name was supplied, so that an
    // empty optional never reaches serialize_nonnull(); the column is simply left unset instead.
    // NOTE(review): presumably a data_value built from an empty optional is null and
    // serialize_nonnull() rejects it — confirm against data_value's contract.
    if (sstable_name) {
        set_cell("sstable_name", *sstable_name);
    }

    // Move the gate holder into the continuation so the gate is held until the write resolves,
    // independent of when this function's by-value parameters are destroyed.
    return _sys_ks->apply_mutation(std::move(entry_mutation)).then([id, permit = std::move(permit)] {
        return id;
    });
}
// Records a corrupt clustering row into the system corrupt-data table.
//
// If no system keyspace is plugged in, nothing is recorded and a null entry id
// is returned. Otherwise the gate is entered, the row is wrapped into a
// mutation fragment and frozen, and the result is handed to
// do_record_corrupt_mutation_fragment() together with the gate holder.
future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk,
        clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
    if (!_sys_ks) {
        co_return corrupt_data_handler::entry_id::create_null_id();
    }

    auto gate_hold = _gate.hold();

    // Copy the key out before the row is moved into the fragment below.
    const auto row_key = cr.key();

    auto frozen_row = freeze(s, mutation_fragment_v2(s, make_fragment_permit(s), std::move(cr)));

    auto recorded_id = co_await do_record_corrupt_mutation_fragment(std::move(gate_hold), s, pk, row_key,
            mutation_fragment_v2::kind::clustering_row, std::move(frozen_row), std::move(origin), std::move(sstable_name));
    co_return recorded_id;
}
// Attaches the system keyspace used to persist corrupt-data entries and
// creates the no-limits reader concurrency semaphore (metrics disabled) that
// make_fragment_permit() draws its permits from.
void system_table_corrupt_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
    using semaphore = reader_concurrency_semaphore;

    _sys_ks = sys_ks.shared_from_this();
    _fragment_semaphore = std::make_unique<semaphore>(
            semaphore::no_limits{},
            "system_table_corrupt_data_handler",
            semaphore::register_metrics::no);
}
// Detaches the system keyspace. After this, do_record_corrupt_clustering_row()
// sees a null `_sys_ks` and returns a null entry id without recording anything.
// `_fragment_semaphore` is left untouched here; it is stopped in stop().
void system_table_corrupt_data_handler::unplug_system_keyspace() noexcept {
_sys_ks = nullptr;
}
// Shuts the handler down: first closes the gate, then stops the fragment
// semaphore if one exists.
//
// `_fragment_semaphore` is only created by plug_system_keyspace(), so it is
// still null when the handler is stopped before the system keyspace was ever
// plugged in; in that case there is nothing further to stop.
future<> system_table_corrupt_data_handler::stop() noexcept {
    co_await _gate.close();

    if (!_fragment_semaphore) {
        co_return;
    }
    co_await _fragment_semaphore->stop();
}
// No-op implementation: ignores all arguments, records nothing, and returns
// an already-resolved future holding a null entry id.
future<corrupt_data_handler::entry_id> nop_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk,
        clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
    auto null_id = entry_id::create_null_id();
    return make_ready_future<entry_id>(std::move(null_id));
}
} // namespace db