diff --git a/clustering_bounds_comparator.hh b/clustering_bounds_comparator.hh index b4f0545601..c5d425db5e 100644 --- a/clustering_bounds_comparator.hh +++ b/clustering_bounds_comparator.hh @@ -32,7 +32,10 @@ enum class bound_kind : uint8_t { excl_end = 0, incl_start = 1, - // values 2 to 5 are reserved for forward Origin compatibility + excl_end_incl_start = 2, + static_clustering = 3, + clustering = 4, + incl_end_excl_start = 5, incl_end = 6, excl_start = 7, }; diff --git a/configure.py b/configure.py index 9bf948c85c..3f6e52d71d 100755 --- a/configure.py +++ b/configure.py @@ -412,6 +412,7 @@ scylla_core = (['database.cc', 'sstables/compaction_manager.cc', 'sstables/integrity_checked_file_impl.cc', 'sstables/prepended_input_stream.cc', + 'sstables/m_format_write_helpers.cc', 'transport/event.cc', 'transport/event_notifier.cc', 'transport/server.cc', diff --git a/keys.cc b/keys.cc index 6f9af7c88b..8da170dba3 100644 --- a/keys.cc +++ b/keys.cc @@ -63,6 +63,14 @@ std::ostream& operator<<(std::ostream& out, const bound_kind k) { return out << "excl end"; case bound_kind::incl_start: return out << "incl start"; + case bound_kind::excl_end_incl_start: + return out << "excl end + incl start"; + case bound_kind::static_clustering: + return out << "static clustering"; + case bound_kind ::clustering: + return out << "clustering"; + case bound_kind::incl_end_excl_start: + return out << "incl end + excl start"; case bound_kind::incl_end: return out << "incl end"; case bound_kind::excl_start: @@ -77,6 +85,10 @@ bound_kind invert_kind(bound_kind k) { case bound_kind::incl_start: return bound_kind::excl_end; case bound_kind::excl_end: return bound_kind::incl_start; case bound_kind::incl_end: return bound_kind::excl_start; + case bound_kind::excl_end_incl_start: return bound_kind::incl_end_excl_start; + case bound_kind::incl_end_excl_start: return bound_kind::excl_end_incl_start; + case bound_kind::static_clustering: return bound_kind::static_clustering; + case 
bound_kind::clustering: return bound_kind::clustering; } abort(); } @@ -91,6 +103,8 @@ int32_t weight(bound_kind k) { return 1; case bound_kind::excl_start: return 2; + default: + throw std::invalid_argument(sprint("weight() is not defined for bound_kind {}", k)); } abort(); } diff --git a/mutation_partition.cc b/mutation_partition.cc index 0c5e65c5ef..b436b05bf1 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -443,7 +443,10 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::timestamp_type created_at) { clustered_row(s, key).apply(row_marker(created_at)); } - +void mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::timestamp_type created_at, + gc_clock::duration ttl, gc_clock::time_point expiry) { + clustered_row(s, key).apply(row_marker(created_at, ttl, expiry)); +} void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) { auto e = current_allocator().construct(key, std::move(row)); _rows.insert(_rows.end(), *e, rows_entry::compare(s)); diff --git a/mutation_partition.hh b/mutation_partition.hh index d8a6798923..e3d010a3bc 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -953,6 +953,8 @@ public: void apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t); // Equivalent to applying a mutation with an empty row, created with given timestamp void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at); + void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at, + gc_clock::duration ttl, gc_clock::time_point expiry); // prefix must not be full void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); void apply_row_tombstone(const schema& schema, range_tombstone rt); diff --git a/position_in_partition.hh b/position_in_partition.hh index f4b13e5792..f7084b98e8 100644 --- a/position_in_partition.hh +++ 
b/position_in_partition.hh @@ -58,10 +58,15 @@ int position_weight(bound_kind k) { switch(k) { case bound_kind::excl_end: case bound_kind::incl_start: + case bound_kind::excl_end_incl_start: return -1; case bound_kind::incl_end: case bound_kind::excl_start: + case bound_kind::incl_end_excl_start: return 1; + case bound_kind::clustering: + case bound_kind::static_clustering: + return 0; } abort(); } diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 1215dd0deb..7cc6ae3e99 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -549,7 +549,8 @@ public: cfg.max_sstable_size = _max_sstable_size; cfg.monitor = &_active_write_monitors.back(); cfg.large_partition_warning_threshold_bytes = _cf.large_partition_warning_threshold_bytes(); - _writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority)); + // TODO: calculate encoding_stats based on statistics of compacted sstables + _writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, encoding_stats{}, priority)); } return &*_writer; } @@ -696,7 +697,8 @@ public: sstable_writer_config cfg; cfg.max_sstable_size = _max_sstable_size; auto&& priority = service::get_local_compaction_priority(); - writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, priority, _shard)); + // TODO: calculate encoding_stats based on statistics of compacted sstables + writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard)); } return &*writer; } diff --git a/sstables/m_format_write_helpers.cc b/sstables/m_format_write_helpers.cc new file mode 100644 index 0000000000..bb9583516e --- /dev/null +++ b/sstables/m_format_write_helpers.cc @@ -0,0 +1,315 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. 
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include +#include + +#include "encoding_stats.hh" +#include "schema.hh" +#include "mutation_fragment.hh" +#include "vint-serialization.hh" +#include "sstables/types.hh" +#include "sstables/m_format_write_helpers.hh" +#include "sstables/writer.hh" + +namespace sstables { + +template +inline void write_vint_impl(file_writer& out, T value) { + using vint_type = std::conditional_t, unsigned_vint, signed_vint>; + std::array encoding_buffer; + const auto size = vint_type::serialize(value, encoding_buffer.begin()); + out.write(reinterpret_cast(encoding_buffer.data()), size); +} + +void write_unsigned_vint(file_writer& out, uint64_t value) { + return write_vint_impl(out, value); +} + +void write_signed_vint(file_writer& out, int64_t value) { + return write_vint_impl(out, value); +} + +// A helper CRTP base class for input ranges. 
+// Derived classes should implement the following functions: +// bool next() const; +// generates the next value, if possible; +// returns true if the next value has been evaluated, false otherwise +// explicit operator bool() const; +// tells whether the range can produce more items +// TODO: turn description into a concept +template +struct input_range_base { +private: + + InputRange& self() { + return static_cast(*this); + } + + const InputRange& self() const { + return static_cast(*this); + } + +public: + // Use the same type for iterator and const_iterator + using const_iterator = class iterator + : public boost::iterator_facade< + iterator, + const ValueType, + std::input_iterator_tag, + const ValueType + > + { + private: + const InputRange* _range; + + friend class input_range_base; + friend class boost::iterator_core_access; + + explicit iterator(const InputRange& range) + : _range(range.next() ? &range : nullptr) + {} + + void increment() { + assert(_range); + if (!_range->next()) { + _range = nullptr; + } + } + + bool equal(iterator that) const { + return (_range == that._range); + } + + const ValueType dereference() const { + assert(_range); + return _range->get_value(); + } + + public: + iterator() : _range{} {} + + }; + + iterator begin() const { return iterator{self()}; } + iterator end() const { return iterator{}; } +}; + +struct clustering_block { + constexpr static uint32_t max_block_size = 32; + uint64_t header = 0; + struct described_value { + bytes_view value; + std::reference_wrapper type; + }; + boost::container::static_vector values; +}; + +class clustering_blocks_input_range + : public input_range_base { +private: + const schema& _schema; + const clustering_key_prefix& _prefix; + mutable clustering_block _current_block; + mutable uint32_t _offset = 0; + +public: + clustering_blocks_input_range(const schema& s, const clustering_key_prefix& prefix) + : _schema(s), _prefix(prefix) {} + + bool next() const { + if (_offset == 
_schema.clustering_key_size()) { + // No more values to encode + return false; + } + + // Each block contains up to max_block_size values + auto limit = std::min(_schema.clustering_key_size(), _offset + clustering_block::max_block_size); + + _current_block = {}; + assert (_offset % clustering_block::max_block_size == 0); + while (_offset < limit) { + auto shift = _offset % clustering_block::max_block_size; // index of this column inside the current block (0..max_block_size-1); a bitwise AND here would only ever give 0 or max_block_size, colliding all header bits and shifting by 64 + if (_offset < _prefix.size(_schema)) { + bytes_view value = _prefix.get_component(_schema, _offset); + if (value.empty()) { + _current_block.header |= (uint64_t(1) << (shift * 2)); + } else { + _current_block.values.push_back({value, *_prefix.get_compound_type(_schema)->types()[_offset]}); + } + } else { + // This (and all subsequent) values of the prefix are missing (null) + _current_block.header |= (uint64_t(1) << ((shift * 2) + 1)); + } + ++_offset; + } + return true; + } + + clustering_block get_value() const { return _current_block; }; + + explicit operator bool() const { + return (_offset < _schema.clustering_key_size()); + } +}; + +// Writes cell value according to its data type traits +// NOTE: this function is defined in sstables/sstables.cc +void write_cell_value(file_writer& out, const abstract_type& type, bytes_view value); + +static void write(file_writer& out, const clustering_block& block) { + write_vint(out, block.header); + for (const auto& [value, type]: block.values) { + write_cell_value(out, type, value); + } +} + +void write_clustering_prefix(file_writer& out, const schema& s, const clustering_key_prefix& prefix) { + clustering_blocks_input_range range{s, prefix}; + for (const auto block: range) { + write(out, block); + } +} + +// This range generates a sequence of values that represent information +// about missing columns for SSTables 3.0 format. 
+class missing_columns_input_range + : public input_range_base { +private: + const schema& _schema; + const row& _row; + mutable uint64_t _current_value = 0; + mutable column_id _current_id = 0; + mutable bool _large_mode_produced_size = false; + + enum class encoding_mode { + small, + large_encode_present, + large_encode_missing, + } _mode; + +public: + missing_columns_input_range(const schema& s, const row& row) + : _schema(s) + , _row(row) { + + auto row_size = _row.size(); + auto total_size = _schema.regular_columns_count(); + + _current_id = row_size < total_size ? 0 : total_size; + _mode = (total_size < 64) ? encoding_mode::small : + (row_size < total_size / 2) ? encoding_mode::large_encode_present : + encoding_mode::large_encode_missing; + } + + bool next() const { + auto total_size = _schema.regular_columns_count(); + if (_current_id == total_size) { + // No more values to encode + return false; + } + + if (_mode == encoding_mode::small) { + // Set bit for every missing column + for (column_id id = 0; id < total_size; ++id) { + auto cell = _row.find_cell(id); + if (!cell) { + _current_value |= (uint64_t(1) << id); + } + } + _current_id = total_size; + return true; + } else { + // For either of large modes, output the difference between total size and row size first + if (!_large_mode_produced_size) { + _current_value = total_size - _row.size(); + _large_mode_produced_size = true; + return true; + } + + if (_mode == encoding_mode::large_encode_present) { + while (_current_id < total_size) { + auto cell = _row.find_cell(_current_id); + if (cell) { + _current_value = _current_id; + ++_current_id; + return true; + } + ++_current_id; + } + } else { + assert(_mode == encoding_mode::large_encode_missing); + while (_current_id < total_size) { + auto cell = _row.find_cell(_current_id); + if (!cell) { + _current_value = _current_id; + ++_current_id; + return true; + } + ++_current_id; + } + } + } + + return false; + } + + uint64_t get_value() const { return 
_current_value; } + + explicit operator bool() const + { + return (_current_id < _schema.regular_columns_count()); + } +}; + +void write_missing_columns(file_writer& out, const schema& s, const row& row) { + for (const auto value: missing_columns_input_range{s, row}) { + write_vint(out, value); + } +} + +template +void write_unsigned_delta_vint(file_writer& out, T value, T base) { + using unsigned_type = std::make_unsigned_t; + unsigned_type delta = unsigned_type(value) - unsigned_type(base); // subtract in the unsigned domain: same result for in-range inputs, and well-defined wraparound instead of signed-overflow UB when T is signed + write_vint(out, delta); +} + +void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats) { + write_unsigned_delta_vint(out, timestamp, enc_stats.min_timestamp); +} + +void write_delta_ttl(file_writer& out, uint32_t ttl, const encoding_stats& enc_stats) { + write_unsigned_delta_vint(out, ttl, enc_stats.min_ttl); +} + +void write_delta_local_deletion_time(file_writer& out, uint32_t local_deletion_time, const encoding_stats& enc_stats) { + write_unsigned_delta_vint(out, local_deletion_time, enc_stats.min_local_deletion_time); +} + +void write_delta_deletion_time(file_writer& out, deletion_time dt, const encoding_stats& enc_stats) { + write_delta_timestamp(out, dt.marked_for_delete_at, enc_stats); + write_delta_local_deletion_time(out, dt.local_deletion_time, enc_stats); +} + +}; // namespace sstables + diff --git a/sstables/m_format_write_helpers.hh b/sstables/m_format_write_helpers.hh new file mode 100644 index 0000000000..ac74350706 --- /dev/null +++ b/sstables/m_format_write_helpers.hh @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "bytes.hh" +#include "types.hh" +#include "timestamp.hh" + +class schema; +class row; +class clustering_key_prefix; +class encoding_stats; + +namespace sstables { + +class file_writer; + +// Utilities for writing integral values in variable-length format +// See vint-serialization.hh for more details +void write_unsigned_vint(file_writer& out, uint64_t value); +void write_signed_vint(file_writer& out, int64_t value); + +template +typename std::enable_if_t> +write_vint(file_writer& out, T t) = delete; + +template +inline void write_vint(file_writer& out, T value) { + static_assert(std::is_integral_v, "Non-integral values can't be written using write_vint"); + return std::is_unsigned_v ? 
write_unsigned_vint(out, value) : write_signed_vint(out, value); +} + + +// Writes clustering prefix, full or not, encoded in SSTables 3.0 format +void write_clustering_prefix(file_writer& out, const schema& s, const clustering_key_prefix& prefix); + +// Writes encoded information about missing columns in the given row +void write_missing_columns(file_writer& out, const schema& s, const row& row); + +// Helper functions for writing delta-encoded time-related values +void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats); + +void write_delta_ttl(file_writer& out, uint32_t ttl, const encoding_stats& enc_stats); + +void write_delta_local_deletion_time(file_writer& out, uint32_t local_deletion_time, const encoding_stats& enc_stats); + +void write_delta_deletion_time(file_writer& out, deletion_time dt, const encoding_stats& enc_stats); + +}; // namespace sstables diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 77eb7791ce..e388d92349 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -35,6 +35,7 @@ #include #include "types.hh" +#include "m_format_write_helpers.hh" #include "sstables.hh" #include "progress_monitor.hh" #include "compress.hh" @@ -64,6 +65,7 @@ #include "service/storage_service.hh" #include "db/extensions.hh" #include "unimplemented.hh" +#include "vint-serialization.hh" thread_local disk_error_signal_type sstable_read_error; thread_local disk_error_signal_type sstable_write_error; @@ -1708,6 +1710,17 @@ void sstable::maybe_flush_pi_block(file_writer& out, } } +void write_cell_value(file_writer& out, const abstract_type& type, bytes_view value) { + if (!value.empty()) { + if (type.is_fixed_length()) { + write(sstable_version_types::mc, out, value); + } else { + write_vint(out, value.size()); + write(sstable_version_types::mc, out, value); + } + } +} + static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type timestamp) { 
c_stats.update_min_timestamp(timestamp); c_stats.update_max_timestamp(timestamp); @@ -2159,11 +2172,11 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) { auto p_key = disk_string_view(); p_key.value = bytes_view(*_partition_key); - // Write index file entry from partition key into index file. + // Write index file entry for partition key into index file. // Write an index entry minus the "promoted index" (sample of columns) // part. We can only write that after processing the entire partition // and collecting the sample of columns. - write_index_header(_sst._version, _index, p_key, _out.offset()); + write_index_header(_sst.get_version(), _index, p_key, _out.offset()); _sst._pi_write.data = {}; _sst._pi_write.numblocks = 0; _sst._pi_write.deltime.local_deletion_time = std::numeric_limits::max(); @@ -2173,7 +2186,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) { _sst._pi_write.schemap = &_schema; // sadly we need this // Write partition key into data file. - write(_sst._version, _out, p_key); + write(_sst.get_version(), _out, p_key); _tombstone_written = false; } @@ -2194,7 +2207,7 @@ void components_writer::consume(tombstone t) { d.local_deletion_time = std::numeric_limits::max(); d.marked_for_delete_at = std::numeric_limits::min(); } - write(_sst._version, _out, d); + write(_sst.get_version(), _out, d); _tombstone_written = true; // TODO: need to verify we don't do this twice? 
_sst._pi_write.deltime = d; @@ -2263,13 +2276,13 @@ stop_iteration components_writer::consume_end_of_partition() { _out.offset() - _sst._pi_write.block_start_offset); _sst._pi_write.numblocks++; } - write_index_promoted(_sst._version, _index, _sst._pi_write.data, _sst._pi_write.deltime, + write_index_promoted(_sst.get_version(), _index, _sst._pi_write.data, _sst._pi_write.deltime, _sst._pi_write.numblocks); _sst._pi_write.data = {}; _sst._pi_write.block_first_colname = {}; int16_t end_of_row = 0; - write(_sst._version, _out, end_of_row); + write(_sst.get_version(), _out, end_of_row); // compute size of the current row. _sst._c_stats.row_size = _out.offset() - _sst._c_stats.start_offset; @@ -2300,7 +2313,7 @@ void components_writer::consume_end_of_stream() { } _sst.set_first_and_last_keys(); - seal_statistics(_sst._version, _sst._components->statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(), + seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(), _sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key()); } @@ -2350,7 +2363,50 @@ sstable::write_scylla_metadata(const io_priority_class& pc, shard_id shard, ssta write_simple(*_components->scylla_metadata, pc); } -void sstable_writer::prepare_file_writer() +struct sstable_writer::writer_impl { + virtual void consume_new_partition(const dht::decorated_key& dk) = 0; + virtual void consume(tombstone t) = 0; + virtual stop_iteration consume(static_row&& sr) = 0; + virtual stop_iteration consume(clustering_row&& cr) = 0; + virtual stop_iteration consume(range_tombstone&& rt) = 0; + virtual stop_iteration consume_end_of_partition() = 0; + virtual void consume_end_of_stream() = 0; + virtual ~writer_impl() {} +}; + +class sstable_writer_k_l : public sstable_writer::writer_impl { + sstable& _sst; + const schema& _schema; + const io_priority_class& _pc; + bool _backup; + 
bool _leave_unsealed; + bool _compression_enabled; + std::unique_ptr _writer; + stdx::optional _components_writer; + shard_id _shard; // Specifies which shard new sstable will belong to. + write_monitor* _monitor; + bool _correctly_serialize_non_compound_range_tombstones; +private: + void prepare_file_writer(); + void finish_file_writer(); +public: + sstable_writer_k_l(sstable& sst, const schema& s, uint64_t estimated_partitions, + const sstable_writer_config&, const io_priority_class& pc, shard_id shard = engine().cpu_id()); + ~sstable_writer_k_l(); + sstable_writer_k_l(sstable_writer_k_l&& o) : _sst(o._sst), _schema(o._schema), _pc(o._pc), _backup(o._backup), + _leave_unsealed(o._leave_unsealed), _compression_enabled(o._compression_enabled), _writer(std::move(o._writer)), + _components_writer(std::move(o._components_writer)), _shard(o._shard), _monitor(o._monitor), + _correctly_serialize_non_compound_range_tombstones(o._correctly_serialize_non_compound_range_tombstones) { } + void consume_new_partition(const dht::decorated_key& dk) override { return _components_writer->consume_new_partition(dk); } + void consume(tombstone t) override { _components_writer->consume(t); } + stop_iteration consume(static_row&& sr) override { return _components_writer->consume(std::move(sr)); } + stop_iteration consume(clustering_row&& cr) override { return _components_writer->consume(std::move(cr)); } + stop_iteration consume(range_tombstone&& rt) override { return _components_writer->consume(std::move(rt)); } + stop_iteration consume_end_of_partition() override { return _components_writer->consume_end_of_partition(); } + void consume_end_of_stream() override; +}; + +void sstable_writer_k_l::prepare_file_writer() { file_output_stream_options options; options.io_priority_class = _pc; @@ -2364,21 +2420,21 @@ void sstable_writer::prepare_file_writer() } } -void sstable_writer::finish_file_writer() +void sstable_writer_k_l::finish_file_writer() { auto writer = std::move(_writer); 
writer->close().get(); if (!_compression_enabled) { auto chksum_wr = static_cast(writer.get()); - write_digest(_sst._version, _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum()); - write_crc(_sst._version, _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum()); + write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum()); + write_crc(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum()); } else { - write_digest(_sst._version, _sst._write_error_handler, _sst.filename(component_type::Digest), _sst._components->compression.full_checksum()); + write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), _sst._components->compression.full_checksum()); } } -sstable_writer::~sstable_writer() { +sstable_writer_k_l::~sstable_writer_k_l() { if (_writer) { try { _writer->close().get(); @@ -2388,7 +2444,7 @@ sstable_writer::~sstable_writer() { } } -sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, +sstable_writer_k_l::sstable_writer_k_l(sstable& sst, const schema& s, uint64_t estimated_partitions, const sstable_writer_config& cfg, const io_priority_class& pc, shard_id shard) : _sst(sst) , _schema(s) @@ -2414,7 +2470,7 @@ static sstable_enabled_features all_features() { return sstable_enabled_features{(1 << sstable_feature::End) - 1}; } -void sstable_writer::consume_end_of_stream() +void sstable_writer_k_l::consume_end_of_stream() { _components_writer->consume_end_of_stream(); _components_writer = stdx::nullopt; @@ -2439,6 +2495,665 @@ void sstable_writer::consume_end_of_stream() _monitor->on_flush_completed(); } +enum class cell_flags : uint8_t { + none = 0x00, + is_deleted_mask = 0x01, // Whether the cell is a tombstone or not. + is_expiring_mask = 0x02, // Whether the cell is expiring. 
+ has_empty_value_mask = 0x04, // Whether the cell has an empty value. This will be the case for a tombstone in particular. + use_row_timestamp_mask = 0x08, // Whether the cell has the same timestamp as the row this is a cell of. + use_row_ttl_mask = 0x10, // Whether the cell has the same TTL as the row this is a cell of. +}; + +inline cell_flags operator& (cell_flags lhs, cell_flags rhs) { + return cell_flags(static_cast(lhs) & static_cast(rhs)); +} + +inline cell_flags& operator |= (cell_flags& lhs, cell_flags rhs) { + lhs = cell_flags(static_cast(lhs) | static_cast(rhs)); + return lhs; +} + +enum class row_flags : uint8_t { + none = 0x00, + // Signal the end of the partition. Nothing follows a field with that flag. + end_of_partition = 0x01, + // Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows. + is_marker = 0x02, + // Whether the encoded row has a timestamp (i.e. its liveness_info is not empty). + has_timestamp = 0x04, + // Whether the encoded row has some expiration info (i.e. if its liveness_info contains TTL and local_deletion). + has_ttl = 0x08, + // Whether the encoded row has some deletion info. + has_deletion = 0x10, + // Whether the encoded row has all of the columns from the header present. + has_all_columns = 0x20, + // Whether the encoded row has some complex deletion for at least one of its complex columns. + has_complex_deletion = 0x40, + // If present, another byte is read containing the "extended flags" below. + extension_flag = 0x80 +}; + +inline row_flags operator& (row_flags lhs, row_flags rhs) { + return row_flags(static_cast(lhs) & static_cast(rhs)); +} + +inline row_flags& operator |= (row_flags& lhs, row_flags rhs) { + lhs = row_flags(static_cast(lhs) | static_cast(rhs)); + return lhs; +} + +enum class row_extended_flags : uint8_t { + none = 0x00, + // Whether the encoded row is a static. If there is no extended flag, the row is assumed not static. 
+ is_static = 0x01, + // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion) + // the deletion is assumed not shadowable. + // This flag is deprecated in Origin - see CASSANDRA-11500. + has_shadowable_deletion = 0x02, +}; + +// Used for writing SSTables in 'mc' format. +class sstable_writer_m : public sstable_writer::writer_impl { +private: + sstable& _sst; + const schema& _schema; + const io_priority_class& _pc; + sstable_writer_config _cfg; + encoding_stats _enc_stats; + shard_id _shard; // Specifies which shard the new SStable will belong to. + std::unique_ptr _data_writer; + std::optional _index_writer; + bool _tombstone_written = false; + bool _row_deletion_written = false; + uint64_t _current_partition_offset = 0; + uint64_t _prev_row_start = 0; + std::optional _partition_key; + stdx::optional _first_key, _last_key; + index_sampling_state _index_sampling_state; + struct pi_block { + clustering_key_prefix first; + clustering_key_prefix last; + uint64_t offset; + uint64_t width; + }; + // _pi_write_m is used temporarily for building the promoted + // index (column sample) of one partition when writing a new sstable. + struct { + // Unfortunately we cannot output the promoted index directly to the + // index file because it needs to be prepended by its size. 
+ seastar::circular_buffer promoted_index; + tombstone tomb; + uint64_t block_start_offset; + uint64_t block_next_start_offset; + std::optional first_clustering; + std::optional last_clustering; + size_t desired_block_size; + } _pi_write_m; + + void init_file_writers(); + void close_data_writer(); + void ensure_tombstone_is_written() { + if (!_tombstone_written) { + consume(tombstone()); + } + } + + void write_delta_timestamp(file_writer& writer, api::timestamp_type timestamp) { + sstables::write_delta_timestamp(writer, timestamp, _enc_stats); + } + void write_delta_ttl(file_writer& writer, uint32_t ttl) { + sstables::write_delta_ttl(writer, ttl, _enc_stats); + } + void write_delta_local_deletion_time(file_writer& writer, uint32_t ldt) { + sstables::write_delta_local_deletion_time(writer, ldt, _enc_stats); + } + void write_delta_deletion_time(file_writer& writer, deletion_time dt) { + sstables::write_delta_deletion_time(writer, dt, _enc_stats); + } + + struct row_time_properties { + std::optional timestamp; + std::optional ttl; + std::optional local_deletion_time; + }; + + // Writes single atomic cell + void write_cell(file_writer& writer, atomic_cell_view cell, const column_definition& cdef, + const row_time_properties& properties, bytes_view cell_path = {}); + + // Writes information about row liveness (formerly 'row marker') + void write_liveness_info(file_writer& writer, const row_marker& marker); + + // Writes a CQL collection (list, set or map) + void write_collection(file_writer& writer, const column_definition& cdef, collection_mutation_view collection, + const row_time_properties& properties, bool has_complex_deletion); + + void write_cells(file_writer& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion = false); + void write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion); + void write_static_row(const row& static_row); + void write_clustered_row(const 
clustering_row& clustered_row, uint64_t prev_row_size); + std::vector write_promoted_index(file_writer& writer); +public: + + sstable_writer_m(sstable& sst, const schema& s, uint64_t estimated_partitions, + const sstable_writer_config& cfg, encoding_stats enc_stats, + const io_priority_class& pc, shard_id shard = engine().cpu_id()) + : _sst(sst) + , _schema(s) + , _pc(pc) + , _cfg(cfg) + , _enc_stats(enc_stats) + , _shard(shard) + { + _index_sampling_state.summary_byte_cost = summary_byte_cost(); + _sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance()); + _sst.write_toc(_pc); + _sst.create_data().get(); + if (!_sst.has_component(component_type::CRC)) { + throw std::runtime_error("Compression is not yet implemented for SSTables 3.0 yet"); + } + init_file_writers(); + _sst._shards = { shard }; + + _cfg.monitor->on_write_started(_data_writer->offset_tracker()); + _sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance()); + _pi_write_m.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024); + _sst._correctly_serialize_non_compound_range_tombstones = _cfg.correctly_serialize_non_compound_range_tombstones; + _index_sampling_state.summary_byte_cost = summary_byte_cost(); + prepare_summary(_sst._components->summary, estimated_partitions, _schema.min_index_interval()); + + } + + ~sstable_writer_m(); + sstable_writer_m(sstable_writer_m&& o) = default; + void consume_new_partition(const dht::decorated_key& dk) override; + void consume(tombstone t) override; + stop_iteration consume(static_row&& sr) override; + stop_iteration consume(clustering_row&& cr) override; + stop_iteration consume(range_tombstone&& rt) override { + throw std::runtime_error("consume(range_tombstone) is not yet implemented for SSTables v3"); + } + stop_iteration consume_end_of_partition(); + void consume_end_of_stream() override; +}; + 
+sstable_writer_m::~sstable_writer_m() { + auto close_writer = [](auto& writer) { + if (writer) { + try { + writer->close().get(); + } catch (...) { + sstlog.error("sstable_writer_m failed to close file: {}", std::current_exception()); + } + } + }; + close_writer(_index_writer); + close_writer(_data_writer); +} + +void sstable_writer_m::init_file_writers() { + file_output_stream_options options; + options.io_priority_class = _pc; + options.buffer_size = _sst.sstable_buffer_size; + options.write_behind = 10; + + _data_writer = std::make_unique(std::move(_sst._data_file), options, true); + _index_writer.emplace(std::move(_sst._index_file), options); +} + +void sstable_writer_m::close_data_writer() { + auto writer = std::move(_data_writer); + writer->close().get(); + auto chksum_wr = static_cast(writer.get()); + write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum()); + write_crc(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum()); +} + +void sstable_writer_m::consume_new_partition(const dht::decorated_key& dk) { + _current_partition_offset = _data_writer->offset(); + _prev_row_start = 0; + + _partition_key = key::from_partition_key(_schema, dk.key()); + _sst._components->filter->add(bytes_view(*_partition_key)); + _sst._collector.add_key(bytes_view(*_partition_key)); + + auto p_key = disk_string_view(); + p_key.value = bytes_view(*_partition_key); + + // Write index file entry from partition key into index file. + // Write an index entry minus the "promoted index" (sample of columns) + // part. We can only write that after processing the entire partition + // and collecting the sample of columns. 
+ write(_sst.get_version(), *_index_writer, p_key); + write_vint(*_index_writer, _data_writer->offset()); + + _pi_write_m.promoted_index = {}; + _pi_write_m.tomb = {}; + _pi_write_m.first_clustering.reset(); + _pi_write_m.last_clustering.reset(); + + write(_sst.get_version(), *_data_writer, p_key); + + _tombstone_written = false; +} + +deletion_time to_deletion_time(tombstone t) { + deletion_time dt; + if (t) { + dt.local_deletion_time = t.deletion_time.time_since_epoch().count(); + dt.marked_for_delete_at = t.timestamp; + } else { + // Default values for live, non-deleted rows. + dt.local_deletion_time = std::numeric_limits::max(); + dt.marked_for_delete_at = std::numeric_limits::min(); + } + return dt; +} + +void sstable_writer_m::consume(tombstone t) { + write(_sst.get_version(), *_data_writer, to_deletion_time(t)); + _pi_write_m.tomb = t; + _tombstone_written = true; +} + +void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, const column_definition& cdef, + const row_time_properties& properties, bytes_view cell_path) { + + bytes_view cell_value = cell.value(); + bool has_value = !cell_value.empty(); + bool is_deleted = cell.is_dead(_sst._now); + if (is_deleted) { + has_value = false; + } + bool use_row_timestamp = (properties.timestamp == cell.timestamp()); + bool is_row_expiring = properties.ttl.has_value(); + bool is_cell_expiring = cell.is_live_and_has_ttl() || properties.ttl; + bool is_expiring = is_row_expiring || is_cell_expiring; + bool use_row_ttl = is_row_expiring || (is_cell_expiring && + (properties.ttl == cell.ttl().count()) && + (properties.local_deletion_time == cell.deletion_time().time_since_epoch().count())); + + cell_flags flags = cell_flags::none; + if (!has_value) { + flags |= cell_flags::has_empty_value_mask; + } + if (is_deleted) { + flags |= cell_flags::is_deleted_mask; + } else if (is_expiring) { + flags |= cell_flags::is_expiring_mask; + } + if (use_row_timestamp) { + flags |= cell_flags::use_row_timestamp_mask; 
+ } + if (use_row_ttl) { + flags |= cell_flags::use_row_ttl_mask; + } + write(_sst.get_version(), writer, flags); + + if (!use_row_timestamp) { + write_delta_timestamp(writer, cell.timestamp()); + } + + if (!use_row_ttl) { + if (is_deleted) { + write_delta_local_deletion_time(writer, cell.deletion_time().time_since_epoch().count()); + } else if (is_expiring) { + write_delta_local_deletion_time(writer, cell.expiry().time_since_epoch().count()); + write_delta_ttl(writer, cell.ttl().count()); + } + } + + if (!cell_path.empty()) { + write_vint(writer, cell_path.size()); + write(_sst.get_version(), writer, cell_path); + } + + if (has_value) { + write_cell_value(writer, *cdef.type, cell_value); + } +} + +void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker& marker) { + if (marker.is_missing()) { + return; + } + + uint64_t timestamp = marker.timestamp(); + if (marker.is_dead(_sst._now)) { + // the row has expired by the time of flush + // write deletion info instead of liveness info + deletion_time dt; + dt.local_deletion_time = marker.deletion_time().time_since_epoch().count(); + dt.marked_for_delete_at = timestamp; + write_delta_deletion_time(writer, dt); + _row_deletion_written = true; + } else { // marker.is_live() + write_delta_timestamp(writer, timestamp); + if (marker.is_expiring()) { + write_delta_ttl(writer, marker.ttl().count()); + write_delta_local_deletion_time(writer, marker.expiry().time_since_epoch().count()); + } + } +} + +void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef, + collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) { + auto t = static_pointer_cast(cdef.type); + auto mview = t->deserialize_mutation_form(collection); + if (has_complex_deletion) { + write_delta_deletion_time(writer, to_deletion_time(mview.tomb)); + } + + write_vint(writer, mview.cells.size()); + for (const auto& [cell_path, cell]: mview.cells) { + 
write_cell(writer, cell, cdef, properties, cell_path); + } +} + +void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body, + const row_time_properties& properties, bool has_complex_deletion) { + // Note that missing columns are written based on the whole set of regular columns as defined by schema. + // This differs from Origin where all updated columns are tracked and the set of filled columns of a row + // is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases + // but still valid. + write_missing_columns(writer, _schema, row_body); + row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) { + auto&& column_definition = _schema.column_at(kind, id); + if (!column_definition.is_atomic()) { + write_collection(writer, column_definition, c.as_collection_mutation(), properties, has_complex_deletion); + return; + } + atomic_cell_view cell = c.as_atomic_cell(); + write_cell(writer, cell, column_definition, properties); + }); +} + +void sstable_writer_m::write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion) { + _row_deletion_written = false; + // write_liveness_info may end up writing deletion info for the row if the row + // has expired by the time of writing. 
If this happens, _row_deletion_writen is set + write_liveness_info(writer, row.marker()); + if (row.tomb() && !_row_deletion_written) { + write_delta_deletion_time(writer, to_deletion_time(row.tomb().tomb())); + } + row_time_properties properties; + if (!row.marker().is_missing()) { + properties.timestamp = row.marker().timestamp(); + if (row.marker().is_expiring()) { + properties.ttl = row.marker().ttl().count(); + properties.local_deletion_time = row.marker().deletion_time().time_since_epoch().count(); + } + } + + return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion); +} + +template +uint64_t calculate_write_size(Func&& func) { + uint64_t written_size = 0; + { + auto counting_writer = file_writer(make_sizing_output_stream(written_size)); + func(counting_writer); + counting_writer.flush().get(); + counting_writer.close().get(); + } + return written_size; +} + +void sstable_writer_m::write_static_row(const row& static_row) { + assert(_schema.is_compound()); + + // Static row flag is stored in extended flags so extension_flag is always set for static rows + row_flags flags = row_flags::extension_flag; + if (static_row.size() == _schema.static_columns_count()) { + flags |= row_flags::has_all_columns; + } + + write(_sst.get_version(), *_data_writer, flags); + write(_sst.get_version(), *_data_writer, row_extended_flags::is_static); + + // Calculate the size of the row body + auto write_row = [this, &static_row] (file_writer& writer) { + write_cells(writer, column_kind::static_column, static_row, row_time_properties{}); + }; + + uint64_t row_body_size = calculate_write_size(write_row) + unsigned_vint::serialized_size(0); + write_vint(*_data_writer, row_body_size); + write_vint(*_data_writer, 0); // as the static row always comes first, the previous row size is always zero + + write_row(*_data_writer); +} + +stop_iteration sstable_writer_m::consume(static_row&& sr) { + ensure_tombstone_is_written(); + 
write_static_row(sr.cells()); + return stop_iteration::no; +} + +// Find if any collection in the row contains a collection-wide tombstone +static bool row_has_complex_deletion(const schema& s, const row& r) { + bool result = false; + r.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& c) { + auto&& cdef = s.column_at(column_kind::regular_column, id); + if (cdef.is_atomic()) { + return stop_iteration::no; + } + auto t = static_pointer_cast(cdef.type); + auto mview = t->deserialize_mutation_form(c.as_collection_mutation()); + if (mview.tomb) { + result = true; + } + return stop_iteration(static_cast(mview.tomb)); + }); + + return result; +} + +void sstable_writer_m::write_clustered_row(const clustering_row& clustered_row, uint64_t prev_row_size) { + row_flags flags = row_flags::none; + row_extended_flags ext_flags = row_extended_flags::none; + if (clustered_row.marker().is_live()) { + flags |= row_flags::has_timestamp; + if (clustered_row.marker().is_expiring()) { + flags |= row_flags::has_ttl; + } + } + + if ((!clustered_row.marker().is_missing() && clustered_row.marker().is_dead(_sst._now)) || clustered_row.tomb().tomb()) { + flags |= row_flags::has_deletion; + if (clustered_row.tomb().tomb() && clustered_row.tomb().is_shadowable()) { + ext_flags = row_extended_flags::has_shadowable_deletion; + } + } + + if (clustered_row.cells().size() == _schema.regular_columns_count()) { + flags |= row_flags::has_all_columns; + } + bool has_complex_deletion = row_has_complex_deletion(_schema, clustered_row.cells()); + if (has_complex_deletion) { + flags |= row_flags::has_complex_deletion; + } + write(_sst.get_version(), *_data_writer, flags); + if (ext_flags != row_extended_flags::none) { + write(_sst.get_version(), *_data_writer, ext_flags); + } + + write_clustering_prefix(*_data_writer, _schema, clustered_row.key()); + + + auto write_row = [this, &clustered_row, has_complex_deletion] (file_writer& writer) { + write_row_body(writer, clustered_row, 
has_complex_deletion); + }; + + uint64_t row_body_size = calculate_write_size(write_row) + unsigned_vint::serialized_size(prev_row_size); + + write_vint(*_data_writer, row_body_size); + write_vint(*_data_writer, prev_row_size); + + write_row(*_data_writer); +} + +stop_iteration sstable_writer_m::consume(clustering_row&& cr) { + ensure_tombstone_is_written(); + uint64_t pos = _data_writer->offset(); + if (!_pi_write_m.first_clustering) { + _pi_write_m.first_clustering = cr.key(); + _pi_write_m.block_start_offset = pos; + _pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size; + } + write_clustered_row(cr, pos - _prev_row_start); + + _pi_write_m.last_clustering = cr.key(); + + pos = _data_writer->offset(); + _prev_row_start = pos; + if (pos >= _pi_write_m.block_next_start_offset) { + _pi_write_m.promoted_index.push_back({ + *_pi_write_m.first_clustering, + *_pi_write_m.last_clustering, + _pi_write_m.block_start_offset - _current_partition_offset, + pos - _pi_write_m.block_start_offset}); + _pi_write_m.first_clustering.reset(); + _pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size; + } + return stop_iteration::no; +} + +// Write clustering prefix along with its bound kind and, if not full, its size +static void write_clustering_prefix(file_writer& writer, bound_kind kind, + const schema& s, const clustering_key_prefix& clustering) { + assert(kind != bound_kind::static_clustering); + write(sstable_version_types::mc, writer, kind); + if (kind != bound_kind::clustering) { + write(sstable_version_types::mc, writer, static_cast(clustering.size(s))); + } + write_clustering_prefix(writer, s, clustering); +} + +std::vector sstable_writer_m::write_promoted_index(file_writer& writer) { + static constexpr size_t width_base = 65536; + if (_pi_write_m.promoted_index.empty()) { + return {}; + } + write(_sst.get_version(), writer, to_deletion_time(_pi_write_m.tomb)); + write_vint(writer, _pi_write_m.promoted_index.size()); + std::vector 
offsets; + offsets.reserve(_pi_write_m.promoted_index.size()); + uint64_t start = writer.offset(); + for (const pi_block& block: _pi_write_m.promoted_index) { + offsets.push_back(writer.offset() - start); + write_clustering_prefix(writer, bound_kind::clustering, _schema, block.first); + write_clustering_prefix(writer, bound_kind::clustering, _schema, block.last); + write_vint(writer, block.offset); + write_signed_vint(writer, block.width - width_base); + // TODO: serialize end open marker here later, for now always write "false" + // to indicate there is no end open marker + write(_sst.get_version(), writer, (std::byte)0); + } + + return offsets; +} + +stop_iteration sstable_writer_m::consume_end_of_partition() { + if (!_pi_write_m.promoted_index.empty() && _pi_write_m.first_clustering) { + _pi_write_m.promoted_index.push_back({ + *_pi_write_m.first_clustering, + *_pi_write_m.last_clustering, + _pi_write_m.block_start_offset - _current_partition_offset, + _data_writer->offset() - _pi_write_m.block_start_offset}); + } + + auto write_pi = [this] (file_writer& writer) { + return write_promoted_index(writer); + }; + + uint64_t pi_size = calculate_write_size(write_pi); + write_vint(*_index_writer, pi_size); + auto offsets = write_pi(*_index_writer); + for (uint32_t offset: offsets) { + write(_sst.get_version(), *_index_writer, offset); + } + + write(_sst.get_version(), *_data_writer, row_flags::end_of_partition); + if (!_first_key) { + _first_key = *_partition_key; + } + _last_key = std::move(*_partition_key); + return stop_iteration::no; +} + +void sstable_writer_m::consume_end_of_stream() { + seal_summary(_sst._components->summary, std::move(_first_key), std::move(_last_key), _index_sampling_state); + + if (_sst.has_component(component_type::CompressionInfo)) { + _sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length()); + } + + _index_writer->close().get(); + 
_index_writer.reset(); + _sst.set_first_and_last_keys(); + seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector, + dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(), + _sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key()); + _cfg.monitor->on_data_write_completed(); + close_data_writer(); + _sst.write_summary(_pc); + _sst.write_filter(_pc); + _sst.write_statistics(_pc); + _sst.write_compression(_pc); + auto features = all_features(); + if (!_cfg.correctly_serialize_non_compound_range_tombstones) { + features.disable(sstable_feature::NonCompoundRangeTombstones); + } + _sst.write_scylla_metadata(_pc, _shard, std::move(features)); + _cfg.monitor->on_write_completed(); + if (!_cfg.leave_unsealed) { + _sst.seal_sstable(_cfg.backup).get(); + } + _cfg.monitor->on_flush_completed(); +} + +sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, + const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard) { + if (sst.get_version() == sstable_version_types::mc) { + _impl = std::make_unique(sst, s, estimated_partitions, cfg, enc_stats, pc, shard); + } else { + _impl = std::make_unique(sst, s, estimated_partitions, cfg, pc, shard); + } +} + +void sstable_writer::consume_new_partition(const dht::decorated_key& dk) { + return _impl->consume_new_partition(dk); +} + +void sstable_writer::consume(tombstone t) { + return _impl->consume(t); +} + +stop_iteration sstable_writer::consume(static_row&& sr) { + return _impl->consume(std::move(sr)); +} + +stop_iteration sstable_writer::consume(clustering_row&& cr) { + return _impl->consume(std::move(cr)); +} + +stop_iteration sstable_writer::consume(range_tombstone&& rt) { + return _impl->consume(std::move(rt)); +} + +stop_iteration sstable_writer::consume_end_of_partition() { + return _impl->consume_end_of_partition(); +} + +void sstable_writer::consume_end_of_stream() { + return 
_impl->consume_end_of_stream(); +} + +sstable_writer::sstable_writer(sstable_writer&& o) = default; +sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default; +sstable_writer::~sstable_writer() = default; + future<> sstable::seal_sstable(bool backup) { return seal_sstable().then([this, backup] { @@ -2452,9 +3167,10 @@ future<> sstable::seal_sstable(bool backup) }); } -sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, const sstable_writer_config& cfg, const io_priority_class& pc, shard_id shard) +sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, + const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard) { - return sstable_writer(*this, s, estimated_partitions, cfg, pc, shard); + return sstable_writer(*this, s, estimated_partitions, cfg, enc_stats, pc, shard); } future<> sstable::write_components( @@ -2467,8 +3183,8 @@ future<> sstable::write_components( if (cfg.replay_position) { _collector.set_replay_position(cfg.replay_position.value()); } - return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, &pc] () mutable { - auto wr = get_writer(*schema, estimated_partitions, cfg, pc); + return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable { + auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc); mr.consume_in_thread(std::move(wr)); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index f6d4d58920..bd060b9088 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -70,6 +70,8 @@ namespace sstables { extern logging::logger sstlog; class key; class sstable_writer; +class sstable_writer_k_l; +class sstable_writer_m; struct foreign_sstable_open_info; struct sstable_open_info; @@ -260,6 +262,7 @@ public: sstable_writer get_writer(const schema& s, uint64_t estimated_partitions, const 
sstable_writer_config&, + encoding_stats enc_stats, const io_priority_class& pc = default_priority_class(), shard_id shard = engine().cpu_id()); @@ -329,6 +332,10 @@ public: return _components->filter->memory_size(); } + version_types get_version() const { + return _version; + } + // Returns the total bytes of all components. uint64_t bytes_on_disk(); @@ -696,7 +703,8 @@ public: friend class test; friend class components_writer; - friend class sstable_writer; + friend class sstable_writer_k_l; + friend class sstable_writer_m; friend class index_reader; template friend data_consume_context @@ -810,34 +818,26 @@ public: }; class sstable_writer { - sstable& _sst; - const schema& _schema; - const io_priority_class& _pc; - bool _backup; - bool _leave_unsealed; - bool _compression_enabled; - std::unique_ptr _writer; - stdx::optional _components_writer; - shard_id _shard; // Specifies which shard new sstable will belong to. - write_monitor* _monitor; - bool _correctly_serialize_non_compound_range_tombstones; +public: + class writer_impl; private: - void prepare_file_writer(); - void finish_file_writer(); + std::unique_ptr _impl; public: sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, - const sstable_writer_config&, const io_priority_class& pc, shard_id shard = engine().cpu_id()); + const sstable_writer_config&, encoding_stats enc_stats, + const io_priority_class& pc, shard_id shard = engine().cpu_id()); + + sstable_writer(sstable_writer&& o); + sstable_writer& operator=(sstable_writer&& o); + ~sstable_writer(); - sstable_writer(sstable_writer&& o) : _sst(o._sst), _schema(o._schema), _pc(o._pc), _backup(o._backup), - _leave_unsealed(o._leave_unsealed), _compression_enabled(o._compression_enabled), _writer(std::move(o._writer)), - _components_writer(std::move(o._components_writer)), _shard(o._shard), _monitor(o._monitor), - _correctly_serialize_non_compound_range_tombstones(o._correctly_serialize_non_compound_range_tombstones) { } - void 
consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); } - void consume(tombstone t) { _components_writer->consume(t); } - stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); } - stop_iteration consume(clustering_row&& cr) { return _components_writer->consume(std::move(cr)); } - stop_iteration consume(range_tombstone&& rt) { return _components_writer->consume(std::move(rt)); } - stop_iteration consume_end_of_partition() { return _components_writer->consume_end_of_partition(); } + + void consume_new_partition(const dht::decorated_key& dk); + void consume(tombstone t); + stop_iteration consume(static_row&& sr); + stop_iteration consume(clustering_row&& cr); + stop_iteration consume(range_tombstone&& rt); + stop_iteration consume_end_of_partition(); void consume_end_of_stream(); }; diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index da57eab885..7f04cdae1e 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -20,17 +20,23 @@ */ #include +#include +#include #include #include +#include #include "sstables/sstables.hh" #include "compress.hh" #include "schema_builder.hh" -#include "tests/test-utils.hh" #include "sstable_test.hh" #include "flat_mutation_reader_assertions.hh" +#include "memtable-sstable.hh" +#include "sstable_test.hh" +#include "tests/test_services.hh" +#include "tests/tmpdir.hh" using namespace sstables; @@ -217,3 +223,515 @@ SEASTAR_TEST_CASE(test_uncompressed_simple_read_index) { BOOST_REQUIRE_EQUAL(1, vec.size()); }); } + +static void compare_files(sstring filename1, sstring filename2) { + std::ifstream ifs1(filename1); + std::ifstream ifs2(filename2); + + std::istream_iterator b1(ifs1), e1; + std::istream_iterator b2(ifs2), e2; + BOOST_CHECK_EQUAL_COLLECTIONS(b1, e1, b2, e2); +} + +static void write_and_compare_sstables(schema_ptr s, lw_shared_ptr mt, sstring table_name) { + storage_service_for_tests ssft; + tmpdir tmp; 
+ auto sst = sstables::test::make_test_sstable(4096, s, tmp.path, 1, sstables::sstable_version_types::mc, sstable::format_types::big); + write_memtable_to_sstable(*mt, sst).get(); + + for (auto file_type : {component_type::Data, component_type::Index}) { + auto orig_filename = + sstable::filename("tests/sstables/3.x/uncompressed/write_" + table_name, "ks", + table_name, sstables::sstable_version_types::mc, 1, big, file_type); + auto result_filename = + sstable::filename(tmp.path, "ks", table_name, sstables::sstable_version_types::mc, 1, big, file_type); + compare_files(orig_filename, result_filename); + } +} + +SEASTAR_TEST_CASE(test_write_static_row) { + return seastar::async([] { + sstring table_name = "static_row"; + // CREATE TABLE static_row (pk int, ck int, st1 int static, st2 text static, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''}; + schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name, + // partition key + {{"pk", utf8_type}}, + // clustering key + {{"ck", int32_type}}, + // regular columns + {}, + // static columns + {{"st1", int32_type}, {"st2", utf8_type}}, + // regular column name type + utf8_type, + // comment + "SSTable 3.0 format write path - static row test" + ))); + builder.set_compressor_params(compression_parameters()); + schema_ptr s = builder.build(schema_builder::compact_storage::no); + + lw_shared_ptr mt = make_lw_shared(s); + api::timestamp_type ts = api::new_timestamp(); + + // INSERT INTO static_row (pk, st1, st2) values ('key1', 1135, 'hello'); + auto key = make_dkey(s, {to_bytes("key1")}); + mutation mut{s, key}; + mut.set_static_cell("st1", data_value{1135}, ts); + mut.set_static_cell("st2", data_value{"hello"}, ts); + mt->apply(std::move(mut)); + + write_and_compare_sstables(s, mt, table_name); + }); +} + +SEASTAR_TEST_CASE(test_write_composite_clustering_key) { + return seastar::async([] { + sstring table_name = "composite_clustering_key"; + // CREATE TABLE 
composite_clustering_key (a int , b text, c int, d text, e int, f text, PRIMARY KEY (a, b, c, d)) WITH compression = {'sstable_compression': ''}; + schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name, + // partition key + {{"a", int32_type}}, + // clustering key + {{"b", utf8_type}, {"c", int32_type}, {"d", utf8_type}}, + // regular columns + {{"e", int32_type}, {"f", utf8_type}}, + // static columns + {}, + // regular column name type + utf8_type, + // comment + "SSTable 3.0 format write path - composite clustering key test" + ))); + builder.set_compressor_params(compression_parameters()); + schema_ptr s = builder.build(schema_builder::compact_storage::no); + + lw_shared_ptr mt = make_lw_shared(s); + api::timestamp_type ts = api::new_timestamp(); + + // INSERT INTO composite_clustering_key (a,b,c,d,e,f) values (1, 'hello', 2, 'dear', 3, 'world'); + auto key = partition_key::from_deeply_exploded(*s, { 1 }); + mutation mut{s, key}; + clustering_key ckey = clustering_key::from_deeply_exploded(*s, { "hello", 2, "dear" }); + mut.partition().apply_insert(*s, ckey, ts); + mut.set_cell(ckey, "e", data_value{3}, ts); + mut.set_cell(ckey, "f", data_value{"world"}, ts); + mt->apply(std::move(mut)); + + write_and_compare_sstables(s, mt, table_name); + }); +} + +SEASTAR_TEST_CASE(test_write_wide_partition) { + return seastar::async([] { + sstring table_name = "wide_partition"; + // CREATE TABLE wide_partition (pk text , ck text, rc text, PRIMARY KEY (pk, ck) WITH compression = {'sstable_compression': ''}; + schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name, + // partition key + {{"pk", utf8_type}}, + // clustering key + {{"ck", utf8_type}}, + // regular columns + {{"rc", utf8_type}}, + // static columns + {}, + // regular column name type + utf8_type, + // comment + "SSTable 3.0 format write path - wide partition test" + ))); + 
builder.set_compressor_params(compression_parameters()); + schema_ptr s = builder.build(schema_builder::compact_storage::no); + + lw_shared_ptr mt = make_lw_shared(s); + + auto key = make_dkey(s, {to_bytes("key")}); + mutation mut{s, key}; + sstring ck_base(1024, 'a'); + sstring rc_base(1024, 'b'); + api::timestamp_type ts = api::new_timestamp(); + for (auto idx: boost::irange(0, 1024)) { + clustering_key ckey = clustering_key::from_deeply_exploded(*s, {format("{}{}", ck_base, idx)}); + mut.partition().apply_insert(*s, ckey, ts); + mut.set_cell(ckey, "rc", data_value{format("{}{}", rc_base, idx)}, ts); + mt->apply(std::move(mut)); + seastar::thread::yield(); + } + + write_and_compare_sstables(s, mt, table_name); + }); +} + +SEASTAR_TEST_CASE(test_write_ttled_row) { + return seastar::async([] { + sstring table_name = "ttled_row"; + // CREATE TABLE ttled_row (pk int, ck int, rc int, PRIMARY KEY (pk)); + schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name, + // partition key + {{"pk", int32_type}}, + // clustering key + {{"ck", int32_type}}, + // regular columns + {{"rc", int32_type}}, + // static columns + {}, + // regular column name type + utf8_type, + // comment + "SSTable 3.0 format write path - TTL-ed row test" + ))); + builder.set_compressor_params(compression_parameters()); + schema_ptr s = builder.build(schema_builder::compact_storage::no); + + lw_shared_ptr mt = make_lw_shared(s); + + // INSERT INTO ttled_row (pk, ck, rc) VALUES ( 1, 2, 3) USING TTL 1135; + auto key = partition_key::from_deeply_exploded(*s, { 1 }); + mutation mut{s, key}; + clustering_key ckey = clustering_key::from_deeply_exploded(*s, { 2 }); + api::timestamp_type ts = api::new_timestamp(); + gc_clock::time_point tp = gc_clock::now(); + gc_clock::duration ttl{1135}; + mut.partition().apply_insert(*s, ckey, ts, ttl, tp + ttl); + mut.set_cell(ckey, "rc", data_value{3}, ts); + mt->apply(std::move(mut)); + + write_and_compare_sstables(s, mt, 
table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_ttled_column) {
+    return seastar::async([] {
+        sstring table_name = "ttled_column";
+        // CREATE TABLE ttled_column (pk text, rc int, PRIMARY KEY (pk));
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", utf8_type}},
+            // clustering key
+            {},
+            // regular columns
+            {{"rc", int32_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - TTL-ed column test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        // UPDATE ttled_column USING TTL 1135 SET rc = 1 WHERE pk='key';
+        auto key = make_dkey(s, {to_bytes("key")});
+        mutation mut{s, key};
+        api::timestamp_type ts = api::new_timestamp();
+        gc_clock::duration ttl{1135};
+        mut.set_clustered_cell(clustering_key::make_empty(), "rc", data_value{1}, ts, ttl);
+        mt->apply(std::move(mut));
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_deleted_column) {
+    return seastar::async([] {
+        sstring table_name = "deleted_column";
+        // CREATE TABLE deleted_column (pk int, rc int, PRIMARY KEY (pk)) WITH compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {},
+            // regular columns
+            {{"rc", int32_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - deleted column test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        // DELETE rc FROM deleted_column WHERE pk=1;
+        auto key = partition_key::from_deeply_exploded(*s, { 1 });
+        mutation mut{s, key};
+        //mut.partition().apply_delete(*s, clustering_key::make_empty(), tombstone{api::new_timestamp(), gc_clock::now()});
+        auto column_def = s->get_column_definition("rc");
+        if (!column_def) {
+            throw std::runtime_error("no column definition found");
+        }
+        mut.set_cell(clustering_key::make_empty(), *column_def, atomic_cell::make_dead(api::new_timestamp(), gc_clock::now()));
+        mt->apply(std::move(mut));
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_deleted_row) {
+    return seastar::async([] {
+        sstring table_name = "deleted_row";
+        // CREATE TABLE deleted_row (pk int, ck int, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {{"ck", int32_type}},
+            // regular columns
+            {},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - deleted row test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        // DELETE FROM deleted_row WHERE pk=1 and ck=2;
+        auto key = partition_key::from_deeply_exploded(*s, { 1 });
+        mutation mut{s, key};
+        clustering_key ckey = clustering_key::from_deeply_exploded(*s, { 2 });
+        mut.partition().apply_delete(*s, ckey, tombstone{api::new_timestamp(), gc_clock::now()});
+        mt->apply(std::move(mut));
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_collection_wide_update) {
+    return seastar::async([] {
+        sstring table_name = "collection_wide_update";
+        auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
+        // CREATE TABLE collection_wide_update (pk int, col set<int>, PRIMARY KEY (pk)) with compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {},
+            // regular columns
+            {{"col", set_of_ints_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - collection wide update test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        // INSERT INTO collection_wide_update (pk, col) VALUES (1, {2, 3});
+        auto key = partition_key::from_deeply_exploded(*s, { 1 });
+        mutation mut{s, key};
+
+        api::timestamp_type ts = api::new_timestamp();
+        gc_clock::time_point tp = gc_clock::now();
+        mut.partition().apply_insert(*s, clustering_key::make_empty(), ts);
+        set_type_impl::mutation set_values {
+            {ts - 1, tp}, // tombstone
+            {
+                {int32_type->decompose(2), atomic_cell::make_live(ts, bytes_view{})},
+                {int32_type->decompose(3), atomic_cell::make_live(ts, bytes_view{})},
+            }
+        };
+
+        mut.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("col"), set_of_ints_type->serialize_mutation_form(set_values));
+        mt->apply(std::move(mut));
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_collection_incremental_update) {
+    return seastar::async([] {
+        sstring table_name = "collection_incremental_update";
+        auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
+        // CREATE TABLE collection_incremental_update (pk int, col set<int>, PRIMARY KEY (pk)) with compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {},
+            // regular columns
+            {{"col", set_of_ints_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - collection incremental update test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        // UPDATE collection_incremental_update SET col = col + {2} WHERE pk = 1;
+        auto key = partition_key::from_deeply_exploded(*s, { 1 });
+        mutation mut{s, key};
+
+        api::timestamp_type ts = api::new_timestamp();
+        set_type_impl::mutation set_values {
+            {}, // tombstone
+            {
+                {int32_type->decompose(2), atomic_cell::make_live(ts, bytes_view{})},
+            }
+        };
+
+        mut.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("col"), set_of_ints_type->serialize_mutation_form(set_values));
+        mt->apply(std::move(mut));
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_multiple_partitions) {
+    return seastar::async([] {
+        sstring table_name = "multiple_partitions";
+        // CREATE TABLE multiple_partitions (pk int, rc1 int, rc2 int, rc3 int, PRIMARY KEY (pk)) WITH compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {},
+            // regular columns
+            {{"rc1", int32_type}, {"rc2", int32_type}, {"rc3", int32_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - multiple partitions test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        api::timestamp_type ts = api::new_timestamp();
+        // INSERT INTO multiple_partitions (pk, rc1) VALUES (1, 10);
+        // INSERT INTO multiple_partitions (pk, rc2) VALUES (2, 20);
+        // INSERT INTO multiple_partitions (pk, rc3) VALUES (3, 30);
+        for (auto i : boost::irange(1, 4)) {
+            auto key = partition_key::from_deeply_exploded(*s, {i});
+            mutation mut{s, key};
+
+            mut.set_cell(clustering_key::make_empty(), to_bytes(format("rc{}", i)), data_value{i * 10}, ts);
+            mt->apply(std::move(mut));
+            ts += 10;
+        }
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+SEASTAR_TEST_CASE(test_write_multiple_rows) {
+    return seastar::async([] {
+        sstring table_name = "multiple_rows";
+        // CREATE TABLE multiple_rows (pk int, ck int, rc1 int, rc2 int, rc3 int, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {{"ck", int32_type}},
+            // regular columns
+            {{"rc1", int32_type}, {"rc2", int32_type}, {"rc3", int32_type}},
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - multiple rows test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        auto key = partition_key::from_deeply_exploded(*s, {0});
+        api::timestamp_type ts = api::new_timestamp();
+        mutation mut{s, key};
+
+        // INSERT INTO multiple_rows (pk, ck, rc1) VALUES (0, 1, 10);
+        // INSERT INTO multiple_rows (pk, ck, rc2) VALUES (0, 2, 20);
+        // INSERT INTO multiple_rows (pk, ck, rc3) VALUES (0, 3, 30);
+        for (auto i : boost::irange(1, 4)) {
+            clustering_key ckey = clustering_key::from_deeply_exploded(*s, { i });
+            mut.partition().apply_insert(*s, ckey, ts);
+            mut.set_cell(ckey, to_bytes(format("rc{}", i)), data_value{i * 10}, ts);
+            ts += 10;
+        }
+
+        mt->apply(std::move(mut));
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
+// Information on missing columns is serialized differently when the number of columns is > 64.
+// This test checks that encoding. NOTE(review): the table has exactly 64 regular columns - confirm the threshold is not ">= 64".
+SEASTAR_TEST_CASE(test_write_missing_columns_large_set) {
+    return seastar::async([] {
+        sstring table_name = "missing_columns_large_set";
+        // CREATE TABLE missing_columns_large_set (pk int, ck int, rc1 int, ..., rc64 int, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
+        std::vector regular_columns;
+        regular_columns.reserve(64);
+        for (auto idx: boost::irange(1, 65)) {
+            regular_columns.push_back({to_bytes(format("rc{}", idx)), int32_type});
+        }
+        schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
+            // partition key
+            {{"pk", int32_type}},
+            // clustering key
+            {{"ck", int32_type}},
+            regular_columns,
+            // static columns
+            {},
+            // regular column name type
+            utf8_type,
+            // comment
+            "SSTable 3.0 format write path - missing columns large set test"
+        )));
+        builder.set_compressor_params(compression_parameters());
+        schema_ptr s = builder.build(schema_builder::compact_storage::no);
+
+        lw_shared_ptr mt = make_lw_shared(s);
+
+        auto key = partition_key::from_deeply_exploded(*s, {0});
+        api::timestamp_type ts = api::new_timestamp();
+        mutation mut{s, key};
+
+        // INSERT INTO missing_columns_large_set (pk, ck, rc1, ..., rc62) VALUES (0, 0, 1, ..., 62);
+        // Most columns are present in this row, so the missing ones are what gets written.
+        {
+            clustering_key ckey = clustering_key::from_deeply_exploded(*s, {0});
+            mut.partition().apply_insert(*s, ckey, ts);
+            for (auto idx: boost::irange(1, 63)) {
+                mut.set_cell(ckey, to_bytes(format("rc{}", idx)), data_value{idx}, ts);
+            }
+            // Not applied yet: mut must stay valid for the second row; both rows are applied together below.
+        }
+        ts += 10;
+        // INSERT INTO missing_columns_large_set (pk, ck, rc63, rc64) VALUES (0, 1, 63, 64);
+        // Most columns are missing in this row, so the present ones are what gets written.
+        {
+            clustering_key ckey = clustering_key::from_deeply_exploded(*s, {1});
+            mut.partition().apply_insert(*s, ckey, ts);
+            mut.set_cell(ckey, to_bytes("rc63"), data_value{63}, ts);
+            mut.set_cell(ckey, to_bytes("rc64"), data_value{64}, ts);
+            mt->apply(std::move(mut));
+        }
+
+        write_and_compare_sstables(s, mt, table_name);
+    });
+}
+
diff --git a/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Data.db
new file mode 100644
index 0000000000..54a1799105
Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Data.db differ
diff --git a/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Index.db
new file mode 100644
index 0000000000..b077026fd8
Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_collection_incremental_update/mc-1-big-Index.db differ
diff --git a/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Data.db
new file mode 100644
index 0000000000..48bd694aa0
Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Data.db differ
diff --git a/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Index.db
new file mode 100644
index 0000000000..b077026fd8
Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_collection_wide_update/mc-1-big-Index.db differ
diff --git a/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Data.db
new file mode 100644
index 0000000000..a701a40436
Binary files /dev/null and
b/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Index.db new file mode 100644 index 0000000000..b077026fd8 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_composite_clustering_key/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Data.db new file mode 100644 index 0000000000..8dbb23268e Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Index.db new file mode 100644 index 0000000000..b077026fd8 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_deleted_column/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Data.db new file mode 100644 index 0000000000..c06cb04f05 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Index.db new file mode 100644 index 0000000000..b077026fd8 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_deleted_row/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Data.db new file mode 100644 index 0000000000..55c43161de Binary files /dev/null and 
b/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Index.db new file mode 100644 index 0000000000..f9376371f5 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_missing_columns_large_set/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Data.db new file mode 100644 index 0000000000..3475106c94 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Index.db new file mode 100644 index 0000000000..91ff1c7d23 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_multiple_partitions/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Data.db new file mode 100644 index 0000000000..6077744d59 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Index.db new file mode 100644 index 0000000000..f9376371f5 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_multiple_rows/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Data.db new file mode 100644 index 0000000000..cd36e08925 Binary files /dev/null and 
b/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Index.db new file mode 100644 index 0000000000..c6b4844b7d Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_static_row/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Data.db new file mode 100644 index 0000000000..d3c92a1286 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Index.db new file mode 100644 index 0000000000..f8d1e6283d Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_ttled_column/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Data.db new file mode 100644 index 0000000000..ecf77af2b2 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Index.db new file mode 100644 index 0000000000..b077026fd8 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_ttled_row/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Data.db new file mode 100644 index 0000000000..7bc80095e4 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Index.db 
b/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Index.db new file mode 100644 index 0000000000..3598902428 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/write_wide_partition/mc-1-big-Index.db differ diff --git a/types.cc b/types.cc index 32a1f1f0f1..79f8a5cdfd 100644 --- a/types.cc +++ b/types.cc @@ -239,6 +239,9 @@ struct integer_type_impl : simple_type_impl { virtual bytes from_json_object(const Json::Value& value, cql_serialization_format sf) const override { return this->decompose(T(json::to_int64_t(value))); } + virtual bool is_fixed_length() const override { + return true; + } }; struct byte_type_impl : integer_type_impl { @@ -350,6 +353,9 @@ struct string_type_impl : public concrete_type { virtual bytes from_json_object(const Json::Value& value, cql_serialization_format sf) const override { return from_string(value.asString()); } + virtual bool is_fixed_length() const override { + return false; + } }; struct ascii_type_impl final : public string_type_impl { @@ -436,6 +442,9 @@ struct bytes_type_impl final : public concrete_type { // bytesType validate everything, so it is compatible with the former. 
return this == &other || &other == ascii_type.get() || &other == utf8_type.get(); } + virtual bool is_fixed_length() const override { + return false; + } }; struct boolean_type_impl : public simple_type_impl { @@ -512,6 +521,9 @@ struct boolean_type_impl : public simple_type_impl { virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::boolean; } + virtual bool is_fixed_length() const override { + return true; + } }; class date_type_impl : public concrete_type { @@ -591,6 +603,9 @@ public: } return false; } + virtual bool is_fixed_length() const override { + return true; + } }; logging::logger date_type_impl::_logger(date_type_name); @@ -689,6 +704,9 @@ struct timeuuid_type_impl : public concrete_type { virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::timeuuid; } + virtual bool is_fixed_length() const override { + return true; + } private: static int compare_bytes(bytes_view o1, bytes_view o2) { auto compare_pos = [&] (unsigned pos, int mask, int ifequal) { @@ -880,6 +898,9 @@ public: } return false; } + virtual bool is_fixed_length() const override { + return true; + } }; logging::logger timestamp_type_impl::_logger(timestamp_type_name); @@ -975,6 +996,9 @@ struct simple_date_type_impl : public simple_type_impl { virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::date; } + virtual bool is_fixed_length() const override { + return true; + } }; struct time_type_impl : public simple_type_impl { @@ -1076,6 +1100,9 @@ struct time_type_impl : public simple_type_impl { virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::time; } + virtual bool is_fixed_length() const override { + return true; + } }; struct uuid_type_impl : concrete_type { @@ -1167,6 +1194,9 @@ struct uuid_type_impl : concrete_type { virtual bool is_value_compatible_with_internal(const abstract_type& other) const override { return &other == this || &other == timeuuid_type.get(); } + virtual bool 
is_fixed_length() const override { + return true; + } }; using inet_address = seastar::net::inet_address; @@ -1272,6 +1302,9 @@ struct inet_addr_type_impl : concrete_type { virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::inet; } + virtual bool is_fixed_length() const override { + return true; + } }; // Integer of same length of a given type. This is useful because our @@ -1422,6 +1455,9 @@ struct floating_type_impl : public simple_type_impl { throw marshal_exception("Only float/double types can be parsed from JSON floating point object"); } } + virtual bool is_fixed_length() const override { + return true; + } }; struct double_type_impl : floating_type_impl { @@ -1559,6 +1595,9 @@ public: virtual bool is_value_compatible_with_internal(const abstract_type& other) const override { return &other == this || int32_type->is_value_compatible_with(other) || long_type->is_value_compatible_with(other); } + virtual bool is_fixed_length() const override { + return false; + } friend class decimal_type_impl; }; @@ -1666,6 +1705,9 @@ public: virtual ::shared_ptr as_cql3_type() const override { return cql3::cql3_type::decimal; } + virtual bool is_fixed_length() const override { + return false; + } }; class counter_type_impl : public abstract_type { @@ -1741,6 +1783,9 @@ public: virtual std::experimental::optional update_user_type(const shared_ptr updated) const { return std::experimental::nullopt; } + virtual bool is_fixed_length() const override { + fail(unimplemented::cause::COUNTERS); + } }; // TODO(jhaberku): Move this to Seastar. 
@@ -1886,6 +1931,9 @@ public: virtual bool references_duration() const override { return true; } + virtual bool is_fixed_length() const override { + return false; + } private: using counter_type = cql_duration::common_counter_type; @@ -1975,6 +2023,9 @@ struct empty_type_impl : abstract_type { // Can't happen abort(); } + virtual bool is_fixed_length() const override { + return true; + } }; diff --git a/types.hh b/types.hh index 5ed2e94603..d93d3164db 100644 --- a/types.hh +++ b/types.hh @@ -499,6 +499,7 @@ public: virtual bool references_duration() const { return false; } + virtual bool is_fixed_length() const = 0; protected: virtual bool equals(const abstract_type& other) const { return this == &other; @@ -845,6 +846,7 @@ public: return deserialize(v, sf); } bytes_opt reserialize(cql_serialization_format from, cql_serialization_format to, bytes_view_opt v) const; + virtual bool is_fixed_length() const override { return false; } }; using collection_type = shared_ptr; @@ -1009,6 +1011,10 @@ public: static shared_ptr get_instance(data_type type) { return intern::get_instance(std::move(type)); } + + virtual bool is_fixed_length() const override { + return _underlying_type->is_fixed_length(); + } protected: virtual size_t native_value_size() const override; virtual size_t native_value_alignment() const override; @@ -1640,6 +1646,7 @@ public: virtual bool references_user_type(const sstring& keyspace, const bytes& name) const override; virtual std::experimental::optional update_user_type(const shared_ptr updated) const override; virtual bool references_duration() const override; + virtual bool is_fixed_length() const override { return false; } private: bool check_compatibility(const abstract_type& previous, bool (abstract_type::*predicate)(const abstract_type&) const) const; static sstring make_name(const std::vector& types); diff --git a/vint-serialization.hh b/vint-serialization.hh index d47b70a34a..d7e01065d6 100644 --- a/vint-serialization.hh +++ 
b/vint-serialization.hh @@ -31,6 +31,8 @@ using vint_size_type = bytes::size_type; +static constexpr size_t max_vint_length = 9; + struct unsigned_vint final { using value_type = uint64_t;