Merge "upport for writing SSTables 3.0 - rows only" from Vladimir

"
This patch series introduces initial support for writing SSTables in
'mc' format (aka SSTables 3.0).

Currently, the following components are written in 3.0 format:
  - Data.db
  - Index.db
  - Summary.db
(there were no changes to summary files format compared to ka/la)
Other SSTables components are written in the old format for now as they
still need to exist to satisfy post-flush processing.

For now, only rows are written to the data file and indexed. Range
tombstones are not supported.

Writing rows is supported in full with the only exception being counter
cells. All the other features (TTLed data, row/cell level tombstones,
collections, etc) are supported.

Unit tests rely on producing files and binary-comparing them with
'golden' copies that are produced using Cassandra 3.11. This is done to
not block until reading SSTables 3.0 format is implemented.

=======================================
Implementation notes
=======================================

Internally, sstable_writer has been refactored to support multiple
implementations that are instantiated in its constructor based on the
sstable version. Little to no code is shared among sstable_writer_v2 and
sstable_writer_v3 as we only intend to support sstable_writer_v2
alongside sstable_writer_v3 for a single release (to be able to do
rollback on rolling upgrade failure) and then plan to get rid of it
entirely and switch to always writing SSTables in the new format.

The design of sstable_writer_v3 mostly follows that of its precursors
sstable_writer(_v2) and components_writer. Some refactoring and further
code rearrangements are expected in the future but the main code is
there.
"

* 'projects/sstables-30/write-rows/v2' of https://github.com/argenet/scylla:
  Add tests for writing data and index files in SSTables 3.0 ('mc') format.
  Support for writing SSTables 3.0 ('mc') Data.db and Index.db files - rows only.
  Add missing enum values to bound_kind.
  Add building blocks for writing data in SSTables 3.0 format.
  Refactor sstable_writer to support various internal implementations.
  Add is_fixed_length() to data types.
  Add mutation_partition::apply_insert() overload that accepts TTL and expiry for row marker.
This commit is contained in:
Avi Kivity
2018-04-27 17:10:31 +03:00
39 changed files with 1758 additions and 49 deletions

View File

@@ -32,7 +32,10 @@
enum class bound_kind : uint8_t {
excl_end = 0,
incl_start = 1,
// values 2 to 5 are reserved for forward Origin compatibility
excl_end_incl_start = 2,
static_clustering = 3,
clustering = 4,
incl_end_excl_start = 5,
incl_end = 6,
excl_start = 7,
};

View File

@@ -412,6 +412,7 @@ scylla_core = (['database.cc',
'sstables/compaction_manager.cc',
'sstables/integrity_checked_file_impl.cc',
'sstables/prepended_input_stream.cc',
'sstables/m_format_write_helpers.cc',
'transport/event.cc',
'transport/event_notifier.cc',
'transport/server.cc',

14
keys.cc
View File

@@ -63,6 +63,14 @@ std::ostream& operator<<(std::ostream& out, const bound_kind k) {
return out << "excl end";
case bound_kind::incl_start:
return out << "incl start";
case bound_kind::excl_end_incl_start:
return out << "excl end + incl start";
case bound_kind::static_clustering:
return out << "static clustering";
case bound_kind ::clustering:
return out << "clustering";
case bound_kind::incl_end_excl_start:
return out << "incl end + excl start";
case bound_kind::incl_end:
return out << "incl end";
case bound_kind::excl_start:
@@ -77,6 +85,10 @@ bound_kind invert_kind(bound_kind k) {
case bound_kind::incl_start: return bound_kind::excl_end;
case bound_kind::excl_end: return bound_kind::incl_start;
case bound_kind::incl_end: return bound_kind::excl_start;
case bound_kind::excl_end_incl_start: return bound_kind::incl_end_excl_start;
case bound_kind::incl_end_excl_start: return bound_kind::excl_end_incl_start;
case bound_kind::static_clustering: return bound_kind::static_clustering;
case bound_kind::clustering: return bound_kind::clustering;
}
abort();
}
@@ -91,6 +103,8 @@ int32_t weight(bound_kind k) {
return 1;
case bound_kind::excl_start:
return 2;
default:
throw std::invalid_argument(sprint("weight() is not defined for bound_kind {}", k));
}
abort();
}

View File

@@ -443,7 +443,10 @@ void
mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::timestamp_type created_at) {
clustered_row(s, key).apply(row_marker(created_at));
}
void mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::timestamp_type created_at,
gc_clock::duration ttl, gc_clock::time_point expiry) {
clustered_row(s, key).apply(row_marker(created_at, ttl, expiry));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
auto e = current_allocator().construct<rows_entry>(key, std::move(row));
_rows.insert(_rows.end(), *e, rows_entry::compare(s));

View File

@@ -953,6 +953,8 @@ public:
void apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t);
// Equivalent to applying a mutation with an empty row, created with given timestamp
void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at);
void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at,
gc_clock::duration ttl, gc_clock::time_point expiry);
// prefix must not be full
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
void apply_row_tombstone(const schema& schema, range_tombstone rt);

View File

@@ -58,10 +58,15 @@ int position_weight(bound_kind k) {
switch(k) {
case bound_kind::excl_end:
case bound_kind::incl_start:
case bound_kind::excl_end_incl_start:
return -1;
case bound_kind::incl_end:
case bound_kind::excl_start:
case bound_kind::incl_end_excl_start:
return 1;
case bound_kind::clustering:
case bound_kind::static_clustering:
return 0;
}
abort();
}

View File

@@ -549,7 +549,8 @@ public:
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &_active_write_monitors.back();
cfg.large_partition_warning_threshold_bytes = _cf.large_partition_warning_threshold_bytes();
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority));
// TODO: calculate encoding_stats based on statistics of compacted sstables
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, encoding_stats{}, priority));
}
return &*_writer;
}
@@ -696,7 +697,8 @@ public:
sstable_writer_config cfg;
cfg.max_sstable_size = _max_sstable_size;
auto&& priority = service::get_local_compaction_priority();
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, priority, _shard));
// TODO: calculate encoding_stats based on statistics of compacted sstables
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
}
return &*writer;
}

View File

