/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "sstables/mx/writer.hh"
#include "sstables/writer.hh"
#include "sstables/trie/bti_index.hh"
#include "encoding_stats.hh"
#include "schema/schema.hh"
#include "mutation/mutation_fragment.hh"
#include "vint-serialization.hh"
#include "sstables/types.hh"
#include "sstables/mx/types.hh"
#include "mutation/atomic_cell.hh"
#include "utils/assert.hh"
#include "utils/exceptions.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include <functional>
#include <boost/iterator/iterator_facade.hpp>
#include <boost/container/static_vector.hpp>
static logging::logger slogger("mc_writer");
namespace sstables {
namespace mc {
using indexed_columns = std::vector<std::reference_wrapper<const column_definition>>;
// There is a special case when we need to treat a non-full clustering key prefix as a full one
// for serialization purposes. This case may occur with a compact table.
// For historical reasons a compact table may have rows with missing trailing clustering columns in their clustering keys.
// Consider:
// cqlsh:test> CREATE TABLE cf (pk int, ck1 int, ck2 int, rc int, primary key (pk, ck1, ck2)) WITH COMPACT STORAGE;
// cqlsh:test> INSERT INTO cf (pk, ck1, rc) VALUES (1, 1, 1);
// cqlsh:test> SELECT * FROM cf;
//
// pk | ck1 | ck2 | rc
// ----+-----+------+----
// 1 | 1 | null | 1
//
// (1 rows)
// In this case, the clustering key of the row will have length 1, but for serialization purposes we want to treat
// it as a full prefix of length 2.
// So we use ephemerally_full_prefix to distinguish this kind of clustering key.
using ephemerally_full_prefix = seastar::bool_class<struct ephemerally_full_prefix_tag>;
// A helper CRTP base class for input ranges.
// Derived classes should implement the following functions:
// bool next() const;
// generates the next value, if possible;
// returns true if the next value has been evaluated, false otherwise
// explicit operator bool() const;
// tells whether the range can produce more items
// TODO: turn description into a concept
template <typename InputRange, typename ValueType>
struct input_range_base {
private:
InputRange& self() {
return static_cast<InputRange&>(*this);
}
const InputRange& self() const {
return static_cast<const InputRange&>(*this);
}
public:
// Use the same type for iterator and const_iterator
using const_iterator = class iterator
: public boost::iterator_facade<
iterator,
const ValueType,
std::input_iterator_tag,
const ValueType
>
{
private:
const InputRange* _range;
friend class input_range_base;
friend class boost::iterator_core_access;
explicit iterator(const InputRange& range)
: _range(range.next() ? &range : nullptr)
{}
void increment() {
SCYLLA_ASSERT(_range);
if (!_range->next()) {
_range = nullptr;
}
}
bool equal(iterator that) const {
return (_range == that._range);
}
const ValueType dereference() const {
SCYLLA_ASSERT(_range);
return _range->get_value();
}
public:
iterator() : _range{} {}
};
iterator begin() const { return iterator{self()}; }
iterator end() const { return iterator{}; }
};
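// For illustration only (hypothetical, not used in this file): a minimal
// derived range producing the values 0..n-1 could look like this, assuming
// the derived class also provides get_value() for dereferencing:
//
//   struct counting_range : input_range_base<counting_range, int> {
//       int _n;
//       mutable int _i = 0;
//       explicit counting_range(int n) : _n(n) {}
//       // Evaluates the next value; returns false when exhausted.
//       bool next() const { return _i++ < _n; }
//       int get_value() const { return _i - 1; }
//       explicit operator bool() const { return _i < _n; }
//   };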
struct clustering_block {
constexpr static size_t max_block_size = 32;
uint64_t header = 0;
struct described_value {
managed_bytes_view value;
std::reference_wrapper<const abstract_type> type;
};
boost::container::static_vector<described_value, clustering_block::max_block_size> values;
};
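// The block header encodes two bits per clustering value, as implemented in
// clustering_blocks_input_range::next() below: bit (2 * i) marks an empty
// value and bit (2 * i + 1) marks a null (missing) value, where i is the
// value's position within the block. For example (illustrative), a block
// whose third component (i == 2) is null has header == 0x20; all non-empty,
// non-null values are serialized in `values`, in order.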
class clustering_blocks_input_range
: public input_range_base<clustering_blocks_input_range, clustering_block> {
private:
const schema& _schema;
const clustering_key_prefix& _prefix;
size_t _serialization_limit_size;
mutable clustering_block _current_block;
mutable uint32_t _offset = 0;
public:
clustering_blocks_input_range(const schema& s, const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full)
: _schema(s)
, _prefix(prefix) {
_serialization_limit_size = is_ephemerally_full == ephemerally_full_prefix::yes
? _schema.clustering_key_size()
: _prefix.size(_schema);
}
bool next() const {
if (_offset == _serialization_limit_size) {
// No more values to encode
return false;
}
// Each block contains up to max_block_size values
auto limit = std::min(_serialization_limit_size, _offset + clustering_block::max_block_size);
_current_block = {};
SCYLLA_ASSERT(_offset % clustering_block::max_block_size == 0);
while (_offset < limit) {
auto shift = _offset % clustering_block::max_block_size;
if (_offset < _prefix.size(_schema)) {
managed_bytes_view value = _prefix.get_component(_schema, _offset);
if (value.empty()) {
_current_block.header |= (uint64_t(1) << (shift * 2));
} else {
const auto& compound_type = _prefix.get_compound_type(_schema);
const auto& types = compound_type->types();
if (_offset < types.size()) {
const auto& type = *types[_offset];
_current_block.values.push_back({value, type});
} else {
// FIXME: might happen due to bad thrift key.
// See https://github.com/scylladb/scylla/issues/7568
//
// Consider turning into exception when the issue is fixed
// and the key is rejected in thrift handler layer, and if
// the bad key could not find its way to existing sstables.
slogger.warn("prefix {} (size={}): offset {} >= types.size {}", _prefix, _prefix.size(_schema), _offset, types.size());
_current_block.header |= (uint64_t(1) << ((shift * 2) + 1));
}
}
} else {
// This (and all subsequent) values of the prefix are missing (null)
// This branch is only ever taken for an ephemerally_full_prefix
_current_block.header |= (uint64_t(1) << ((shift * 2) + 1));
}
++_offset;
}
return true;
}
clustering_block get_value() const { return _current_block; }
explicit operator bool() const {
return (_offset < _serialization_limit_size);
}
};
template <typename W>
requires Writer<W>
static void write(sstable_version_types v, W& out, const clustering_block& block) {
write_vint(out, block.header);
for (const auto& block_value: block.values) {
write_cell_value(v, out, block_value.type, block_value.value);
}
}
template <typename W>
requires Writer<W>
void write_clustering_prefix(sstable_version_types v, W& out, const schema& s,
const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full) {
clustering_blocks_input_range range{s, prefix, is_ephemerally_full};
for (const auto block: range) {
write(v, out, block);
}
}
// This range generates a sequence of values that represent information
// about missing columns for SSTables 3.0 format.
class missing_columns_input_range
: public input_range_base<missing_columns_input_range, uint64_t> {
private:
const indexed_columns& _columns;
const row& _row;
mutable uint64_t _current_value = 0;
mutable size_t _current_index = 0;
mutable bool _large_mode_produced_size = false;
enum class encoding_mode {
small,
large_encode_present,
large_encode_missing,
} _mode;
public:
missing_columns_input_range(const indexed_columns& columns, const row& row)
: _columns(columns)
, _row(row) {
auto row_size = _row.size();
auto total_size = _columns.size();
_current_index = row_size < total_size ? 0 : total_size;
_mode = (total_size < 64) ? encoding_mode::small :
(row_size < total_size / 2) ? encoding_mode::large_encode_present :
encoding_mode::large_encode_missing;
}
bool next() const {
auto total_size = _columns.size();
if (_current_index == total_size) {
// No more values to encode
return false;
}
if (_mode == encoding_mode::small) {
// Set bit for every missing column
for (const auto& [index, column]: _columns | std::views::enumerate) {
auto cell = _row.find_cell(column.get().id);
if (!cell) {
_current_value |= (uint64_t(1) << index);
}
}
_current_index = total_size;
return true;
} else {
// For either of the large modes, output the difference between the total size and the row size first
if (!_large_mode_produced_size) {
_current_value = total_size - _row.size();
_large_mode_produced_size = true;
return true;
}
if (_mode == encoding_mode::large_encode_present) {
while (_current_index < total_size) {
auto cell = _row.find_cell(_columns[_current_index].get().id);
if (cell) {
_current_value = _current_index;
++_current_index;
return true;
}
++_current_index;
}
} else {
SCYLLA_ASSERT(_mode == encoding_mode::large_encode_missing);
while (_current_index < total_size) {
auto cell = _row.find_cell(_columns[_current_index].get().id);
if (!cell) {
_current_value = _current_index;
++_current_index;
return true;
}
++_current_index;
}
}
}
return false;
}
uint64_t get_value() const { return _current_value; }
explicit operator bool() const {
return (_current_index < _columns.size());
}
};
template <typename W>
requires Writer<W>
void write_missing_columns(W& out, const indexed_columns& columns, const row& row) {
for (const auto value: missing_columns_input_range{columns, row}) {
write_vint(out, value);
}
}
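// Worked example of the encoding above (illustrative): with 100 columns in
// the schema and only 3 of them present in the row, large_encode_present is
// chosen, so we write vint(97) (the number of missing columns) followed by
// the indices of the 3 *present* columns. With fewer than 64 columns a
// single vint bitmap is written instead, with one bit set per missing column.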
template <typename T, typename W>
requires Writer<W>
void write_unsigned_delta_vint(W& out, T value, T base) {
using unsigned_type = std::make_unsigned_t<T>;
unsigned_type unsigned_delta = static_cast<unsigned_type>(value) - static_cast<unsigned_type>(base);
// sign-extend to 64-bits
using signed_type = std::make_signed_t<T>;
int64_t delta = static_cast<int64_t>(static_cast<signed_type>(unsigned_delta));
// write as unsigned 64-bit varint
write_vint(out, static_cast<uint64_t>(delta));
}
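// For example (illustrative): with enc_stats.min_timestamp == 1000, a cell
// timestamp of 1005 is written by write_delta_timestamp() below as the
// unsigned vint encoding of delta == 5, which takes a single byte.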
template <typename W>
requires Writer<W>
void write_delta_timestamp(W& out, api::timestamp_type timestamp, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, timestamp, enc_stats.min_timestamp);
}
template <typename W>
requires Writer<W>
void write_delta_ttl(W& out, gc_clock::duration ttl, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, ttl.count(), enc_stats.min_ttl.count());
}
template <typename W>
requires Writer<W>
void write_delta_local_deletion_time(W& out, int64_t local_deletion_time, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, local_deletion_time, enc_stats.min_local_deletion_time.time_since_epoch().count());
}
template <typename W>
requires Writer<W>
void write_delta_local_deletion_time(W& out, gc_clock::time_point ldt, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, ldt.time_since_epoch().count(), enc_stats.min_local_deletion_time.time_since_epoch().count());
}
static bytes_array_vint_size to_bytes_array_vint_size(bytes b) {
bytes_array_vint_size result;
result.value = std::move(b);
return result;
}
static bytes_array_vint_size to_bytes_array_vint_size(const sstring& s) {
bytes_array_vint_size result;
result.value = to_bytes(s);
return result;
}
static sstring pk_type_to_string(const schema& s) {
if (s.partition_key_size() == 1) {
return s.partition_key_columns().begin()->type->name();
} else {
return seastar::format("org.apache.cassandra.db.marshal.CompositeType({})",
fmt::join(s.partition_key_columns()
| std::views::transform(std::mem_fn(&column_definition::type))
| std::views::transform(std::mem_fn(&abstract_type::name)),
","));
}
}
struct sstable_schema {
serialization_header header;
indexed_columns regular_columns;
indexed_columns static_columns;
};
static
sstable_schema make_sstable_schema(const schema& s, const encoding_stats& enc_stats, const sstable_writer_config& cfg) {
sstable_schema sst_sch;
serialization_header& header = sst_sch.header;
// mc serialization header minimum values are delta-encoded based on the default timestamp epoch times
// Note: We rely on implicit conversion to uint64_t when subtracting the signed epoch values below
// to prevent signed integer overflow.
header.min_timestamp_base.value = static_cast<uint64_t>(enc_stats.min_timestamp) - encoding_stats::timestamp_epoch;
header.min_local_deletion_time_base.value = static_cast<uint64_t>(enc_stats.min_local_deletion_time.time_since_epoch().count()) - encoding_stats::deletion_time_epoch;
header.min_ttl_base.value = static_cast<uint64_t>(enc_stats.min_ttl.count()) - encoding_stats::ttl_epoch;
header.pk_type_name = to_bytes_array_vint_size(pk_type_to_string(s));
header.clustering_key_types_names.elements.reserve(s.clustering_key_size());
for (const auto& ck_column : s.clustering_key_columns()) {
auto ck_type_name = to_bytes_array_vint_size(ck_column.type->name());
header.clustering_key_types_names.elements.push_back(std::move(ck_type_name));
}
auto add_column = [&] (const column_definition& column) {
serialization_header::column_desc cd;
cd.name = to_bytes_array_vint_size(column.name());
cd.type_name = to_bytes_array_vint_size(column.type->name());
if (column.is_static()) {
header.static_columns.elements.push_back(std::move(cd));
sst_sch.static_columns.push_back(column);
} else if (column.is_regular()) {
header.regular_columns.elements.push_back(std::move(cd));
sst_sch.regular_columns.push_back(column);
}
};
for (const auto& column : s.v3().all_columns()) {
add_column(column);
}
// For static and regular columns, we write all simple columns first followed by collections
// These containers have columns partitioned by atomicity
auto pred = [] (const std::reference_wrapper<const column_definition>& column) { return column.get().is_atomic(); };
std::ranges::stable_partition(sst_sch.regular_columns, pred);
std::ranges::stable_partition(sst_sch.static_columns, pred);
return sst_sch;
}
enum class cell_flags : uint8_t {
none = 0x00,
is_deleted_mask = 0x01, // Whether the cell is a tombstone or not.
is_expiring_mask = 0x02, // Whether the cell is expiring.
has_empty_value_mask = 0x04, // Whether the cell has an empty value. This will be the case for a tombstone in particular.
use_row_timestamp_mask = 0x08, // Whether the cell has the same timestamp as the row this is a cell of.
use_row_ttl_mask = 0x10, // Whether the cell has the same TTL as the row this is a cell of.
};
inline cell_flags operator& (cell_flags lhs, cell_flags rhs) {
return cell_flags(static_cast<uint8_t>(lhs) & static_cast<uint8_t>(rhs));
}
inline cell_flags& operator |= (cell_flags& lhs, cell_flags rhs) {
lhs = cell_flags(static_cast<uint8_t>(lhs) | static_cast<uint8_t>(rhs));
return lhs;
}
enum class row_flags : uint8_t {
none = 0x00,
// Signal the end of the partition. Nothing follows a <flags> field with that flag.
end_of_partition = 0x01,
// Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
is_marker = 0x02,
// Whether the encoded row has a timestamp (i.e. its liveness_info is not empty).
has_timestamp = 0x04,
// Whether the encoded row has some expiration info (i.e. if its liveness_info contains TTL and local_deletion).
has_ttl = 0x08,
// Whether the encoded row has some deletion info.
has_deletion = 0x10,
// Whether the encoded row has all of the columns from the header present.
has_all_columns = 0x20,
// Whether the encoded row has some complex deletion for at least one of its complex columns.
has_complex_deletion = 0x40,
// If present, another byte is read containing the "extended flags" below.
extension_flag = 0x80
};
inline row_flags operator& (row_flags lhs, row_flags rhs) {
return row_flags(static_cast<uint8_t>(lhs) & static_cast<uint8_t>(rhs));
}
inline row_flags& operator |= (row_flags& lhs, row_flags rhs) {
lhs = row_flags(static_cast<uint8_t>(lhs) | static_cast<uint8_t>(rhs));
return lhs;
}
enum class row_extended_flags : uint8_t {
none = 0x00,
// Whether the encoded row is a static. If there is no extended flag, the row is assumed not static.
is_static = 0x01,
// Cassandra-specific flag, indicates whether the row deletion is shadowable.
// This flag is deprecated in Origin - see CASSANDRA-11500.
// Scylla never sets this flag and fails to read files that have it set.
has_shadowable_deletion_cassandra = 0x02,
// Scylla-specific flag, indicates whether the row deletion is shadowable.
// If set, the shadowable tombstone is written right after the row deletion.
// This is only used by Materialized Views that are not supposed to be exported.
has_shadowable_deletion_scylla = 0x80,
};
// A range tombstone marker (RT marker) represents a bound of a range tombstone
// in an SSTables 3.x ('m') data file.
// RT markers can be of two types called "bounds" and "boundaries" in Origin nomenclature.
//
// A bound simply represents either a start or an end bound of a full range tombstone.
//
// A boundary can be thought of as two merged adjacent bounds and is used to represent adjacent
// range tombstones. An RT marker of a boundary type has two tombstones corresponding to two
// range tombstones this boundary belongs to.
struct rt_marker {
clustering_key_prefix clustering;
bound_kind_m kind;
tombstone tomb;
std::optional<tombstone> boundary_tomb; // only engaged for rt_marker of a boundary type
position_in_partition_view position() const {
if (kind == bound_kind_m::incl_start || kind == bound_kind_m::excl_end_incl_start) {
return position_in_partition_view::before_key(clustering);
}
return position_in_partition_view::after_all_prefixed(clustering);
}
// We need this one to uniformly write rows and RT markers inside write_clustered().
const clustering_key_prefix& key() const { return clustering; }
};
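// For example (illustrative): two adjacent range tombstones [a, b) and
// [b, c) produce, at position b, a single boundary marker of kind
// excl_end_incl_start, whose `tomb` holds the tombstone of the range that
// ends at b and whose `boundary_tomb` holds the tombstone of the range that
// starts at b (see consume(range_tombstone_change&&) below).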
static bound_kind_m get_kind(const clustering_row&) {
return bound_kind_m::clustering;
}
static bound_kind_m get_kind(const rt_marker& marker) {
return marker.kind;
}
template<typename T>
concept Clustered = requires(T t) {
{ t.key() } -> std::convertible_to<const clustering_key_prefix&>;
{ get_kind(t) } -> std::same_as<bound_kind_m>;
};
// Used for writing SSTables in 'mc' format.
class writer : public sstable_writer::writer_impl {
private:
const encoding_stats _enc_stats;
shard_id _shard; // Specifies which shard the new SStable will belong to.
bool _compression_enabled = false;
std::unique_ptr<file_writer> _data_writer;
std::unique_ptr<file_writer> _index_writer;
std::unique_ptr<file_writer> _rows_writer;
std::unique_ptr<file_writer> _partitions_writer;
optimized_optional<trie::bti_row_index_writer> _bti_row_index_writer;
optimized_optional<trie::bti_partition_index_writer> _bti_partition_index_writer;
// The key of the last `consume_new_partition` call.
// A partition key can only be inserted into Partitions.db
// after its intra-partition index is written to Rows.db.
// So we need to hold onto the key until `consume_end_of_partition`.
std::optional<dht::decorated_key> _current_dk_for_bti;
// Position of the Data writer at the moment of the last
// `consume_new_partition` call.
uint64_t _current_partition_position = 0;
// If true, we don't build the bloom filter during the main pass
// (when Data.db is written), but we instead write the murmur hashes
// of keys to a temporary file, and later (after Data.db is written,
// but before the sstable is sealed) we build the bloom filter from that.
//
// (Ideally this mechanism should only be used if the optimal size of the
// filter can't be well estimated in advance. As of this writing we use
// this mechanism every time the Index component isn't being written).
bool _delayed_filter = true;
// The writer of the temporary file used when `_delayed_filter` is true.
std::unique_ptr<file_writer> _hashes_writer;
bool _tombstone_written = false;
bool _static_row_written = false;
// The length of partition header (partition key, partition deletion and static row, if present)
// as written to the data file
// Used for writing promoted index
uint64_t _partition_header_length = 0;
uint64_t _prev_row_start = 0;
std::optional<key> _partition_key;
utils::hashed_key _current_murmur_hash{{0, 0}};
std::optional<key> _first_key, _last_key;
index_sampling_state _index_sampling_state;
bytes_ostream _tmp_bufs;
uint64_t _num_partitions_consumed = 0;
const sstable_schema _sst_schema;
struct cdef_and_collection {
const column_definition* cdef;
std::reference_wrapper<const atomic_cell_or_collection> collection;
};
// Used to defer writing collections until all atomic cells are written
std::vector<cdef_and_collection> _collections;
tombstone _current_tombstone;
struct pi_block {
clustering_info first;
clustering_info last;
uint64_t offset;
uint64_t width;
std::optional<tombstone> open_marker;
// The range tombstone active between this block
// and its predecessor.
// (If there's no predecessor, this is a "live" tombstone).
tombstone preceding_range_tombstone;
};
// _pi_write_m is used temporarily for building the promoted
// index (column sample) of one partition when writing a new sstable.
struct {
// Unfortunately we cannot output the promoted index directly to the
// index file because it needs to be prepended by its size.
// first_entry is used for deferring serialization into blocks for small partitions.
std::optional<pi_block> first_entry;
bytes_ostream blocks; // Serialized pi_blocks.
bytes_ostream offsets; // Serialized block offsets (uint32_t) relative to the start of "blocks".
uint64_t promoted_index_size = 0; // Number of pi_blocks inside blocks and first_entry;
sstables::deletion_time partition_tombstone;
tombstone range_tombstone_preceding_current_block;
uint64_t block_start_offset;
uint64_t block_next_start_offset;
std::optional<clustering_info> first_clustering;
std::optional<clustering_info> last_clustering;
// for this partition
size_t desired_block_size;
size_t auto_scale_threshold;
// from write config
size_t promoted_index_block_size;
size_t promoted_index_auto_scale_threshold;
} _pi_write_m;
run_id _run_identifier;
bool _write_regular_as_static; // See #4139
large_data_stats_entry _partition_size_entry;
large_data_stats_entry _rows_in_partition_entry;
large_data_stats_entry _row_size_entry;
large_data_stats_entry _cell_size_entry;
large_data_stats_entry _elements_in_collection_entry;
void init_file_writers();
// Returns the closed writer
std::unique_ptr<file_writer> close_writer(std::unique_ptr<file_writer>& w);
uint32_t close_digest_writer(std::unique_ptr<file_writer>& w);
void close_data_writer();
void close_index_writer();
void close_rows_writer();
void close_partitions_writer();
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
consume(tombstone());
}
}
void ensure_static_row_is_written_if_needed() {
if (!_sst_schema.static_columns.empty() && !_static_row_written) {
consume(static_row{});
}
}
void drain_tombstones(std::optional<position_in_partition_view> pos = {});
void maybe_add_summary_entry(const dht::token& token, bytes_view key) {
if (_index_writer) {
return sstables::maybe_add_summary_entry(
_sst._components->summary, token, key, get_data_offset(),
_index_writer->offset(), _index_sampling_state);
}
}
void maybe_set_pi_first_clustering(const clustering_info& info, tombstone range_tombstone);
void maybe_add_pi_block();
void add_pi_block();
void write_pi_block(const pi_block&);
uint64_t get_data_offset() const {
if (_sst.has_component(component_type::CompressionInfo)) {
// The value returned by compressed_file_length() is continuously updated by the compressed output stream.
return _sst._components->compression.compressed_file_length();
} else {
return _data_writer->offset();
}
}
void write_delta_timestamp(bytes_ostream& writer, api::timestamp_type timestamp) {
sstables::mc::write_delta_timestamp(writer, timestamp, _enc_stats);
}
void write_delta_ttl(bytes_ostream& writer, gc_clock::duration ttl) {
sstables::mc::write_delta_ttl(writer, ttl, _enc_stats);
}
void write_delta_local_deletion_time(bytes_ostream& writer, gc_clock::time_point ldt) {
sstables::mc::write_delta_local_deletion_time(writer, ldt, _enc_stats);
}
void do_write_delta_deletion_time(bytes_ostream& writer, const tombstone& t) {
sstables::mc::write_delta_timestamp(writer, t.timestamp, _enc_stats);
sstables::mc::write_delta_local_deletion_time(writer, t.deletion_time, _enc_stats);
}
void write_delta_deletion_time(bytes_ostream& writer, const tombstone& t) {
if (t) {
do_write_delta_deletion_time(writer, t);
} else {
sstables::mc::write_delta_timestamp(writer, api::missing_timestamp, _enc_stats);
sstables::mc::write_delta_local_deletion_time(writer, no_deletion_time, _enc_stats);
}
}
deletion_time to_deletion_time(tombstone t) {
deletion_time dt;
if (t) {
bool capped;
int32_t ldt = adjusted_local_deletion_time(t.deletion_time, capped);
if (capped) {
slogger.warn("Capping tombstone local_deletion_time {} to max {}", t.deletion_time.time_since_epoch().count(), ldt);
slogger.warn("Capping tombstone in sstable = {}, partition_key = {}", _sst.get_filename(), _partition_key->to_partition_key(_schema));
_sst.get_stats().on_capped_tombstone_deletion_time();
}
dt.local_deletion_time = ldt;
dt.marked_for_delete_at = t.timestamp;
} else {
// Default values for live, non-deleted rows.
dt.local_deletion_time = no_deletion_time;
dt.marked_for_delete_at = api::missing_timestamp;
}
return dt;
}
struct row_time_properties {
std::optional<api::timestamp_type> timestamp;
std::optional<gc_clock::duration> ttl;
std::optional<gc_clock::time_point> local_deletion_time;
};
void maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows);
void maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const uint64_t row_size);
void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements);
void record_corrupt_row(clustering_row&& clustered_row);
// Writes single atomic cell
void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, std::optional<bytes_view> cell_path = {});
// Writes information about row liveness (formerly 'row marker')
void write_liveness_info(bytes_ostream& writer, const row_marker& marker);
// Writes a collection of cells, representing either a CQL collection or fields of a non-frozen user type
void write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, const column_definition& cdef, collection_mutation_view collection,
const row_time_properties& properties, bool has_complex_deletion);
void write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind,
const row& row_body, const row_time_properties& properties, bool has_complex_deletion);
void write_row_body(bytes_ostream& writer, const clustering_row& row, bool has_complex_deletion);
void write_static_row(const row&, column_kind);
void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key, bool is_dead = false) {
++_c_stats.rows_count;
if (is_dead) {
++_c_stats.dead_rows_count;
}
maybe_record_large_rows(_sst, *_partition_key, clustering_key, row_size);
}
void collect_range_tombstone_stats() {
++_c_stats.rows_count;
++_c_stats.range_tombstones_count;
}
// Clustered is a term used to denote an entity that has a clustering key prefix
// and constitutes an entry of a partition.
// Both clustered_rows and rt_markers are instances of Clustered
void write_clustered(const clustering_row& clustered_row, uint64_t prev_row_size);
void write_clustered(const rt_marker& marker, uint64_t prev_row_size);
template <typename T>
requires Clustered<T>
void write_clustered(const T& clustered, tombstone preceding_range_tombstone) {
clustering_info info {clustered.key(), get_kind(clustered)};
maybe_set_pi_first_clustering(info, preceding_range_tombstone);
uint64_t pos = _data_writer->offset();
write_clustered(clustered, pos - _prev_row_start);
_pi_write_m.last_clustering = info;
_prev_row_start = pos;
maybe_add_pi_block();
}
void write_promoted_index();
void consume(rt_marker&& marker, tombstone preceding_range_tombstone);
// Must be called in a seastar thread.
void flush_tmp_bufs(file_writer& writer) {
for (auto&& buf : _tmp_bufs) {
thread::maybe_yield();
writer.write(buf);
}
_tmp_bufs.clear();
}
void flush_tmp_bufs() {
flush_tmp_bufs(*_data_writer);
}
public:
writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats,
shard_id shard = this_shard_id())
: sstable_writer::writer_impl(sst, s, cfg)
, _enc_stats(enc_stats)
, _shard(shard)
, _tmp_bufs(_sst.sstable_buffer_size)
, _sst_schema(make_sstable_schema(s, _enc_stats, _cfg))
, _run_identifier(cfg.run_identifier)
, _write_regular_as_static(s.is_static_compact_table())
, _partition_size_entry(
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_partition_threshold_bytes(),
}
)
, _rows_in_partition_entry(
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_rows_count_threshold(),
}
)
, _row_size_entry(
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_row_threshold_bytes(),
}
)
, _cell_size_entry(
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_cell_threshold_bytes(),
}
)
, _elements_in_collection_entry(
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_collection_elements_count_threshold(),
}
)
{
// This can be 0 in some cases, which, albeit benign in itself, can wreak havoc
// in lower-level writer code, so clamp it to [1, +inf) here, which is
// exactly what callers used to do anyway.
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
_sst.open_sstable(cfg.origin);
_sst.create_data().get();
_compression_enabled = !_sst.has_component(component_type::CRC);
_delayed_filter = _sst.has_component(component_type::Filter) && !_sst.has_component(component_type::Index);
init_file_writers();
_sst._shards = { shard };
_cfg.monitor->on_write_started(_data_writer->offset_tracker());
if (!_delayed_filter) {
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _sst._schema->bloom_filter_fp_chance(), utils::filter_format::m_format);
}
_pi_write_m.promoted_index_block_size = cfg.promoted_index_block_size;
_pi_write_m.promoted_index_auto_scale_threshold = cfg.promoted_index_auto_scale_threshold;
_index_sampling_state.summary_byte_cost = _cfg.summary_byte_cost;
if (_index_writer) {
prepare_summary(_sst._components->summary, estimated_partitions, _schema.min_index_interval());
}
}
~writer();
void consume_new_partition(const dht::decorated_key& dk) override;
void consume(tombstone t) override;
stop_iteration consume(static_row&& sr) override;
stop_iteration consume(clustering_row&& cr) override;
stop_iteration consume(range_tombstone_change&& rtc) override;
stop_iteration consume_end_of_partition() override;
void consume_end_of_stream() override;
uint64_t data_file_position_for_tests() const override;
};
writer::~writer() {
auto close_writer = [](auto& writer) {
if (writer) {
try {
writer->close();
} catch (...) {
sstlog.error("writer failed to close file: {}", std::current_exception());
}
}
};
close_writer(_index_writer);
close_writer(_data_writer);
close_writer(_partitions_writer);
close_writer(_rows_writer);
close_writer(_hashes_writer);
}
void writer::maybe_set_pi_first_clustering(const clustering_info& info, tombstone preceding_range_tombstone) {
uint64_t pos = _data_writer->offset();
if (!_pi_write_m.first_clustering) {
_pi_write_m.first_clustering = info;
_pi_write_m.range_tombstone_preceding_current_block = preceding_range_tombstone;
_pi_write_m.block_start_offset = pos;
_pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size;
}
}
void writer::add_pi_block() {
auto block = pi_block{
*_pi_write_m.first_clustering,
*_pi_write_m.last_clustering,
_pi_write_m.block_start_offset - _c_stats.start_offset,
_data_writer->offset() - _pi_write_m.block_start_offset,
(_current_tombstone ? std::make_optional(_current_tombstone) : std::optional<tombstone>{}),
_pi_write_m.range_tombstone_preceding_current_block,
};
// An index with only one block (that spans the entire partition)
// would be relatively useless, and wouldn't carry its weight.
// So if we are adding the first block, we don't write it out yet --
// we only remember it in `first_entry`, and we wait until another
// entry appears. (If it doesn't, then there will be no row index
// for this partition).
// If this is a second block, we write out the deferred `first_entry`
// first, and then we proceed normally.
if (_pi_write_m.promoted_index_size == 0) {
_pi_write_m.first_entry.emplace(std::move(block));
++_pi_write_m.promoted_index_size;
return;
} else if (_pi_write_m.promoted_index_size == 1) {
write_pi_block(*_pi_write_m.first_entry);
}
write_pi_block(block);
++_pi_write_m.promoted_index_size;
// auto-scale?
if (_pi_write_m.blocks.size() >= _pi_write_m.auto_scale_threshold) {
_pi_write_m.desired_block_size *= 2;
_pi_write_m.auto_scale_threshold += _pi_write_m.promoted_index_auto_scale_threshold;
_sst.get_stats().on_promoted_index_auto_scale();
}
}
void writer::maybe_add_pi_block() {
uint64_t pos = _data_writer->offset();
if (pos >= _pi_write_m.block_next_start_offset) {
add_pi_block();
_pi_write_m.first_clustering.reset();
_pi_write_m.range_tombstone_preceding_current_block = tombstone();
_pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size;
}
}
void writer::init_file_writers() {
auto out = _sst._storage->make_data_or_index_sink(_sst, component_type::Data).get();
if (!_compression_enabled) {
_data_writer = std::make_unique<crc32_checksummed_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.get_filename());
} else {
auto compressor = _sst.manager().get_compressor_factory().make_compressor_for_writing(_sst._schema).get();
_data_writer = std::make_unique<file_writer>(
make_compressed_file_m_format_output_stream(
output_stream<char>(std::move(out)),
&_sst._components->compression,
_sst._schema->get_compressor_params(),
std::move(compressor)), _sst.get_filename());
}
if (_sst.has_component(component_type::Index)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get();
_index_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.index_filename());
}
if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get();
_rows_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows));
_bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer);
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get();
_partitions_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions));
_bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer);
}
if (_delayed_filter) {
file_output_stream_options options;
options.buffer_size = 32 * 1024;
_hashes_writer = std::make_unique<file_writer>(_sst.make_component_file_writer(component_type::TemporaryHashes, std::move(options),
sstable_write_open_flags).get());
}
}
std::unique_ptr<file_writer> writer::close_writer(std::unique_ptr<file_writer>& w) {
auto writer = std::move(w);
writer->close();
return writer;
}
uint32_t writer::close_digest_writer(std::unique_ptr<file_writer>& w) {
auto writer = close_writer(w);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
return chksum_wr->full_checksum();
}
void writer::close_data_writer() {
auto writer = close_writer(_data_writer);
if (!_compression_enabled) {
auto chksum_wr = static_cast<crc32_checksummed_file_writer*>(writer.get());
_sst.write_digest(chksum_wr->full_checksum());
_sst.write_crc(chksum_wr->finalize_checksum());
} else {
_sst.write_digest(_sst._components->compression.get_full_checksum());
}
}
void writer::close_index_writer() {
if (_index_writer) {
_sst.get_components_digests().map[component_type::Index] = close_digest_writer(_index_writer);
}
}
void writer::close_partitions_writer() {
if (_partitions_writer) {
_sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
_sst.get_version(),
_first_key.value(),
_last_key.value());
_sst.get_components_digests().map[component_type::Partitions] = close_digest_writer(_partitions_writer);
}
}
void writer::close_rows_writer() {
if (_rows_writer) {
// Append some garbage padding to the file just to ensure that it's never empty.
// (Otherwise it would be empty if the sstable contains only small partitions).
// This is a hack to work around some bad interactions between zero-sized files
// and object storage. (It seems that e.g. minio considers a zero-sized file
// upload to be a no-op, which breaks some assumptions).
uint32_t garbage = seastar::cpu_to_be(0x13371337);
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
_sst.get_components_digests().map[component_type::Rows] = close_digest_writer(_rows_writer);
}
}
void writer::consume_new_partition(const dht::decorated_key& dk) {
_c_stats.start_offset = _data_writer->offset();
_prev_row_start = _data_writer->offset();
_partition_key = key::from_partition_key(_schema, dk.key());
maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key));
_current_murmur_hash = utils::make_hashed_key(bytes_view(*_partition_key));
if (_hashes_writer) {
std::array<uint64_t, 2> hash = {
seastar::cpu_to_le(_current_murmur_hash.hash()[0]),
seastar::cpu_to_le(_current_murmur_hash.hash()[1])
};
_hashes_writer->write(reinterpret_cast<const char*>(hash.data()), sizeof(hash));
} else {
_sst._components->filter->add(_current_murmur_hash);
}
_collector.add_key(bytes_view(*_partition_key));
_num_partitions_consumed++;
auto p_key = disk_string_view<uint16_t>();
p_key.value = bytes_view(*_partition_key);
if (_index_writer) {
// Write index file entry from partition key into index file.
// Write an index entry minus the "promoted index" (sample of columns)
// part. We can only write that after processing the entire partition
// and collecting the sample of columns.
write(_sst.get_version(), *_index_writer, p_key);
write_vint(*_index_writer, _data_writer->offset());
}
_current_dk_for_bti = dk;
_current_partition_position = _data_writer->offset();
_pi_write_m.first_entry.reset();
_pi_write_m.blocks.clear();
_pi_write_m.offsets.clear();
_pi_write_m.promoted_index_size = 0;
_pi_write_m.partition_tombstone = {};
_pi_write_m.first_clustering.reset();
_pi_write_m.last_clustering.reset();
_pi_write_m.desired_block_size = _pi_write_m.promoted_index_block_size;
_pi_write_m.auto_scale_threshold = _pi_write_m.promoted_index_auto_scale_threshold;
write(_sst.get_version(), *_data_writer, p_key);
_partition_header_length = _data_writer->offset() - _c_stats.start_offset;
_tombstone_written = false;
_static_row_written = false;
}
void writer::consume(tombstone t) {
uint64_t current_pos = _data_writer->offset();
_pi_write_m.partition_tombstone = to_deletion_time(t);
write(_sst.get_version(), *_data_writer, _pi_write_m.partition_tombstone);
_partition_header_length += (_data_writer->offset() - current_pos);
_c_stats.update(t);
_tombstone_written = true;
if (t) {
_collector.update_min_max_components(position_in_partition_view::before_all_clustered_rows());
_collector.update_min_max_components(position_in_partition_view::after_all_clustered_rows());
}
}
void writer::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) {
auto& size_entry = _partition_size_entry;
auto& row_count_entry = _rows_in_partition_entry;
size_entry.max_value = std::max(size_entry.max_value, partition_size);
row_count_entry.max_value = std::max(row_count_entry.max_value, rows);
auto ret = _sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size, rows, range_tombstones, dead_rows).get();
size_entry.above_threshold += unsigned(bool(ret.size));
row_count_entry.above_threshold += unsigned(bool(ret.rows));
}
void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) {
auto& entry = _row_size_entry;
if (entry.max_value < row_size) {
entry.max_value = row_size;
}
if (_sst.get_large_data_handler().maybe_record_large_rows(sst, partition_key, clustering_key, row_size).get()) {
entry.above_threshold++;
};
}
void writer::maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) {
auto& cell_size_entry = _cell_size_entry;
if (cell_size_entry.max_value < cell_size) {
cell_size_entry.max_value = cell_size;
}
auto& collection_elements_entry = _elements_in_collection_entry;
if (collection_elements_entry.max_value < collection_elements) {
collection_elements_entry.max_value = collection_elements;
}
if (_sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, cell_size, collection_elements).get()) {
if (cell_size > cell_size_entry.threshold) {
cell_size_entry.above_threshold++;
}
if (collection_elements > collection_elements_entry.threshold) {
collection_elements_entry.above_threshold++;
}
};
}
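// Serialized cell layout, as written by write_cell() below (in order):
// a cell_flags byte; a delta timestamp (unless use_row_timestamp_mask is
// set); a delta local deletion time and/or delta TTL (unless
// use_row_ttl_mask is set); the cell path, if any (for collection cells);
// and finally the value (or the counter value for counter cells), if not
// empty.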
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
const column_definition& cdef, const row_time_properties& properties, std::optional<bytes_view> cell_path) {
uint64_t current_pos = writer.size();
bool is_deleted = !cell.is_live();
bool has_value = !is_deleted && !cell.value().empty();
bool use_row_timestamp = (properties.timestamp == cell.timestamp());
bool is_row_expiring = properties.ttl.has_value();
bool is_cell_expiring = cell.is_live_and_has_ttl();
bool use_row_ttl = is_row_expiring && is_cell_expiring &&
properties.ttl == cell.ttl() &&
properties.local_deletion_time == cell.deletion_time();
cell_flags flags = cell_flags::none;
if ((!has_value && !cdef.is_counter()) || is_deleted) {
flags |= cell_flags::has_empty_value_mask;
}
if (is_deleted) {
flags |= cell_flags::is_deleted_mask;
} else if (is_cell_expiring) {
flags |= cell_flags::is_expiring_mask;
}
if (use_row_timestamp) {
flags |= cell_flags::use_row_timestamp_mask;
}
if (use_row_ttl) {
flags |= cell_flags::use_row_ttl_mask;
}
write(_sst.get_version(), writer, flags);
if (!use_row_timestamp) {
write_delta_timestamp(writer, cell.timestamp());
}
if (!use_row_ttl) {
if (is_deleted) {
write_delta_local_deletion_time(writer, cell.deletion_time());
} else if (is_cell_expiring) {
write_delta_local_deletion_time(writer, cell.expiry());
write_delta_ttl(writer, cell.ttl());
}
}
if (bool(cell_path)) {
write_vint(writer, cell_path->size());
write(_sst.get_version(), writer, *cell_path);
}
if (cdef.is_counter()) {
if (!is_deleted) {
SCYLLA_ASSERT(!cell.is_counter_update());
auto ccv = counter_cell_view(cell);
write_counter_value(ccv, writer, _sst.get_version(), [] (bytes_ostream& out, uint32_t value) {
return write_vint(out, value);
});
}
} else {
if (has_value) {
write_cell_value(_sst.get_version(), writer, *cdef.type, cell.value());
}
}
// Collect cell statistics
// We record collections in write_collection, so ignore them here
if (cdef.is_atomic()) {
uint64_t size = writer.size() - current_pos;
maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size, 0);
}
auto timestamp = cell.timestamp();
if (is_deleted) {
_c_stats.update_timestamp(timestamp, is_live::no);
_c_stats.update_local_deletion_time_and_tombstone_histogram(cell.deletion_time());
_sst.get_stats().on_cell_tombstone_write();
return;
}
_c_stats.update_timestamp(timestamp, is_live::yes);
if (is_cell_expiring) {
_c_stats.update_ttl(cell.ttl());
// tombstone histogram is updated with expiration time because if ttl is longer
// than gc_grace_seconds for all data, sstable will be considered fully expired
// when actually nothing is expired.
_c_stats.update_local_deletion_time_and_tombstone_histogram(cell.expiry());
} else { // regular live cell
_c_stats.update_local_deletion_time(std::numeric_limits<int>::max());
}
_sst.get_stats().on_cell_write();
}
void writer::write_liveness_info(bytes_ostream& writer, const row_marker& marker) {
if (marker.is_missing()) {
return;
}
api::timestamp_type timestamp = marker.timestamp();
if (marker.is_live()) {
_c_stats.update_timestamp(timestamp, is_live::yes);
_c_stats.update_live_row_marker_timestamp(timestamp);
} else {
_c_stats.update_timestamp(timestamp, is_live::no);
}
write_delta_timestamp(writer, timestamp);
auto write_expiring_liveness_info = [this, &writer] (gc_clock::duration ttl, gc_clock::time_point ldt) {
_c_stats.update_ttl(ttl);
_c_stats.update_local_deletion_time_and_tombstone_histogram(ldt);
write_delta_ttl(writer, ttl);
write_delta_local_deletion_time(writer, ldt);
};
if (!marker.is_live()) {
write_expiring_liveness_info(gc_clock::duration(expired_liveness_ttl), marker.deletion_time());
} else if (marker.is_expiring()) {
write_expiring_liveness_info(marker.ttl(), marker.expiry());
} else {
_c_stats.update_ttl(0);
_c_stats.update_local_deletion_time(std::numeric_limits<int32_t>::max());
}
}
void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key,
const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties,
bool has_complex_deletion) {
uint64_t current_pos = writer.size();
uint64_t collection_elements = 0;
collection.with_deserialized(*cdef.type, [&] (collection_mutation_view_description mview) {
if (has_complex_deletion) {
write_delta_deletion_time(writer, mview.tomb);
_c_stats.update(mview.tomb);
}
collection_elements = mview.cells.size();
write_vint(writer, collection_elements);
if (!mview.cells.empty()) {
++_c_stats.column_count;
}
for (const auto& [cell_path, cell]: mview.cells) {
write_cell(writer, clustering_key, cell, cdef, properties, cell_path);
thread::maybe_yield();
}
_c_stats.cells_count += collection_elements;
});
uint64_t size = writer.size() - current_pos;
maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size, collection_elements);
}
void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body,
const row_time_properties& properties, bool has_complex_deletion) {
// Note that missing columns are written based on the whole set of regular columns as defined by schema.
// This differs from Origin where all updated columns are tracked and the set of filled columns of a row
// is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases
// but still valid.
write_missing_columns(writer, kind == column_kind::static_column ? _sst_schema.static_columns : _sst_schema.regular_columns, row_body);
row_body.for_each_cell([this, &writer, kind, &properties, clustering_key] (column_id id, const atomic_cell_or_collection& c) {
auto&& column_definition = _schema.column_at(kind, id);
if (!column_definition.is_atomic()) {
_collections.push_back({&column_definition, c});
return;
}
atomic_cell_view cell = c.as_atomic_cell(column_definition);
++_c_stats.cells_count;
++_c_stats.column_count;
write_cell(writer, clustering_key, cell, column_definition, properties);
});
for (const auto& col: _collections) {
write_collection(writer, clustering_key, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion);
}
_collections.clear();
}
void writer::write_row_body(bytes_ostream& writer, const clustering_row& row, bool has_complex_deletion) {
write_liveness_info(writer, row.marker());
auto write_tombstone_and_update_stats = [this, &writer] (const tombstone& t) {
_c_stats.do_update(t);
do_write_delta_deletion_time(writer, t);
};
if (row.tomb().regular()) {
write_tombstone_and_update_stats(row.tomb().regular());
}
if (row.tomb().is_shadowable()) {
write_tombstone_and_update_stats(row.tomb().tomb());
}
row_time_properties properties;
if (!row.marker().is_missing()) {
properties.timestamp = row.marker().timestamp();
if (row.marker().is_expiring()) {
properties.ttl = row.marker().ttl();
properties.local_deletion_time = row.marker().deletion_time();
}
}
return write_cells(writer, &row.key(), column_kind::regular_column, row.cells(), properties, has_complex_deletion);
}
// Find if any complex column (collection or non-frozen user type) in the row contains a column-wide tombstone
static bool row_has_complex_deletion(const schema& s, const row& r, column_kind kind) {
bool result = false;
r.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& c) {
auto&& cdef = s.column_at(kind, id);
if (cdef.is_atomic()) {
return stop_iteration::no;
}
return c.as_collection_mutation().with_deserialized(*cdef.type, [&] (collection_mutation_view_description mview) {
if (mview.tomb) {
result = true;
}
return stop_iteration(static_cast<bool>(mview.tomb));
});
});
return result;
}
void writer::write_static_row(const row& static_row, column_kind kind) {
uint64_t current_pos = _data_writer->offset();
// Static row flag is stored in extended flags so extension_flag is always set for static rows
row_flags flags = row_flags::extension_flag;
if (static_row.size() == _sst_schema.static_columns.size()) {
flags |= row_flags::has_all_columns;
}
bool has_complex_deletion = row_has_complex_deletion(_schema, static_row, kind);
if (has_complex_deletion) {
flags |= row_flags::has_complex_deletion;
}
write(_sst.get_version(), *_data_writer, flags);
write(_sst.get_version(), *_data_writer, row_extended_flags::is_static);
write_vint(_tmp_bufs, 0); // as the static row always comes first, the previous row size is always zero
write_cells(_tmp_bufs, nullptr, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion);
write_vint(*_data_writer, _tmp_bufs.size());
flush_tmp_bufs();
_partition_header_length += (_data_writer->offset() - current_pos);
collect_row_stats(_data_writer->offset() - current_pos, nullptr);
_static_row_written = true;
}
stop_iteration writer::consume(static_row&& sr) {
ensure_tombstone_is_written();
write_static_row(sr.cells(), column_kind::static_column);
return stop_iteration::no;
}
void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_row_size) {
uint64_t current_pos = _data_writer->offset();
row_flags flags = row_flags::none;
row_extended_flags ext_flags = row_extended_flags::none;
const row_marker& marker = clustered_row.marker();
if (!marker.is_missing()) {
flags |= row_flags::has_timestamp;
if (!marker.is_live() || marker.is_expiring()) {
flags |= row_flags::has_ttl;
}
}
const bool is_dead = !clustered_row.is_live(_schema, tombstone(), gc_clock::now());
if (clustered_row.tomb().regular()) {
flags |= row_flags::has_deletion;
}
if (clustered_row.tomb().is_shadowable()) {
flags |= row_flags::extension_flag;
ext_flags = row_extended_flags::has_shadowable_deletion_scylla;
}
if (clustered_row.cells().size() == _sst_schema.regular_columns.size()) {
flags |= row_flags::has_all_columns;
}
bool has_complex_deletion = row_has_complex_deletion(_schema, clustered_row.cells(), column_kind::regular_column);
if (has_complex_deletion) {
flags |= row_flags::has_complex_deletion;
}
write(_sst.get_version(), *_data_writer, flags);
if (ext_flags != row_extended_flags::none) {
write(_sst.get_version(), *_data_writer, ext_flags);
}
write_clustering_prefix(_sst.get_version(), *_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
write_vint(_tmp_bufs, prev_row_size);
write_row_body(_tmp_bufs, clustered_row, has_complex_deletion);
uint64_t row_body_size = _tmp_bufs.size();
write_vint(*_data_writer, row_body_size);
flush_tmp_bufs();
// Collect statistics
_collector.update_min_max_components(clustered_row.position());
collect_row_stats(_data_writer->offset() - current_pos, &clustered_row.key(), is_dead);
}
void writer::record_corrupt_row(clustering_row&& clustered_row) {
auto& handler = _sst.get_corrupt_data_handler();
const auto pk = _partition_key->to_partition_key(_schema);
const auto ck = clustered_row.key();
db::corrupt_data_handler::entry_id corrupt_row_id;
sstring result;
try {
corrupt_row_id = handler.record_corrupt_clustering_row(_schema, pk, std::move(clustered_row), "sstable-write", fmt::to_string(_sst.get_filename())).get();
result = format("written corrupt row to {} with id {}", handler.storage_name(), corrupt_row_id);
} catch (...) {
result = format("failed to write corrupt row to {}: {}", handler.storage_name(), std::current_exception());
}
slogger.error("found non-full clustering key {} in partition {} while writing sstable {} for non-compact table {}.{}; {}",
ck,
pk,
_sst.get_filename(),
_schema.ks_name(),
_schema.cf_name(),
result);
}
stop_iteration writer::consume(clustering_row&& cr) {
if (_write_regular_as_static) {
ensure_tombstone_is_written();
write_static_row(cr.cells(), column_kind::regular_column);
return stop_iteration::no;
}
if (!_schema.is_compact_table() && !cr.key().is_full(_schema)) {
record_corrupt_row(std::move(cr));
return stop_iteration::no;
}
ensure_tombstone_is_written();
ensure_static_row_is_written_if_needed();
write_clustered(cr, _current_tombstone);
auto can_split_partition_at_clustering_boundary = [this] {
// Allow the size limit to be exceeded by 10%, so we won't perform an unnecessary split
// of a partition which crossed the limit by a small margin.
uint64_t size_threshold = [this] {
const uint64_t max_size = std::numeric_limits<uint64_t>::max();
if (_cfg.max_sstable_size == max_size) {
return max_size;
}
uint64_t threshold_goal = _cfg.max_sstable_size * 1.1;
// handle overflow.
return threshold_goal < _cfg.max_sstable_size ? max_size : threshold_goal;
}();
// Check that there are enough promoted index entries, so that the current fragment won't
// unnecessarily cut the current partition in the middle.
bool has_enough_promoted_index_entries = _pi_write_m.promoted_index_size >= 2;
return get_data_offset() > size_threshold && has_enough_promoted_index_entries;
};
return stop_iteration(can_split_partition_at_clustering_boundary());
}
// Write clustering prefix along with its bound kind and, if not full, its size
template <typename W>
requires Writer<W>
static void write_clustering_prefix(sstable_version_types v, W& writer, bound_kind_m kind,
const schema& s, const clustering_key_prefix& clustering) {
SCYLLA_ASSERT(kind != bound_kind_m::static_clustering);
write(v, writer, kind);
auto is_ephemerally_full = ephemerally_full_prefix{s.is_compact_table()};
if (kind != bound_kind_m::clustering) {
// Don't use ephemerally full for RT bounds as they're always non-full
is_ephemerally_full = ephemerally_full_prefix::no;
write(v, writer, static_cast<uint16_t>(clustering.size(s)));
}
write_clustering_prefix(v, writer, s, clustering, is_ephemerally_full);
}
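// On-disk layout of the promoted index, as emitted by write_promoted_index()
// below: a vint with the total size of what follows (0 if there are fewer
// than 2 blocks, in which case nothing else is written); then a vint with
// _partition_header_length; the partition tombstone's deletion_time; a vint
// with the number of blocks; the serialized pi_blocks; and finally one
// uint32_t offset per block, relative to the start of the blocks section.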
void writer::write_promoted_index() {
if (_index_writer) {
if (_pi_write_m.promoted_index_size < 2) {
write_vint(*_index_writer, uint64_t(0));
return;
}
write_vint(_tmp_bufs, _partition_header_length);
write(_sst.get_version(), _tmp_bufs, _pi_write_m.partition_tombstone);
write_vint(_tmp_bufs, _pi_write_m.promoted_index_size);
uint64_t pi_size = _tmp_bufs.size() + _pi_write_m.blocks.size() + _pi_write_m.offsets.size();
write_vint(*_index_writer, pi_size);
flush_tmp_bufs(*_index_writer);
write(_sst.get_version(), *_index_writer, _pi_write_m.blocks);
write(_sst.get_version(), *_index_writer, _pi_write_m.offsets);
}
}
void writer::write_pi_block(const pi_block& block) {
if (_index_writer) {
static constexpr size_t width_base = 65536;
bytes_ostream& blocks = _pi_write_m.blocks;
uint32_t offset = blocks.size();
write(_sst.get_version(), _pi_write_m.offsets, offset);
write_clustering_prefix(_sst.get_version(), blocks, block.first.kind, _schema, block.first.clustering);
write_clustering_prefix(_sst.get_version(), blocks, block.last.kind, _schema, block.last.clustering);
write_vint(blocks, block.offset);
write_signed_vint(blocks, block.width - width_base);
write(_sst.get_version(), blocks, static_cast<std::byte>(block.open_marker ? 1 : 0));
if (block.open_marker) {
write(_sst.get_version(), blocks, to_deletion_time(*block.open_marker));
}
}
if (_bti_row_index_writer) {
_bti_row_index_writer->add(
_schema,
block.first,
block.last,
block.offset,
to_deletion_time(block.preceding_range_tombstone)
);
}
}
void writer::write_clustered(const rt_marker& marker, uint64_t prev_row_size) {
write(_sst.get_version(), *_data_writer, row_flags::is_marker);
write_clustering_prefix(_sst.get_version(), *_data_writer, marker.kind, _schema, marker.clustering);
auto write_marker_body = [this, &marker] (bytes_ostream& writer) {
write_delta_deletion_time(writer, marker.tomb);
_c_stats.update(marker.tomb);
if (marker.boundary_tomb) {
do_write_delta_deletion_time(writer, *marker.boundary_tomb);
_c_stats.do_update(*marker.boundary_tomb);
}
};
write_vint(_tmp_bufs, prev_row_size);
write_marker_body(_tmp_bufs);
write_vint(*_data_writer, _tmp_bufs.size());
flush_tmp_bufs();
_collector.update_min_max_components(marker.position());
collect_range_tombstone_stats();
}
void writer::consume(rt_marker&& marker, tombstone preceding_range_tombstone) {
write_clustered(marker, preceding_range_tombstone);
}
stop_iteration writer::consume(range_tombstone_change&& rtc) {
ensure_tombstone_is_written();
ensure_static_row_is_written_if_needed();
position_in_partition_view pos = rtc.position();
if (!_current_tombstone && !rtc.tombstone()) {
return stop_iteration::no;
}
tombstone prev_tombstone = std::exchange(_current_tombstone, rtc.tombstone());
if (!prev_tombstone) { // start bound
auto bv = pos.as_start_bound_view();
consume(
rt_marker{pos.key(), to_bound_kind_m(bv.kind()), rtc.tombstone(), {}},
prev_tombstone);
} else if (!rtc.tombstone()) { // end bound
auto bv = pos.as_end_bound_view();
consume(
rt_marker{pos.key(), to_bound_kind_m(bv.kind()), prev_tombstone, {}},
prev_tombstone);
} else { // boundary
auto bk = pos.get_bound_weight() == bound_weight::before_all_prefixed
? bound_kind_m::excl_end_incl_start
: bound_kind_m::incl_end_excl_start;
consume(
rt_marker{pos.key(), bk, prev_tombstone, rtc.tombstone()},
prev_tombstone);
}
return stop_iteration::no;
}
stop_iteration writer::consume_end_of_partition() {
ensure_tombstone_is_written();
ensure_static_row_is_written_if_needed();
uint64_t end_of_partition_position = _data_writer->offset();
auto write_end_of_partition = [&] {
write(_sst.get_version(), *_data_writer, row_flags::end_of_partition);
};
auto maybe_add_pi_block = [&] {
if (_pi_write_m.promoted_index_size && _pi_write_m.first_clustering) {
add_pi_block();
}
};
if (_features.is_enabled(CorrectLastPiBlockWidth)) [[likely]] {
maybe_add_pi_block();
write_end_of_partition();
} else {
write_end_of_partition();
maybe_add_pi_block();
}
write_promoted_index();
if (_bti_partition_index_writer) {
auto partitions_db_payload = _bti_row_index_writer->finish(
_sst.get_version(),
_schema,
_current_partition_position,
end_of_partition_position,
*_partition_key,
_pi_write_m.partition_tombstone
);
_bti_partition_index_writer->add(
_schema,
*std::exchange(_current_dk_for_bti, std::nullopt),
_current_murmur_hash,
partitions_db_payload
);
}
// compute the size of the current partition.
_c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset;
maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size, _c_stats.rows_count, _c_stats.range_tombstones_count, _c_stats.dead_rows_count);
// update() merges column_stats with the data stored by the collector.
_collector.update(std::move(_c_stats));
_c_stats.reset();
if (!_first_key) {
_first_key = *_partition_key;
}
_last_key = std::move(*_partition_key);
_partition_key = std::nullopt;
return get_data_offset() < _cfg.max_sstable_size ? stop_iteration::no : stop_iteration::yes;
}
void writer::consume_end_of_stream() {
_cfg.monitor->on_data_write_completed();
if (_sst._components->summary) {
seal_summary(_sst._components->summary, std::optional<key>(_first_key), std::optional<key>(_last_key), _index_sampling_state).get();
}
if (_sst.has_component(component_type::CompressionInfo)) {
_collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
close_index_writer();
close_partitions_writer();
close_rows_writer();
if (_hashes_writer) {
close_writer(_hashes_writer);
}
_sst.set_first_and_last_keys();
_sst._components->statistics.contents[metadata_type::Serialization] = std::make_unique<serialization_header>(std::move(_sst_schema.header));
seal_statistics(_sst.get_version(), _sst._components->statistics, _collector,
_sst._schema->get_partitioner().name(), _sst._schema->bloom_filter_fp_chance(),
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key(), _enc_stats);
close_data_writer();
if (_sst._components->summary) {
_sst.write_summary();
}
if (_delayed_filter) {
_sst.build_delayed_filter(_num_partitions_consumed);
} else {
_sst.maybe_rebuild_filter_from_index(_num_partitions_consumed);
}
_sst.write_filter();
_sst.write_statistics();
_sst.write_compression();
// Note: during the SSTable write, the `compressor` object in `_sst._components->compression`
// can only compress, not decompress. We have to create a decompressing `compressor` here.
// (The reason we split the two is that we don't want to keep the compressor-specific compression
// context after the write is over, because it hogs memory).
auto decompressor = _sst.manager().get_compressor_factory().make_compressor_for_reading(_sst._components->compression).get();
_sst._components->compression.set_compressor(std::move(decompressor));
run_identifier identifier{_run_identifier};
std::optional<scylla_metadata::large_data_stats> ld_stats(scylla_metadata::large_data_stats{
.map = {
{ large_data_type::partition_size, std::move(_partition_size_entry) },
{ large_data_type::rows_in_partition, std::move(_rows_in_partition_entry) },
{ large_data_type::row_size, std::move(_row_size_entry) },
{ large_data_type::cell_size, std::move(_cell_size_entry) },
{ large_data_type::elements_in_collection, std::move(_elements_in_collection_entry) },
}
});
std::optional<scylla_metadata::ext_timestamp_stats> ts_stats(scylla_metadata::ext_timestamp_stats{
.map = _collector.get_ext_timestamp_stats()
});
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats));
if (!_cfg.leave_unsealed) {
_sst.seal_sstable(_cfg.backup).get();
}
}
uint64_t writer::data_file_position_for_tests() const {
return _data_writer->offset();
}
std::unique_ptr<sstable_writer::writer_impl> make_writer(sstable& sst,
const schema& s,
uint64_t estimated_partitions,
const sstable_writer_config& cfg,
encoding_stats enc_stats,
shard_id shard) {
return std::make_unique<writer>(sst, s, estimated_partitions, cfg, enc_stats, shard);
}
}
}