Files
scylladb/mutation_writer/token_group_based_splitting_writer.cc
Raphael S. Carvalho 68f23d54d8 replica: Fix split compaction when tablet boundaries change
Consider the following:
1) balancer emits split decision
2) split compaction starts
3) split decision is revoked
4) emits merge decision
5) completes merge, before compaction in step 2 finishes

After last step, split compaction initiated in step 2 can fail
because it works with the global tablet map, rather than the
map when the compaction started. With the global state changing
under its feet, on merge, the mutation splitting writer will
think it's going backwards since sibling tablets are merged.

This problem was also seen when running load-and-stream, where
a split initiated by the sstable writer failed, the split then
completed, and the unsplit sstable was left in the table dir,
causing problems on restart.

To fix this, let's make split compaction always work with
the state when it started, not a global state.

Fixes #24153.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-09-07 05:20:23 -03:00

120 lines
4.1 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "mutation_writer/token_group_based_splitting_writer.hh"
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/on_internal_error.hh>
#include "mutation_writer/feed_writers.hh"
#include "utils/error_injection.hh"
namespace mutation_writer {
static logging::logger logger("token_group_based_splitting_mutation_writer");
// Splits a single stream of mutation fragments into multiple output streams
// ("buckets"), one per token group, as determined by the _classify function.
// Token group ids are required to be monotonically non-decreasing over the
// input token order: the writer keeps only one bucket_writer open at a time
// and switches to a fresh one whenever the group id advances. A group id
// going backwards indicates the classification function changed under our
// feet (e.g. the tablet map was mutated mid-compaction) and is treated as
// an internal error.
class token_group_based_splitting_mutation_writer {
    schema_ptr _schema;
    reader_permit _permit;
    // Maps a token to its group id. Expected to be monotonic in token order.
    classify_by_token_group _classify;
    // Consumes the reader side of each per-group bucket.
    mutation_reader_consumer _consumer;
    // Group id of the partitions currently being routed. Starts at 0.
    token_group_id _current_group_id = 0;
    // Writer for the current group; disengaged before the first partition
    // and transiently while switching groups (see do_switch_to_new_writer).
    std::optional<bucket_writer> _current_writer;
private:
    // Forwards a fragment to the currently open bucket writer.
    // Precondition: _current_writer is engaged.
    future<> write(mutation_fragment_v2&& mf) {
        return _current_writer->consume(std::move(mf));
    }
    inline void allocate_new_writer_if_needed() {
        if (!_current_writer) [[unlikely]] {
            _current_writer = bucket_writer(_schema, _permit, _consumer);
        }
    }
    // Keeps the previous writer alive while closed
    // and then allocates a new writer, if needed.
    future<> do_switch_to_new_writer() {
        _current_writer->consume_end_of_stream();
        // reset _current_writer while closing the previous one
        // to prevent race with close() after abort()
        auto wr = std::exchange(_current_writer, std::nullopt);
        co_await wr->close();
        allocate_new_writer_if_needed();
        // Test hook: lets tests pause the writer right after a group switch.
        co_await utils::get_local_injector().inject("splitting_mutation_writer_switch_wait", utils::wait_for_message(std::chrono::seconds(60)));
    }
    // Called frequently, hence yields (and allocates)
    // only on the unlikely slow path.
    future<> maybe_switch_to_new_writer(dht::token t) {
        auto prev_group_id = _current_group_id;
        _current_group_id = _classify(t);
        // Backwards movement means the classification changed mid-stream;
        // fail loudly rather than corrupt the split output.
        if (_current_group_id < prev_group_id) [[unlikely]] {
            on_internal_error(logger, format("Token group id cannot go backwards, current={}, previous={}", _current_group_id, prev_group_id));
        }
        if (_current_writer && _current_group_id > prev_group_id) [[unlikely]] {
            return do_switch_to_new_writer();
        }
        allocate_new_writer_if_needed();
        return make_ready_future<>();
    }
public:
    token_group_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_token_group classify, mutation_reader_consumer consumer)
        : _schema(std::move(schema))
        , _permit(std::move(permit))
        , _classify(std::move(classify))
        , _consumer(std::move(consumer))
    {}
    // A partition boundary is the only place where the group can change:
    // classify by the partition's token, possibly rotating the bucket writer,
    // then forward the partition_start fragment.
    future<> consume(partition_start&& ps) {
        return maybe_switch_to_new_writer(ps.key().token()).then([this, ps = std::move(ps)] () mutable {
            return write(mutation_fragment_v2(*_schema, _permit, std::move(ps)));
        });
    }
    // Intra-partition fragments always belong to the current group.
    future<> consume(static_row&& sr) {
        return write(mutation_fragment_v2(*_schema, _permit, std::move(sr)));
    }
    future<> consume(clustering_row&& cr) {
        return write(mutation_fragment_v2(*_schema, _permit, std::move(cr)));
    }
    future<> consume(range_tombstone_change&& rt) {
        return write(mutation_fragment_v2(*_schema, _permit, std::move(rt)));
    }
    future<> consume(partition_end&& pe) {
        return write(mutation_fragment_v2(*_schema, _permit, std::move(pe)));
    }
    // Flushes the last open bucket, if any. No-op on an empty stream.
    void consume_end_of_stream() {
        if (_current_writer) {
            _current_writer->consume_end_of_stream();
        }
    }
    // Propagates an abort to the open bucket writer, if any.
    void abort(std::exception_ptr ep) {
        if (_current_writer) {
            _current_writer->abort(ep);
        }
    }
    future<> close() noexcept {
        return _current_writer ? _current_writer->close() : make_ready_future<>();
    }
};
// Drains `producer` through a token-group splitting writer: fragments are
// routed into per-group buckets, and each bucket's reader side is handed to
// `consumer`. The writer is bound to the producer's schema and permit.
future<> segregate_by_token_group(mutation_reader producer, classify_by_token_group classify, mutation_reader_consumer consumer) {
    // Construct the writer before moving the producer into feed_writer,
    // so schema()/permit() are taken from a still-valid reader.
    token_group_based_splitting_mutation_writer writer(producer.schema(), producer.permit(), std::move(classify), std::move(consumer));
    return feed_writer(std::move(producer), std::move(writer));
}
} // namespace mutation_writer