Files
scylladb/sstables/writer.cc
Michał Chojnowski 94088f0a41 sstables/writer: add an accessor for the current write position in Data.db
It will be used by index tests to know the ground truth for
where each partition and row are written, so that this can
be checked against the index.
2025-09-07 00:32:02 +02:00

93 lines
3.0 KiB
C++

/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "sstables.hh"
#include "sstable_writer.hh"
#include "writer.hh"
#include "mx/writer.hh"
namespace sstables {
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats, shard_id shard) {
if (sst.get_version() < oldest_writable_sstable_format) {
on_internal_error(sstlog, format("writing sstables with too old format: {}", static_cast<int>(sst.get_version())));
}
_impl = mc::make_writer(sst, s, estimated_partitions, cfg, enc_stats, shard);
if (cfg.replay_position) {
_impl->_collector.set_replay_position(cfg.replay_position.value());
}
if (cfg.sstable_level) {
_impl->_collector.set_sstable_level(cfg.sstable_level.value());
}
sst.get_stats().on_open_for_writing();
}
void sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
_impl->_validator(dk);
_impl->_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view::for_partition_start(), {});
_impl->_sst.get_stats().on_partition_write();
return _impl->consume_new_partition(dk);
}
void sstable_writer::consume(tombstone t) {
_impl->_sst.get_stats().on_tombstone_write();
return _impl->consume(t);
}
stop_iteration sstable_writer::consume(static_row&& sr) {
_impl->_validator(mutation_fragment_v2::kind::static_row, sr.position(), {});
if (!sr.empty()) {
_impl->_sst.get_stats().on_static_row_write();
}
return _impl->consume(std::move(sr));
}
stop_iteration sstable_writer::consume(clustering_row&& cr) {
_impl->_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {});
_impl->_sst.get_stats().on_row_write();
return _impl->consume(std::move(cr));
}
stop_iteration sstable_writer::consume(range_tombstone_change&& rtc) {
_impl->_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
_impl->_sst.get_stats().on_range_tombstone_write();
return _impl->consume(std::move(rtc));
}
stop_iteration sstable_writer::consume_end_of_partition() {
_impl->_validator.on_end_of_partition();
return _impl->consume_end_of_partition();
}
void sstable_writer::consume_end_of_stream() {
_impl->_validator.on_end_of_stream();
if (_impl->_c_stats.capped_local_deletion_time) {
_impl->_sst.get_stats().on_capped_local_deletion_time();
}
return _impl->consume_end_of_stream();
}
void sstable_writer::set_repaired_at(int64_t repaired_at) {
_impl->set_repaired_at(repaired_at);
}
uint64_t sstable_writer::data_file_position_for_tests() const {
return _impl->data_file_position_for_tests();
}
sstable_writer::sstable_writer(sstable_writer&& o) = default;
sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default;
sstable_writer::~sstable_writer() {
if (_impl) {
_impl->_sst.get_stats().on_close_for_writing();
}
}
} // namespace sstables