@@ -0,0 +1,315 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <functional>
#include <boost/iterator/iterator_facade.hpp>
#include <boost/container/static_vector.hpp>
#include "encoding_stats.hh"
#include "schema.hh"
#include "mutation_fragment.hh"
#include "vint-serialization.hh"
#include "sstables/types.hh"
#include "sstables/m_format_write_helpers.hh"
#include "sstables/writer.hh"
namespace sstables {
template <typename T>
inline void write_vint_impl(file_writer& out, T value) {
using vint_type = std::conditional_t<std::is_unsigned_v<T>, unsigned_vint, signed_vint>;
std::array<bytes::value_type, max_vint_length> encoding_buffer;
const auto size = vint_type::serialize(value, encoding_buffer.begin());
out.write(reinterpret_cast<const char*>(encoding_buffer.data()), size);
}
void write_unsigned_vint(file_writer& out, uint64_t value) {
return write_vint_impl(out, value);
}
void write_signed_vint(file_writer& out, int64_t value) {
return write_vint_impl(out, value);
}
// A helper CRTP base class for input ranges.
// Derived classes should implement the following functions:
// bool next() const;
// generates the next value, if possible;
// returns true if the next value has been evaluated, false otherwise
// explicit operator bool() const;
// tells whether the range can produce more items
// TODO: turn description into a concept
template <typename InputRange, typename ValueType>
struct input_range_base {
private:
InputRange& self() {
return static_cast<InputRange&>(*this);
}
const InputRange& self() const {
return static_cast<const InputRange&>(*this);
}
public:
// Use the same type for iterator and const_iterator
using const_iterator = class iterator
: public boost::iterator_facade<
iterator,
const ValueType,
std::input_iterator_tag,
const ValueType
>
{
private:
const InputRange* _range;
friend class input_range_base;
friend class boost::iterator_core_access;
explicit iterator(const InputRange& range)
: _range(range.next() ? &range : nullptr)
{}
void increment() {
assert(_range);
if (!_range->next()) {
_range = nullptr;
}
}
bool equal(iterator that) const {
return (_range == that._range);
}
const ValueType dereference() const {
assert(_range);
return _range->get_value();
}
public:
iterator() : _range{} {}
};
iterator begin() const { return iterator{self()}; }
iterator end() const { return iterator{}; }
};
struct clustering_block {
constexpr static uint32_t max_block_size = 32;
uint64_t header = 0;
struct described_value {
bytes_view value;
std::reference_wrapper<const abstract_type> type;
};
boost::container::static_vector<described_value, clustering_block::max_block_size> values;
};
class clustering_blocks_input_range
: public input_range_base<clustering_blocks_input_range, clustering_block> {
private:
const schema& _schema;
const clustering_key_prefix& _prefix;
mutable clustering_block _current_block;
mutable uint32_t _offset = 0;
public:
clustering_blocks_input_range(const schema& s, const clustering_key_prefix& prefix)
: _schema(s), _prefix(prefix) {}
bool next() const {
if (_offset == _schema.clustering_key_size()) {
// No more values to encode
return false;
}
// Each block contains up to max_block_size values
auto limit = std::min(_schema.clustering_key_size(), _offset + clustering_block::max_block_size);
_current_block = {};
assert (_offset % clustering_block::max_block_size == 0);
while (_offset < limit) {
auto shift = _offset & clustering_block::max_block_size;
if (_offset < _prefix.size(_schema)) {
bytes_view value = _prefix.get_component(_schema, _offset);
if (value.empty()) {
_current_block.header |= (uint64_t(1) << (shift * 2));
} else {
_current_block.values.push_back({value, *_prefix.get_compound_type(_schema)->types()[_offset]});
}
} else {
// This (and all subsequent) values of the prefix are missing (null)
_current_block.header |= (uint64_t(1) << ((shift * 2) + 1));
}
++_offset;
}
return true;
}
clustering_block get_value() const { return _current_block; };
explicit operator bool() const {
return (_offset < _schema.clustering_key_size());
}
};
// Writes cell value according to its data type traits
// NOTE: this function is defined in sstables/sstables.cc
void write_cell_value(file_writer& out, const abstract_type& type, bytes_view value);
static void write(file_writer& out, const clustering_block& block) {
write_vint(out, block.header);
for (const auto& [value, type]: block.values) {
write_cell_value(out, type, value);
}
}
void write_clustering_prefix(file_writer& out, const schema& s, const clustering_key_prefix& prefix) {
clustering_blocks_input_range range{s, prefix};
for (const auto block: range) {
write(out, block);
}
}
// This range generates a sequence of values that represent information
// about missing columns for SSTables 3.0 format.
class missing_columns_input_range
: public input_range_base<missing_columns_input_range, uint64_t> {
private:
const schema& _schema;
const row& _row;
mutable uint64_t _current_value = 0;
mutable column_id _current_id = 0;
mutable bool _large_mode_produced_size = false;
enum class encoding_mode {
small,
large_encode_present,
large_encode_missing,
} _mode;
public:
missing_columns_input_range(const schema& s, const row& row)
: _schema(s)
, _row(row) {
auto row_size = _row.size();
auto total_size = _schema.regular_columns_count();
_current_id = row_size < total_size ? 0 : total_size;
_mode = (total_size < 64) ? encoding_mode::small :
(row_size < total_size / 2) ? encoding_mode::large_encode_present :
encoding_mode::large_encode_missing;
}
bool next() const {
auto total_size = _schema.regular_columns_count();
if (_current_id == total_size) {
// No more values to encode
return false;
}
if (_mode == encoding_mode::small) {
// Set bit for every missing column
for (column_id id = 0; id < total_size; ++id) {
auto cell = _row.find_cell(id);
if (!cell) {
_current_value |= (uint64_t(1) << id);
}
}
_current_id = total_size;
return true;
} else {
// For either of large modes, output the difference between total size and row size first
if (!_large_mode_produced_size) {
_current_value = total_size - _row.size();
_large_mode_produced_size = true;
return true;
}
if (_mode == encoding_mode::large_encode_present) {
while (_current_id < total_size) {
auto cell = _row.find_cell(_current_id);
if (cell) {
_current_value = _current_id;
++_current_id;
return true;
}
++_current_id;
}
} else {
assert(_mode == encoding_mode::large_encode_missing);
while (_current_id < total_size) {
auto cell = _row.find_cell(_current_id);
if (!cell) {
_current_value = _current_id;
++_current_id;
return true;
}
++_current_id;
}
}
}
return false;
}
uint64_t get_value() const { return _current_value; }
explicit operator bool() const
{
return (_current_id < _schema.regular_columns_count());
}
};
void write_missing_columns(file_writer& out, const schema& s, const row& row) {
for (const auto value: missing_columns_input_range{s, row}) {
write_vint(out, value);
}
}
template <typename T>
void write_unsigned_delta_vint(file_writer& out, T value, T base) {
using unsigned_type = std::make_unsigned_t<T>;
unsigned_type delta = value - base;
write_vint(out, delta);
}
void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, timestamp, enc_stats.min_timestamp);
}
void write_delta_ttl(file_writer& out, uint32_t ttl, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, ttl, enc_stats.min_ttl);
}
void write_delta_local_deletion_time(file_writer& out, uint32_t local_deletion_time, const encoding_stats& enc_stats) {
write_unsigned_delta_vint(out, local_deletion_time, enc_stats.min_local_deletion_time);
}
void write_delta_deletion_time(file_writer& out, deletion_time dt, const encoding_stats& enc_stats) {
write_delta_timestamp(out, dt.marked_for_delete_at, enc_stats);
write_delta_local_deletion_time(out, dt.local_deletion_time, enc_stats);
}
}; // namespace sstables

View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <type_traits>
#include "bytes.hh"
#include "types.hh"
#include "timestamp.hh"
class schema;
class row;
class clustering_key_prefix;
class encoding_stats;
namespace sstables {
class file_writer;
// Utilities for writing integral values in variable-length format
// See vint-serialization.hh for more details
void write_unsigned_vint(file_writer& out, uint64_t value);
void write_signed_vint(file_writer& out, int64_t value);
template <typename T>
typename std::enable_if_t<!std::is_integral_v<T>>
write_vint(file_writer& out, T t) = delete;
template <typename T>
inline void write_vint(file_writer& out, T value) {
static_assert(std::is_integral_v<T>, "Non-integral values can't be written using write_vint");
return std::is_unsigned_v<T> ? write_unsigned_vint(out, value) : write_signed_vint(out, value);
}
// Writes clustering prefix, full or not, encoded in SSTables 3.0 format
void write_clustering_prefix(file_writer& out, const schema& s, const clustering_key_prefix& prefix);
// Writes encoded information about missing columns in the given row
void write_missing_columns(file_writer& out, const schema& s, const row& row);
// Helper functions for writing delta-encoded time-related values
void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats);
void write_delta_ttl(file_writer& out, uint32_t ttl, const encoding_stats& enc_stats);
void write_delta_local_deletion_time(file_writer& out, uint32_t local_deletion_time, const encoding_stats& enc_stats);
void write_delta_deletion_time(file_writer& out, deletion_time dt, const encoding_stats& enc_stats);
}; // namespace sstables

View File

@@ -35,6 +35,7 @@
#include <iterator>
#include "types.hh"
#include "m_format_write_helpers.hh"
#include "sstables.hh"
#include "progress_monitor.hh"
#include "compress.hh"
@@ -64,6 +65,7 @@
#include "service/storage_service.hh"
#include "db/extensions.hh"
#include "unimplemented.hh"
#include "vint-serialization.hh"
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
@@ -1708,6 +1710,17 @@ void sstable::maybe_flush_pi_block(file_writer& out,
}
}
void write_cell_value(file_writer& out, const abstract_type& type, bytes_view value) {
if (!value.empty()) {
if (type.is_fixed_length()) {
write(sstable_version_types::mc, out, value);
} else {
write_vint(out, value.size());
write(sstable_version_types::mc, out, value);
}
}
}
static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type timestamp) {
c_stats.update_min_timestamp(timestamp);
c_stats.update_max_timestamp(timestamp);
@@ -2159,11 +2172,11 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
auto p_key = disk_string_view<uint16_t>();
p_key.value = bytes_view(*_partition_key);
// Write index file entry from partition key into index file.
// Write index file entry for partition key into index file.
// Write an index entry minus the "promoted index" (sample of columns)
// part. We can only write that after processing the entire partition
// and collecting the sample of columns.
write_index_header(_sst._version, _index, p_key, _out.offset());
write_index_header(_sst.get_version(), _index, p_key, _out.offset());
_sst._pi_write.data = {};
_sst._pi_write.numblocks = 0;
_sst._pi_write.deltime.local_deletion_time = std::numeric_limits<int32_t>::max();
@@ -2173,7 +2186,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
_sst._pi_write.schemap = &_schema; // sadly we need this
// Write partition key into data file.
write(_sst._version, _out, p_key);
write(_sst.get_version(), _out, p_key);
_tombstone_written = false;
}
@@ -2194,7 +2207,7 @@ void components_writer::consume(tombstone t) {
d.local_deletion_time = std::numeric_limits<int32_t>::max();
d.marked_for_delete_at = std::numeric_limits<int64_t>::min();
}
write(_sst._version, _out, d);
write(_sst.get_version(), _out, d);
_tombstone_written = true;
// TODO: need to verify we don't do this twice?
_sst._pi_write.deltime = d;
@@ -2263,13 +2276,13 @@ stop_iteration components_writer::consume_end_of_partition() {
_out.offset() - _sst._pi_write.block_start_offset);
_sst._pi_write.numblocks++;
}
write_index_promoted(_sst._version, _index, _sst._pi_write.data, _sst._pi_write.deltime,
write_index_promoted(_sst.get_version(), _index, _sst._pi_write.data, _sst._pi_write.deltime,
_sst._pi_write.numblocks);
_sst._pi_write.data = {};
_sst._pi_write.block_first_colname = {};
int16_t end_of_row = 0;
write(_sst._version, _out, end_of_row);
write(_sst.get_version(), _out, end_of_row);
// compute size of the current row.
_sst._c_stats.row_size = _out.offset() - _sst._c_stats.start_offset;
@@ -2300,7 +2313,7 @@ void components_writer::consume_end_of_stream() {
}
_sst.set_first_and_last_keys();
seal_statistics(_sst._version, _sst._components->statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(),
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(),
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key());
}
@@ -2350,7 +2363,50 @@ sstable::write_scylla_metadata(const io_priority_class& pc, shard_id shard, ssta
write_simple<component_type::Scylla>(*_components->scylla_metadata, pc);
}
void sstable_writer::prepare_file_writer()
struct sstable_writer::writer_impl {
virtual void consume_new_partition(const dht::decorated_key& dk) = 0;
virtual void consume(tombstone t) = 0;
virtual stop_iteration consume(static_row&& sr) = 0;
virtual stop_iteration consume(clustering_row&& cr) = 0;
virtual stop_iteration consume(range_tombstone&& rt) = 0;
virtual stop_iteration consume_end_of_partition() = 0;
virtual void consume_end_of_stream() = 0;
virtual ~writer_impl() {}
};
class sstable_writer_k_l : public sstable_writer::writer_impl {
sstable& _sst;
const schema& _schema;
const io_priority_class& _pc;
bool _backup;
bool _leave_unsealed;
bool _compression_enabled;
std::unique_ptr<file_writer> _writer;
stdx::optional<components_writer> _components_writer;
shard_id _shard; // Specifies which shard new sstable will belong to.
write_monitor* _monitor;
bool _correctly_serialize_non_compound_range_tombstones;
private:
void prepare_file_writer();
void finish_file_writer();
public:
sstable_writer_k_l(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config&, const io_priority_class& pc, shard_id shard = engine().cpu_id());
~sstable_writer_k_l();
sstable_writer_k_l(sstable_writer_k_l&& o) : _sst(o._sst), _schema(o._schema), _pc(o._pc), _backup(o._backup),
_leave_unsealed(o._leave_unsealed), _compression_enabled(o._compression_enabled), _writer(std::move(o._writer)),
_components_writer(std::move(o._components_writer)), _shard(o._shard), _monitor(o._monitor),
_correctly_serialize_non_compound_range_tombstones(o._correctly_serialize_non_compound_range_tombstones) { }
void consume_new_partition(const dht::decorated_key& dk) override { return _components_writer->consume_new_partition(dk); }
void consume(tombstone t) override { _components_writer->consume(t); }
stop_iteration consume(static_row&& sr) override { return _components_writer->consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr) override { return _components_writer->consume(std::move(cr)); }
stop_iteration consume(range_tombstone&& rt) override { return _components_writer->consume(std::move(rt)); }
stop_iteration consume_end_of_partition() override { return _components_writer->consume_end_of_partition(); }
void consume_end_of_stream() override;
};
void sstable_writer_k_l::prepare_file_writer()
{
file_output_stream_options options;
options.io_priority_class = _pc;
@@ -2364,21 +2420,21 @@ void sstable_writer::prepare_file_writer()
}
}
void sstable_writer::finish_file_writer()
void sstable_writer_k_l::finish_file_writer()
{
auto writer = std::move(_writer);
writer->close().get();
if (!_compression_enabled) {
auto chksum_wr = static_cast<checksummed_file_writer*>(writer.get());
write_digest(_sst._version, _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum());
write_crc(_sst._version, _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum());
write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum());
write_crc(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum());
} else {
write_digest(_sst._version, _sst._write_error_handler, _sst.filename(component_type::Digest), _sst._components->compression.full_checksum());
write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), _sst._components->compression.full_checksum());
}
}
sstable_writer::~sstable_writer() {
sstable_writer_k_l::~sstable_writer_k_l() {
if (_writer) {
try {
_writer->close().get();
@@ -2388,7 +2444,7 @@ sstable_writer::~sstable_writer() {
}
}
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
sstable_writer_k_l::sstable_writer_k_l(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, const io_priority_class& pc, shard_id shard)
: _sst(sst)
, _schema(s)
@@ -2414,7 +2470,7 @@ static sstable_enabled_features all_features() {
return sstable_enabled_features{(1 << sstable_feature::End) - 1};
}
void sstable_writer::consume_end_of_stream()
void sstable_writer_k_l::consume_end_of_stream()
{
_components_writer->consume_end_of_stream();
_components_writer = stdx::nullopt;
@@ -2439,6 +2495,665 @@ void sstable_writer::consume_end_of_stream()
_monitor->on_flush_completed();
}
enum class cell_flags : uint8_t {
none = 0x00,
is_deleted_mask = 0x01, // Whether the cell is a tombstone or not.
is_expiring_mask = 0x02, // Whether the cell is expiring.
has_empty_value_mask = 0x04, // Whether the cell has an empty value. This will be the case for a tombstone in particular.
use_row_timestamp_mask = 0x08, // Whether the cell has the same timestamp as the row this is a cell of.
use_row_ttl_mask = 0x10, // Whether the cell has the same TTL as the row this is a cell of.
};
inline cell_flags operator& (cell_flags lhs, cell_flags rhs) {
return cell_flags(static_cast<uint8_t>(lhs) & static_cast<uint8_t>(rhs));
}
inline cell_flags& operator |= (cell_flags& lhs, cell_flags rhs) {
lhs = cell_flags(static_cast<uint8_t>(lhs) | static_cast<uint8_t>(rhs));
return lhs;
}
enum class row_flags : uint8_t {
none = 0x00,
// Signal the end of the partition. Nothing follows a <flags> field with that flag.
end_of_partition = 0x01,
// Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
is_marker = 0x02,
// Whether the encoded row has a timestamp (i.e. its liveness_info is not empty).
has_timestamp = 0x04,
// Whether the encoded row has some expiration info (i.e. if its liveness_info contains TTL and local_deletion).
has_ttl = 0x08,
// Whether the encoded row has some deletion info.
has_deletion = 0x10,
// Whether the encoded row has all of the columns from the header present.
has_all_columns = 0x20,
// Whether the encoded row has some complex deletion for at least one of its complex columns.
has_complex_deletion = 0x40,
// If present, another byte is read containing the "extended flags" below.
extension_flag = 0x80
};
inline row_flags operator& (row_flags lhs, row_flags rhs) {
return row_flags(static_cast<uint8_t>(lhs) & static_cast<uint8_t>(rhs));
}
inline row_flags& operator |= (row_flags& lhs, row_flags rhs) {
lhs = row_flags(static_cast<uint8_t>(lhs) | static_cast<uint8_t>(rhs));
return lhs;
}
enum class row_extended_flags : uint8_t {
none = 0x00,
// Whether the encoded row is a static. If there is no extended flag, the row is assumed not static.
is_static = 0x01,
// Whether the row deletion is shadowable. If there is no extended flag (or no row deletion)
// the deletion is assumed not shadowable.
// This flag is deprecated in Origin - see CASSANDRA-11500.
has_shadowable_deletion = 0x02,
};
// Used for writing SSTables in 'mc' format.
class sstable_writer_m : public sstable_writer::writer_impl {
private:
sstable& _sst;
const schema& _schema;
const io_priority_class& _pc;
sstable_writer_config _cfg;
encoding_stats _enc_stats;
shard_id _shard; // Specifies which shard the new SStable will belong to.
std::unique_ptr<file_writer> _data_writer;
std::optional<file_writer> _index_writer;
bool _tombstone_written = false;
bool _row_deletion_written = false;
uint64_t _current_partition_offset = 0;
uint64_t _prev_row_start = 0;
std::optional<key> _partition_key;
stdx::optional<key> _first_key, _last_key;
index_sampling_state _index_sampling_state;
struct pi_block {
clustering_key_prefix first;
clustering_key_prefix last;
uint64_t offset;
uint64_t width;
};
// _pi_write_m is used temporarily for building the promoted
// index (column sample) of one partition when writing a new sstable.
struct {
// Unfortunately we cannot output the promoted index directly to the
// index file because it needs to be prepended by its size.
seastar::circular_buffer<pi_block> promoted_index;
tombstone tomb;
uint64_t block_start_offset;
uint64_t block_next_start_offset;
std::optional<clustering_key_prefix> first_clustering;
std::optional<clustering_key_prefix> last_clustering;
size_t desired_block_size;
} _pi_write_m;
void init_file_writers();
void close_data_writer();
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
consume(tombstone());
}
}
void write_delta_timestamp(file_writer& writer, api::timestamp_type timestamp) {
sstables::write_delta_timestamp(writer, timestamp, _enc_stats);
}
void write_delta_ttl(file_writer& writer, uint32_t ttl) {
sstables::write_delta_ttl(writer, ttl, _enc_stats);
}
void write_delta_local_deletion_time(file_writer& writer, uint32_t ldt) {
sstables::write_delta_local_deletion_time(writer, ldt, _enc_stats);
}
void write_delta_deletion_time(file_writer& writer, deletion_time dt) {
sstables::write_delta_deletion_time(writer, dt, _enc_stats);
}
struct row_time_properties {
std::optional<api::timestamp_type> timestamp;
std::optional<uint32_t> ttl;
std::optional<uint32_t> local_deletion_time;
};
// Writes single atomic cell
void write_cell(file_writer& writer, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, bytes_view cell_path = {});
// Writes information about row liveness (formerly 'row marker')
void write_liveness_info(file_writer& writer, const row_marker& marker);
// Writes a CQL collection (list, set or map)
void write_collection(file_writer& writer, const column_definition& cdef, collection_mutation_view collection,
const row_time_properties& properties, bool has_complex_deletion);
void write_cells(file_writer& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion = false);
void write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion);
void write_static_row(const row& static_row);
void write_clustered_row(const clustering_row& clustered_row, uint64_t prev_row_size);
std::vector<uint32_t> write_promoted_index(file_writer& writer);
public:
sstable_writer_m(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats,
const io_priority_class& pc, shard_id shard = engine().cpu_id())
: _sst(sst)
, _schema(s)
, _pc(pc)
, _cfg(cfg)
, _enc_stats(enc_stats)
, _shard(shard)
{
_index_sampling_state.summary_byte_cost = summary_byte_cost();
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
_sst.write_toc(_pc);
_sst.create_data().get();
if (!_sst.has_component(component_type::CRC)) {
throw std::runtime_error("Compression is not yet implemented for SSTables 3.0 yet");
}
init_file_writers();
_sst._shards = { shard };
_cfg.monitor->on_write_started(_data_writer->offset_tracker());
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance());
_pi_write_m.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024);
_sst._correctly_serialize_non_compound_range_tombstones = _cfg.correctly_serialize_non_compound_range_tombstones;
_index_sampling_state.summary_byte_cost = summary_byte_cost();
prepare_summary(_sst._components->summary, estimated_partitions, _schema.min_index_interval());
}
~sstable_writer_m();
sstable_writer_m(sstable_writer_m&& o) = default;
void consume_new_partition(const dht::decorated_key& dk) override;
void consume(tombstone t) override;
stop_iteration consume(static_row&& sr) override;
stop_iteration consume(clustering_row&& cr) override;
stop_iteration consume(range_tombstone&& rt) override {
throw std::runtime_error("consume(range_tombstone) is not yet implemented for SSTables v3");
}
stop_iteration consume_end_of_partition();
void consume_end_of_stream() override;
};
sstable_writer_m::~sstable_writer_m() {
auto close_writer = [](auto& writer) {
if (writer) {
try {
writer->close().get();
} catch (...) {
sstlog.error("sstable_writer_m failed to close file: {}", std::current_exception());
}
}
};
close_writer(_index_writer);
close_writer(_data_writer);
}
void sstable_writer_m::init_file_writers() {
file_output_stream_options options;
options.io_priority_class = _pc;
options.buffer_size = _sst.sstable_buffer_size;
options.write_behind = 10;
_data_writer = std::make_unique<checksummed_file_writer>(std::move(_sst._data_file), options, true);
_index_writer.emplace(std::move(_sst._index_file), options);
}
void sstable_writer_m::close_data_writer() {
auto writer = std::move(_data_writer);
writer->close().get();
auto chksum_wr = static_cast<checksummed_file_writer*>(writer.get());
write_digest(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::Digest), chksum_wr->full_checksum());
write_crc(_sst.get_version(), _sst._write_error_handler, _sst.filename(component_type::CRC), chksum_wr->finalize_checksum());
}
void sstable_writer_m::consume_new_partition(const dht::decorated_key& dk) {
_current_partition_offset = _data_writer->offset();
_prev_row_start = 0;
_partition_key = key::from_partition_key(_schema, dk.key());
_sst._components->filter->add(bytes_view(*_partition_key));
_sst._collector.add_key(bytes_view(*_partition_key));
auto p_key = disk_string_view<uint16_t>();
p_key.value = bytes_view(*_partition_key);
// Write index file entry from partition key into index file.
// Write an index entry minus the "promoted index" (sample of columns)
// part. We can only write that after processing the entire partition
// and collecting the sample of columns.
write(_sst.get_version(), *_index_writer, p_key);
write_vint(*_index_writer, _data_writer->offset());
_pi_write_m.promoted_index = {};
_pi_write_m.tomb = {};
_pi_write_m.first_clustering.reset();
_pi_write_m.last_clustering.reset();
write(_sst.get_version(), *_data_writer, p_key);
_tombstone_written = false;
}
deletion_time to_deletion_time(tombstone t) {
deletion_time dt;
if (t) {
dt.local_deletion_time = t.deletion_time.time_since_epoch().count();
dt.marked_for_delete_at = t.timestamp;
} else {
// Default values for live, non-deleted rows.
dt.local_deletion_time = std::numeric_limits<int32_t>::max();
dt.marked_for_delete_at = std::numeric_limits<int64_t>::min();
}
return dt;
}
void sstable_writer_m::consume(tombstone t) {
write(_sst.get_version(), *_data_writer, to_deletion_time(t));
_pi_write_m.tomb = t;
_tombstone_written = true;
}
void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, bytes_view cell_path) {
bytes_view cell_value = cell.value();
bool has_value = !cell_value.empty();
bool is_deleted = cell.is_dead(_sst._now);
if (is_deleted) {
has_value = false;
}
bool use_row_timestamp = (properties.timestamp == cell.timestamp());
bool is_row_expiring = properties.ttl.has_value();
bool is_cell_expiring = cell.is_live_and_has_ttl() || properties.ttl;
bool is_expiring = is_row_expiring || is_cell_expiring;
bool use_row_ttl = is_row_expiring || (is_cell_expiring &&
(properties.ttl == cell.ttl().count()) &&
(properties.local_deletion_time == cell.deletion_time().time_since_epoch().count()));
cell_flags flags = cell_flags::none;
if (!has_value) {
flags |= cell_flags::has_empty_value_mask;
}
if (is_deleted) {
flags |= cell_flags::is_deleted_mask;
} else if (is_expiring) {
flags |= cell_flags::is_expiring_mask;
}
if (use_row_timestamp) {
flags |= cell_flags::use_row_timestamp_mask;
}
if (use_row_ttl) {
flags |= cell_flags::use_row_ttl_mask;
}
write(_sst.get_version(), writer, flags);
if (!use_row_timestamp) {
write_delta_timestamp(writer, cell.timestamp());
}
if (!use_row_ttl) {
if (is_deleted) {
write_delta_local_deletion_time(writer, cell.deletion_time().time_since_epoch().count());
} else if (is_expiring) {
write_delta_local_deletion_time(writer, cell.expiry().time_since_epoch().count());
write_delta_ttl(writer, cell.ttl().count());
}
}
if (!cell_path.empty()) {
write_vint(writer, cell_path.size());
write(_sst.get_version(), writer, cell_path);
}
if (has_value) {
write_cell_value(writer, *cdef.type, cell_value);
}
}
void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker& marker) {
if (marker.is_missing()) {
return;
}
uint64_t timestamp = marker.timestamp();
if (marker.is_dead(_sst._now)) {
// the row has expired by the time of flush
// write deletion info instead of liveness info
deletion_time dt;
dt.local_deletion_time = marker.deletion_time().time_since_epoch().count();
dt.marked_for_delete_at = timestamp;
write_delta_deletion_time(writer, dt);
_row_deletion_written = true;
} else { // marker.is_live()
write_delta_timestamp(writer, timestamp);
if (marker.is_expiring()) {
write_delta_ttl(writer, marker.ttl().count());
write_delta_local_deletion_time(writer, marker.expiry().time_since_epoch().count());
}
}
}
void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef,
collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) {
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
auto mview = t->deserialize_mutation_form(collection);
if (has_complex_deletion) {
write_delta_deletion_time(writer, to_deletion_time(mview.tomb));
}
write_vint(writer, mview.cells.size());
for (const auto& [cell_path, cell]: mview.cells) {
write_cell(writer, cell, cdef, properties, cell_path);
}
}
void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body,
const row_time_properties& properties, bool has_complex_deletion) {
// Note that missing columns are written based on the whole set of regular columns as defined by schema.
// This differs from Origin where all updated columns are tracked and the set of filled columns of a row
// is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases
// but still valid.
write_missing_columns(writer, _schema, row_body);
row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) {
auto&& column_definition = _schema.column_at(kind, id);
if (!column_definition.is_atomic()) {
write_collection(writer, column_definition, c.as_collection_mutation(), properties, has_complex_deletion);
return;
}
atomic_cell_view cell = c.as_atomic_cell();
write_cell(writer, cell, column_definition, properties);
});
}
void sstable_writer_m::write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion) {
_row_deletion_written = false;
// write_liveness_info may end up writing deletion info for the row if the row
// has expired by the time of writing. If this happens, _row_deletion_writen is set
write_liveness_info(writer, row.marker());
if (row.tomb() && !_row_deletion_written) {
write_delta_deletion_time(writer, to_deletion_time(row.tomb().tomb()));
}
row_time_properties properties;
if (!row.marker().is_missing()) {
properties.timestamp = row.marker().timestamp();
if (row.marker().is_expiring()) {
properties.ttl = row.marker().ttl().count();
properties.local_deletion_time = row.marker().deletion_time().time_since_epoch().count();
}
}
return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion);
}
template <typename Func>
uint64_t calculate_write_size(Func&& func) {
uint64_t written_size = 0;
{
auto counting_writer = file_writer(make_sizing_output_stream(written_size));
func(counting_writer);
counting_writer.flush().get();
counting_writer.close().get();
}
return written_size;
}
void sstable_writer_m::write_static_row(const row& static_row) {
assert(_schema.is_compound());
// Static row flag is stored in extended flags so extension_flag is always set for static rows
row_flags flags = row_flags::extension_flag;
if (static_row.size() == _schema.static_columns_count()) {
flags |= row_flags::has_all_columns;
}
write(_sst.get_version(), *_data_writer, flags);
write(_sst.get_version(), *_data_writer, row_extended_flags::is_static);
// Calculate the size of the row body
auto write_row = [this, &static_row] (file_writer& writer) {
write_cells(writer, column_kind::static_column, static_row, row_time_properties{});
};
uint64_t row_body_size = calculate_write_size(write_row) + unsigned_vint::serialized_size(0);
write_vint(*_data_writer, row_body_size);
write_vint(*_data_writer, 0); // as the static row always comes first, the previous row size is always zero
write_row(*_data_writer);
}
stop_iteration sstable_writer_m::consume(static_row&& sr) {
ensure_tombstone_is_written();
write_static_row(sr.cells());
return stop_iteration::no;
}
// Find if any collection in the row contains a collection-wide tombstone
static bool row_has_complex_deletion(const schema& s, const row& r) {
bool result = false;
r.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& c) {
auto&& cdef = s.column_at(column_kind::regular_column, id);
if (cdef.is_atomic()) {
return stop_iteration::no;
}
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
auto mview = t->deserialize_mutation_form(c.as_collection_mutation());
if (mview.tomb) {
result = true;
}
return stop_iteration(static_cast<bool>(mview.tomb));
});
return result;
}
void sstable_writer_m::write_clustered_row(const clustering_row& clustered_row, uint64_t prev_row_size) {
row_flags flags = row_flags::none;
row_extended_flags ext_flags = row_extended_flags::none;
if (clustered_row.marker().is_live()) {
flags |= row_flags::has_timestamp;
if (clustered_row.marker().is_expiring()) {
flags |= row_flags::has_ttl;
}
}
if ((!clustered_row.marker().is_missing() && clustered_row.marker().is_dead(_sst._now)) || clustered_row.tomb().tomb()) {
flags |= row_flags::has_deletion;
if (clustered_row.tomb().tomb() && clustered_row.tomb().is_shadowable()) {
ext_flags = row_extended_flags::has_shadowable_deletion;
}
}
if (clustered_row.cells().size() == _schema.regular_columns_count()) {
flags |= row_flags::has_all_columns;
}
bool has_complex_deletion = row_has_complex_deletion(_schema, clustered_row.cells());
if (has_complex_deletion) {
flags |= row_flags::has_complex_deletion;
}
write(_sst.get_version(), *_data_writer, flags);
if (ext_flags != row_extended_flags::none) {
write(_sst.get_version(), *_data_writer, ext_flags);
}
write_clustering_prefix(*_data_writer, _schema, clustered_row.key());
auto write_row = [this, &clustered_row, has_complex_deletion] (file_writer& writer) {
write_row_body(writer, clustered_row, has_complex_deletion);
};
uint64_t row_body_size = calculate_write_size(write_row) + unsigned_vint::serialized_size(prev_row_size);
write_vint(*_data_writer, row_body_size);
write_vint(*_data_writer, prev_row_size);
write_row(*_data_writer);
}
stop_iteration sstable_writer_m::consume(clustering_row&& cr) {
ensure_tombstone_is_written();
uint64_t pos = _data_writer->offset();
if (!_pi_write_m.first_clustering) {
_pi_write_m.first_clustering = cr.key();
_pi_write_m.block_start_offset = pos;
_pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size;
}
write_clustered_row(cr, pos - _prev_row_start);
_pi_write_m.last_clustering = cr.key();
pos = _data_writer->offset();
_prev_row_start = pos;
if (pos >= _pi_write_m.block_next_start_offset) {
_pi_write_m.promoted_index.push_back({
*_pi_write_m.first_clustering,
*_pi_write_m.last_clustering,
_pi_write_m.block_start_offset - _current_partition_offset,
pos - _pi_write_m.block_start_offset});
_pi_write_m.first_clustering.reset();
_pi_write_m.block_next_start_offset = pos + _pi_write_m.desired_block_size;
}
return stop_iteration::no;
}
// Write clustering prefix along with its bound kind and, if not full, its size
static void write_clustering_prefix(file_writer& writer, bound_kind kind,
const schema& s, const clustering_key_prefix& clustering) {
assert(kind != bound_kind::static_clustering);
write(sstable_version_types::mc, writer, kind);
if (kind != bound_kind::clustering) {
write(sstable_version_types::mc, writer, static_cast<uint16_t>(clustering.size(s)));
}
write_clustering_prefix(writer, s, clustering);
}
std::vector<uint32_t> sstable_writer_m::write_promoted_index(file_writer& writer) {
static constexpr size_t width_base = 65536;
if (_pi_write_m.promoted_index.empty()) {
return {};
}
write(_sst.get_version(), writer, to_deletion_time(_pi_write_m.tomb));
write_vint(writer, _pi_write_m.promoted_index.size());
std::vector<uint32_t> offsets;
offsets.reserve(_pi_write_m.promoted_index.size());
uint64_t start = writer.offset();
for (const pi_block& block: _pi_write_m.promoted_index) {
offsets.push_back(writer.offset() - start);
write_clustering_prefix(writer, bound_kind::clustering, _schema, block.first);
write_clustering_prefix(writer, bound_kind::clustering, _schema, block.last);
write_vint(writer, block.offset);
write_signed_vint(writer, block.width - width_base);
// TODO: serialize end open marker here later, for now always write "false"
// to indicate there is no end open marker
write(_sst.get_version(), writer, (std::byte)0);
}
return offsets;
}
stop_iteration sstable_writer_m::consume_end_of_partition() {
if (!_pi_write_m.promoted_index.empty() && _pi_write_m.first_clustering) {
_pi_write_m.promoted_index.push_back({
*_pi_write_m.first_clustering,
*_pi_write_m.last_clustering,
_pi_write_m.block_start_offset - _current_partition_offset,
_data_writer->offset() - _pi_write_m.block_start_offset});
}
auto write_pi = [this] (file_writer& writer) {
return write_promoted_index(writer);
};
uint64_t pi_size = calculate_write_size(write_pi);
write_vint(*_index_writer, pi_size);
auto offsets = write_pi(*_index_writer);
for (uint32_t offset: offsets) {
write(_sst.get_version(), *_index_writer, offset);
}
write(_sst.get_version(), *_data_writer, row_flags::end_of_partition);
if (!_first_key) {
_first_key = *_partition_key;
}
_last_key = std::move(*_partition_key);
return stop_iteration::no;
}
void sstable_writer_m::consume_end_of_stream() {
seal_summary(_sst._components->summary, std::move(_first_key), std::move(_last_key), _index_sampling_state);
if (_sst.has_component(component_type::CompressionInfo)) {
_sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
_index_writer->close().get();
_index_writer.reset();
_sst.set_first_and_last_keys();
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector,
dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(),
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key());
_cfg.monitor->on_data_write_completed();
close_data_writer();
_sst.write_summary(_pc);
_sst.write_filter(_pc);
_sst.write_statistics(_pc);
_sst.write_compression(_pc);
auto features = all_features();
if (!_cfg.correctly_serialize_non_compound_range_tombstones) {
features.disable(sstable_feature::NonCompoundRangeTombstones);
}
_sst.write_scylla_metadata(_pc, _shard, std::move(features));
_cfg.monitor->on_write_completed();
if (!_cfg.leave_unsealed) {
_sst.seal_sstable(_cfg.backup).get();
}
_cfg.monitor->on_flush_completed();
}
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard) {
if (sst.get_version() == sstable_version_types::mc) {
_impl = std::make_unique<sstable_writer_m>(sst, s, estimated_partitions, cfg, enc_stats, pc, shard);
} else {
_impl = std::make_unique<sstable_writer_k_l>(sst, s, estimated_partitions, cfg, pc, shard);
}
}
void sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
return _impl->consume_new_partition(dk);
}
void sstable_writer::consume(tombstone t) {
return _impl->consume(t);
}
stop_iteration sstable_writer::consume(static_row&& sr) {
return _impl->consume(std::move(sr));
}
stop_iteration sstable_writer::consume(clustering_row&& cr) {
return _impl->consume(std::move(cr));
}
stop_iteration sstable_writer::consume(range_tombstone&& rt) {
return _impl->consume(std::move(rt));
}
stop_iteration sstable_writer::consume_end_of_partition() {
return _impl->consume_end_of_partition();
}
void sstable_writer::consume_end_of_stream() {
return _impl->consume_end_of_stream();
}
sstable_writer::sstable_writer(sstable_writer&& o) = default;
sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default;
sstable_writer::~sstable_writer() = default;
future<> sstable::seal_sstable(bool backup)
{
return seal_sstable().then([this, backup] {
@@ -2452,9 +3167,10 @@ future<> sstable::seal_sstable(bool backup)
});
}
sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, const sstable_writer_config& cfg, const io_priority_class& pc, shard_id shard)
sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard)
{
return sstable_writer(*this, s, estimated_partitions, cfg, pc, shard);
return sstable_writer(*this, s, estimated_partitions, cfg, enc_stats, pc, shard);
}
future<> sstable::write_components(
@@ -2467,8 +3183,8 @@ future<> sstable::write_components(
if (cfg.replay_position) {
_collector.set_replay_position(cfg.replay_position.value());
}
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, pc);
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc);
mr.consume_in_thread(std::move(wr));
});
}

View File

@@ -70,6 +70,8 @@ namespace sstables {
extern logging::logger sstlog;
class key;
class sstable_writer;
class sstable_writer_k_l;
class sstable_writer_m;
struct foreign_sstable_open_info;
struct sstable_open_info;
@@ -260,6 +262,7 @@ public:
sstable_writer get_writer(const schema& s,
uint64_t estimated_partitions,
const sstable_writer_config&,
encoding_stats enc_stats,
const io_priority_class& pc = default_priority_class(),
shard_id shard = engine().cpu_id());
@@ -329,6 +332,10 @@ public:
return _components->filter->memory_size();
}
version_types get_version() const {
return _version;
}
// Returns the total bytes of all components.
uint64_t bytes_on_disk();
@@ -696,7 +703,8 @@ public:
friend class test;
friend class components_writer;
friend class sstable_writer;
friend class sstable_writer_k_l;
friend class sstable_writer_m;
friend class index_reader;
template <typename DataConsumeRowsContext>
friend data_consume_context<DataConsumeRowsContext>
@@ -810,34 +818,26 @@ public:
};
class sstable_writer {
sstable& _sst;
const schema& _schema;
const io_priority_class& _pc;
bool _backup;
bool _leave_unsealed;
bool _compression_enabled;
std::unique_ptr<file_writer> _writer;
stdx::optional<components_writer> _components_writer;
shard_id _shard; // Specifies which shard new sstable will belong to.
write_monitor* _monitor;
bool _correctly_serialize_non_compound_range_tombstones;
public:
class writer_impl;
private:
void prepare_file_writer();
void finish_file_writer();
std::unique_ptr<writer_impl> _impl;
public:
sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config&, const io_priority_class& pc, shard_id shard = engine().cpu_id());
const sstable_writer_config&, encoding_stats enc_stats,
const io_priority_class& pc, shard_id shard = engine().cpu_id());
sstable_writer(sstable_writer&& o);
sstable_writer& operator=(sstable_writer&& o);
~sstable_writer();
sstable_writer(sstable_writer&& o) : _sst(o._sst), _schema(o._schema), _pc(o._pc), _backup(o._backup),
_leave_unsealed(o._leave_unsealed), _compression_enabled(o._compression_enabled), _writer(std::move(o._writer)),
_components_writer(std::move(o._components_writer)), _shard(o._shard), _monitor(o._monitor),
_correctly_serialize_non_compound_range_tombstones(o._correctly_serialize_non_compound_range_tombstones) { }
void consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); }
void consume(tombstone t) { _components_writer->consume(t); }
stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr) { return _components_writer->consume(std::move(cr)); }
stop_iteration consume(range_tombstone&& rt) { return _components_writer->consume(std::move(rt)); }
stop_iteration consume_end_of_partition() { return _components_writer->consume_end_of_partition(); }
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t);
stop_iteration consume(static_row&& sr);
stop_iteration consume(clustering_row&& cr);
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
};

View File

@@ -20,17 +20,23 @@
*/
#include <set>
#include <fstream>
#include <iterator>
#include <boost/test/unit_test.hpp>
#include <seastar/core/thread.hh>
#include <seastar/tests/test-utils.hh>
#include "sstables/sstables.hh"
#include "compress.hh"
#include "schema_builder.hh"
#include "tests/test-utils.hh"
#include "sstable_test.hh"
#include "flat_mutation_reader_assertions.hh"
#include "memtable-sstable.hh"
#include "sstable_test.hh"
#include "tests/test_services.hh"
#include "tests/tmpdir.hh"
using namespace sstables;
@@ -217,3 +223,515 @@ SEASTAR_TEST_CASE(test_uncompressed_simple_read_index) {
BOOST_REQUIRE_EQUAL(1, vec.size());
});
}
static void compare_files(sstring filename1, sstring filename2) {
std::ifstream ifs1(filename1);
std::ifstream ifs2(filename2);
std::istream_iterator<char> b1(ifs1), e1;
std::istream_iterator<char> b2(ifs2), e2;
BOOST_CHECK_EQUAL_COLLECTIONS(b1, e1, b2, e2);
}
static void write_and_compare_sstables(schema_ptr s, lw_shared_ptr<memtable> mt, sstring table_name) {
storage_service_for_tests ssft;
tmpdir tmp;
auto sst = sstables::test::make_test_sstable(4096, s, tmp.path, 1, sstables::sstable_version_types::mc, sstable::format_types::big);
write_memtable_to_sstable(*mt, sst).get();
for (auto file_type : {component_type::Data, component_type::Index}) {
auto orig_filename =
sstable::filename("tests/sstables/3.x/uncompressed/write_" + table_name, "ks",
table_name, sstables::sstable_version_types::mc, 1, big, file_type);
auto result_filename =
sstable::filename(tmp.path, "ks", table_name, sstables::sstable_version_types::mc, 1, big, file_type);
compare_files(orig_filename, result_filename);
}
}
SEASTAR_TEST_CASE(test_write_static_row) {
return seastar::async([] {
sstring table_name = "static_row";
// CREATE TABLE static_row (pk int, ck int, st1 int static, st2 text static, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", utf8_type}},
// clustering key
{{"ck", int32_type}},
// regular columns
{},
// static columns
{{"st1", int32_type}, {"st2", utf8_type}},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - static row test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
api::timestamp_type ts = api::new_timestamp();
// INSERT INTO static_row (pk, st1, st2) values ('key1', 1135, 'hello');
auto key = make_dkey(s, {to_bytes("key1")});
mutation mut{s, key};
mut.set_static_cell("st1", data_value{1135}, ts);
mut.set_static_cell("st2", data_value{"hello"}, ts);
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_composite_clustering_key) {
return seastar::async([] {
sstring table_name = "composite_clustering_key";
// CREATE TABLE composite_clustering_key (a int , b text, c int, d text, e int, f text, PRIMARY KEY (a, b, c, d)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"a", int32_type}},
// clustering key
{{"b", utf8_type}, {"c", int32_type}, {"d", utf8_type}},
// regular columns
{{"e", int32_type}, {"f", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - composite clustering key test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
api::timestamp_type ts = api::new_timestamp();
// INSERT INTO composite_clustering_key (a,b,c,d,e,f) values (1, 'hello', 2, 'dear', 3, 'world');
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
clustering_key ckey = clustering_key::from_deeply_exploded(*s, { "hello", 2, "dear" });
mut.partition().apply_insert(*s, ckey, ts);
mut.set_cell(ckey, "e", data_value{3}, ts);
mut.set_cell(ckey, "f", data_value{"world"}, ts);
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_wide_partition) {
return seastar::async([] {
sstring table_name = "wide_partition";
// CREATE TABLE wide_partition (pk text , ck text, rc text, PRIMARY KEY (pk, ck) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", utf8_type}},
// clustering key
{{"ck", utf8_type}},
// regular columns
{{"rc", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - wide partition test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
auto key = make_dkey(s, {to_bytes("key")});
mutation mut{s, key};
sstring ck_base(1024, 'a');
sstring rc_base(1024, 'b');
api::timestamp_type ts = api::new_timestamp();
for (auto idx: boost::irange(0, 1024)) {
clustering_key ckey = clustering_key::from_deeply_exploded(*s, {format("{}{}", ck_base, idx)});
mut.partition().apply_insert(*s, ckey, ts);
mut.set_cell(ckey, "rc", data_value{format("{}{}", rc_base, idx)}, ts);
mt->apply(std::move(mut));
seastar::thread::yield();
}
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_ttled_row) {
return seastar::async([] {
sstring table_name = "ttled_row";
// CREATE TABLE ttled_row (pk int, ck int, rc int, PRIMARY KEY (pk));
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{{"ck", int32_type}},
// regular columns
{{"rc", int32_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - TTL-ed row test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// INSERT INTO ttled_row (pk, ck, rc) VALUES ( 1, 2, 3) USING TTL 1135;
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
clustering_key ckey = clustering_key::from_deeply_exploded(*s, { 2 });
api::timestamp_type ts = api::new_timestamp();
gc_clock::time_point tp = gc_clock::now();
gc_clock::duration ttl{1135};
mut.partition().apply_insert(*s, ckey, ts, ttl, tp + ttl);
mut.set_cell(ckey, "rc", data_value{3}, ts);
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_ttled_column) {
return seastar::async([] {
sstring table_name = "ttled_column";
// CREATE TABLE ttled_column (pk text, rc int, PRIMARY KEY (pk));
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", utf8_type}},
// clustering key
{},
// regular columns
{{"rc", int32_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - TTL-ed column test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// UPDATE ttled_column USING TTL 1135 SET rc = 1 WHERE pk='key';
auto key = make_dkey(s, {to_bytes("key")});
mutation mut{s, key};
api::timestamp_type ts = api::new_timestamp();
gc_clock::duration ttl{1135};
mut.set_clustered_cell(clustering_key::make_empty(), "rc", data_value{1}, ts, ttl);
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_deleted_column) {
return seastar::async([] {
sstring table_name = "deleted_column";
// CREATE TABLE deleted_cell (int pk, int rc, PRIMARY KEY (pk)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{},
// regular columns
{{"rc", int32_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - deleted column test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// DELETE rc FROM deleted_column WHERE pk=1;
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
//mut.partition().apply_delete(*s, clustering_key::make_empty(), tombstone{api::new_timestamp(), gc_clock::now()});
auto column_def = s->get_column_definition("rc");
if (!column_def) {
throw std::runtime_error("no column definition found");
}
mut.set_cell(clustering_key::make_empty(), *column_def, atomic_cell::make_dead(api::new_timestamp(), gc_clock::now()));
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_deleted_row) {
return seastar::async([] {
sstring table_name = "deleted_row";
// CREATE TABLE deleted_row (int pk, int ck, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{{"ck", int32_type}},
// regular columns
{},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - deleted row test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// DELETE FROM deleted_row WHERE pk=1 and ck=2;
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
clustering_key ckey = clustering_key::from_deeply_exploded(*s, { 2 });
mut.partition().apply_delete(*s, ckey, tombstone{api::new_timestamp(), gc_clock::now()});
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_collection_wide_update) {
return seastar::async([] {
sstring table_name = "collection_wide_update";
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
// CREATE TABLE collection_wide_update (pk int, col set<int>, PRIMARY KEY (pk)) with compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{},
// regular columns
{{"col", set_of_ints_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - collection wide update test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// INSERT INTO collection_wide_update (pk, col) VALUES (1, {2, 3});
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
api::timestamp_type ts = api::new_timestamp();
gc_clock::time_point tp = gc_clock::now();
mut.partition().apply_insert(*s, clustering_key::make_empty(), ts);
set_type_impl::mutation set_values {
{ts - 1, tp}, // tombstone
{
{int32_type->decompose(2), atomic_cell::make_live(ts, bytes_view{})},
{int32_type->decompose(3), atomic_cell::make_live(ts, bytes_view{})},
}
};
mut.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("col"), set_of_ints_type->serialize_mutation_form(set_values));
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_collection_incremental_update) {
return seastar::async([] {
sstring table_name = "collection_incremental_update";
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
// CREATE TABLE collection_incremental_update (pk int, col set<int>, PRIMARY KEY (pk)) with compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{},
// regular columns
{{"col", set_of_ints_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - collection incremental update test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
// UPDATE collection_incremental_update SET col = col + {2} WHERE pk = 1;
auto key = partition_key::from_deeply_exploded(*s, { 1 });
mutation mut{s, key};
api::timestamp_type ts = api::new_timestamp();
set_type_impl::mutation set_values {
{}, // tombstone
{
{int32_type->decompose(2), atomic_cell::make_live(ts, bytes_view{})},
}
};
mut.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("col"), set_of_ints_type->serialize_mutation_form(set_values));
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_multiple_partitions) {
return seastar::async([] {
sstring table_name = "multiple_partitions";
// CREATE TABLE multiple_partitions (pk int, rc1 int, rc2 int, rc3 int, PRIMARY KEY (pk)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{},
// regular columns
{{"rc1", int32_type}, {"rc2", int32_type}, {"rc3", int32_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - multiple partitions test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
api::timestamp_type ts = api::new_timestamp();
// INSERT INTO multiple_partitions (pk, rc1) VALUES (1, 10);
// INSERT INTO multiple_partitions (pk, rc2) VALUES (2, 20);
// INSERT INTO multiple_partitions (pk, rc3) VALUES (3, 30);
for (auto i : boost::irange(1, 4)) {
auto key = partition_key::from_deeply_exploded(*s, {i});
mutation mut{s, key};
mut.set_cell(clustering_key::make_empty(), to_bytes(format("rc{}", i)), data_value{i * 10}, ts);
mt->apply(std::move(mut));
ts += 10;
}
write_and_compare_sstables(s, mt, table_name);
});
}
SEASTAR_TEST_CASE(test_write_multiple_rows) {
return seastar::async([] {
sstring table_name = "multiple_rows";
// CREATE TABLE multiple_rows (pk int, ck int, rc1 int, rc2 int, rc3 int, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{{"ck", int32_type}},
// regular columns
{{"rc1", int32_type}, {"rc2", int32_type}, {"rc3", int32_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - multiple rows test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
auto key = partition_key::from_deeply_exploded(*s, {0});
api::timestamp_type ts = api::new_timestamp();
mutation mut{s, key};
// INSERT INTO multiple_rows (pk, ck, rc1) VALUES (0, 1, 10);
// INSERT INTO multiple_rows (pk, ck, rc2) VALUES (0, 2, 20);
// INSERT INTO multiple_rows (pk, ck, rc3) VALUES (0, 3, 30);
for (auto i : boost::irange(1, 4)) {
clustering_key ckey = clustering_key::from_deeply_exploded(*s, { i });
mut.partition().apply_insert(*s, ckey, ts);
mut.set_cell(ckey, to_bytes(format("rc{}", i)), data_value{i * 10}, ts);
ts += 10;
}
mt->apply(std::move(mut));
write_and_compare_sstables(s, mt, table_name);
});
}
// Information on missing columns is serialized differently when the number of columns is > 64.
// This test checks that this information is encoded correctly.
SEASTAR_TEST_CASE(test_write_missing_columns_large_set) {
return seastar::async([] {
sstring table_name = "missing_columns_large_set";
// CREATE TABLE missing_columns_large_set (pk int, ck int, rc1 int, ..., rc64, PRIMARY KEY (pk, ck)) WITH compression = {'sstable_compression': ''};
std::vector<schema::column> regular_columns;
regular_columns.reserve(64);
for (auto idx: boost::irange(1, 65)) {
regular_columns.push_back({to_bytes(format("rc{}", idx)), int32_type});
}
schema_builder builder(make_lw_shared(schema(generate_legacy_id("ks", table_name), "sst3", table_name,
// partition key
{{"pk", int32_type}},
// clustering key
{{"ck", int32_type}},
regular_columns,
// static columns
{},
// regular column name type
utf8_type,
// comment
"SSTable 3.0 format write path - missing columns large set test"
)));
builder.set_compressor_params(compression_parameters());
schema_ptr s = builder.build(schema_builder::compact_storage::no);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
auto key = partition_key::from_deeply_exploded(*s, {0});
api::timestamp_type ts = api::new_timestamp();
mutation mut{s, key};
// INSERT INTO missing_columns_large_set (pk, ck, rc1, ..., rc62) VALUES (0, 0, 1, ..., 62);
// For missing columns, the missing ones will be written as majority are present.
{
clustering_key ckey = clustering_key::from_deeply_exploded(*s, {0});
mut.partition().apply_insert(*s, ckey, ts);
for (auto idx: boost::irange(1, 63)) {
mut.set_cell(ckey, to_bytes(format("rc{}", idx)), data_value{idx}, ts);
}
mt->apply(std::move(mut));
}
ts += 10;
// INSERT INTO missing_columns_large_set (pk, ck, rc63, rc64) VALUES (0, 1, 63, 64);
// For missing columns, the present ones will be written as majority are missing.
{
clustering_key ckey = clustering_key::from_deeply_exploded(*s, {1});
mut.partition().apply_insert(*s, ckey, ts);
mut.set_cell(ckey, to_bytes(format("rc63", 63)), data_value{63}, ts);
mut.set_cell(ckey, to_bytes(format("rc64", 63)), data_value{64}, ts);
mt->apply(std::move(mut));
}
write_and_compare_sstables(s, mt, table_name);
});
}

View File

@@ -239,6 +239,9 @@ struct integer_type_impl : simple_type_impl<T> {
virtual bytes from_json_object(const Json::Value& value, cql_serialization_format sf) const override {
return this->decompose(T(json::to_int64_t(value)));
}
virtual bool is_fixed_length() const override {
return true;
}
};
struct byte_type_impl : integer_type_impl<int8_t> {
@@ -350,6 +353,9 @@ struct string_type_impl : public concrete_type<sstring> {
virtual bytes from_json_object(const Json::Value& value, cql_serialization_format sf) const override {
return from_string(value.asString());
}
virtual bool is_fixed_length() const override {
return false;
}
};
struct ascii_type_impl final : public string_type_impl {
@@ -436,6 +442,9 @@ struct bytes_type_impl final : public concrete_type<bytes> {
// bytesType validate everything, so it is compatible with the former.
return this == &other || &other == ascii_type.get() || &other == utf8_type.get();
}
virtual bool is_fixed_length() const override {
return false;
}
};
struct boolean_type_impl : public simple_type_impl<bool> {
@@ -512,6 +521,9 @@ struct boolean_type_impl : public simple_type_impl<bool> {
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::boolean;
}
virtual bool is_fixed_length() const override {
return true;
}
};
class date_type_impl : public concrete_type<db_clock::time_point> {
@@ -591,6 +603,9 @@ public:
}
return false;
}
virtual bool is_fixed_length() const override {
return true;
}
};
logging::logger date_type_impl::_logger(date_type_name);
@@ -689,6 +704,9 @@ struct timeuuid_type_impl : public concrete_type<utils::UUID> {
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::timeuuid;
}
virtual bool is_fixed_length() const override {
return true;
}
private:
static int compare_bytes(bytes_view o1, bytes_view o2) {
auto compare_pos = [&] (unsigned pos, int mask, int ifequal) {
@@ -880,6 +898,9 @@ public:
}
return false;
}
virtual bool is_fixed_length() const override {
return true;
}
};
logging::logger timestamp_type_impl::_logger(timestamp_type_name);
@@ -975,6 +996,9 @@ struct simple_date_type_impl : public simple_type_impl<uint32_t> {
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::date;
}
virtual bool is_fixed_length() const override {
return true;
}
};
struct time_type_impl : public simple_type_impl<int64_t> {
@@ -1076,6 +1100,9 @@ struct time_type_impl : public simple_type_impl<int64_t> {
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::time;
}
virtual bool is_fixed_length() const override {
return true;
}
};
struct uuid_type_impl : concrete_type<utils::UUID> {
@@ -1167,6 +1194,9 @@ struct uuid_type_impl : concrete_type<utils::UUID> {
virtual bool is_value_compatible_with_internal(const abstract_type& other) const override {
return &other == this || &other == timeuuid_type.get();
}
virtual bool is_fixed_length() const override {
return true;
}
};
using inet_address = seastar::net::inet_address;
@@ -1272,6 +1302,9 @@ struct inet_addr_type_impl : concrete_type<inet_address> {
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::inet;
}
virtual bool is_fixed_length() const override {
return true;
}
};
// Integer of same length of a given type. This is useful because our
@@ -1422,6 +1455,9 @@ struct floating_type_impl : public simple_type_impl<T> {
throw marshal_exception("Only float/double types can be parsed from JSON floating point object");
}
}
virtual bool is_fixed_length() const override {
return true;
}
};
struct double_type_impl : floating_type_impl<double> {
@@ -1559,6 +1595,9 @@ public:
virtual bool is_value_compatible_with_internal(const abstract_type& other) const override {
return &other == this || int32_type->is_value_compatible_with(other) || long_type->is_value_compatible_with(other);
}
virtual bool is_fixed_length() const override {
return false;
}
friend class decimal_type_impl;
};
@@ -1666,6 +1705,9 @@ public:
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::decimal;
}
virtual bool is_fixed_length() const override {
return false;
}
};
class counter_type_impl : public abstract_type {
@@ -1741,6 +1783,9 @@ public:
virtual std::experimental::optional<data_type> update_user_type(const shared_ptr<const user_type_impl> updated) const {
return std::experimental::nullopt;
}
virtual bool is_fixed_length() const override {
fail(unimplemented::cause::COUNTERS);
}
};
// TODO(jhaberku): Move this to Seastar.
@@ -1886,6 +1931,9 @@ public:
virtual bool references_duration() const override {
return true;
}
virtual bool is_fixed_length() const override {
return false;
}
private:
using counter_type = cql_duration::common_counter_type;
@@ -1975,6 +2023,9 @@ struct empty_type_impl : abstract_type {
// Can't happen
abort();
}
virtual bool is_fixed_length() const override {
return true;
}
};

View File

@@ -499,6 +499,7 @@ public:
virtual bool references_duration() const {
return false;
}
virtual bool is_fixed_length() const = 0;
protected:
virtual bool equals(const abstract_type& other) const {
return this == &other;
@@ -845,6 +846,7 @@ public:
return deserialize(v, sf);
}
bytes_opt reserialize(cql_serialization_format from, cql_serialization_format to, bytes_view_opt v) const;
virtual bool is_fixed_length() const override { return false; }
};
using collection_type = shared_ptr<const collection_type_impl>;
@@ -1009,6 +1011,10 @@ public:
static shared_ptr<const reversed_type_impl> get_instance(data_type type) {
return intern::get_instance(std::move(type));
}
virtual bool is_fixed_length() const override {
return _underlying_type->is_fixed_length();
}
protected:
virtual size_t native_value_size() const override;
virtual size_t native_value_alignment() const override;
@@ -1640,6 +1646,7 @@ public:
virtual bool references_user_type(const sstring& keyspace, const bytes& name) const override;
virtual std::experimental::optional<data_type> update_user_type(const shared_ptr<const user_type_impl> updated) const override;
virtual bool references_duration() const override;
virtual bool is_fixed_length() const override { return false; }
private:
bool check_compatibility(const abstract_type& previous, bool (abstract_type::*predicate)(const abstract_type&) const) const;
static sstring make_name(const std::vector<data_type>& types);

View File

@@ -31,6 +31,8 @@
using vint_size_type = bytes::size_type;
static constexpr size_t max_vint_length = 9;
struct unsigned_vint final {
using value_type = uint64_t;