From 7e56e9fca65cca06fe1178f3547fe6b42e8f136b Mon Sep 17 00:00:00 2001 From: Vladimir Krivopalov Date: Fri, 26 Oct 2018 15:55:52 -0700 Subject: [PATCH] sstables: Re-order columns (atomic first, then collections) for SSTables 3.x. In Cassandra, row columns are stored in a BTree that uses the following ordering on them: - all atomic columns go first, then all multi-cell ones - within each of the two groups (atomic and multi-cell), columns are ordered lexicographically by name Since schema already has all columns lexicographically sorted by name, we only need to stably partition them by atomicity to obtain that ordering. Fixes #3853 Signed-off-by: Vladimir Krivopalov --- sstables/column_translation.hh | 3 ++ sstables/m_format_write_helpers.cc | 54 ++++++++++++++---------------- sstables/m_format_write_helpers.hh | 12 ++++++- sstables/sstables.cc | 47 ++++++++++++++++++++++++-- 4 files changed, 84 insertions(+), 32 deletions(-) diff --git a/sstables/column_translation.hh b/sstables/column_translation.hh index d5842b264f..270affb666 100644 --- a/sstables/column_translation.hh +++ b/sstables/column_translation.hh @@ -114,6 +114,9 @@ private: type->is_counter() }); } + if (!is_static) { + boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; }); + } } return cols; } diff --git a/sstables/m_format_write_helpers.cc b/sstables/m_format_write_helpers.cc index 2ace0e297f..9b08e00109 100644 --- a/sstables/m_format_write_helpers.cc +++ b/sstables/m_format_write_helpers.cc @@ -203,11 +203,10 @@ void write_clustering_prefix(file_writer& out, const schema& s, class missing_columns_input_range : public input_range_base { private: - const schema& _schema; - const column_kind _kind; + const indexed_columns& _columns; const row& _row; mutable uint64_t _current_value = 0; - mutable column_id _current_id = 0; + mutable size_t _current_index = 0; mutable bool _large_mode_produced_size = false; enum class encoding_mode { @@ -217,38 +216,35 @@ private: } 
_mode; public: - missing_columns_input_range(const schema& s, column_kind kind, const row& row) - : _schema(s) - , _kind(kind) + missing_columns_input_range(const indexed_columns& columns, const row& row) + : _columns(columns) , _row(row) { - assert(kind == column_kind::static_column || kind == column_kind::regular_column); - auto row_size = _row.size(); - auto total_size = _schema.columns_count(_kind); + auto total_size = _columns.size(); - _current_id = row_size < total_size ? 0 : total_size; + _current_index = row_size < total_size ? 0 : total_size; _mode = (total_size < 64) ? encoding_mode::small : (row_size < total_size / 2) ? encoding_mode::large_encode_present : encoding_mode::large_encode_missing; } bool next() const { - auto total_size = _schema.columns_count(_kind); - if (_current_id == total_size) { + auto total_size = _columns.size(); + if (_current_index == total_size) { // No more values to encode return false; } if (_mode == encoding_mode::small) { // Set bit for every missing column - for (column_id id = 0; id < total_size; ++id) { - auto cell = _row.find_cell(id); + for (const auto& element: _columns | boost::adaptors::indexed()) { + auto cell = _row.find_cell(element.value().id); if (!cell) { - _current_value |= (uint64_t(1) << id); + _current_value |= (uint64_t(1) << element.index()); } } - _current_id = total_size; + _current_index = total_size; return true; } else { // For either of large modes, output the difference between total size and row size first @@ -259,25 +255,25 @@ public: } if (_mode == encoding_mode::large_encode_present) { - while (_current_id < total_size) { - auto cell = _row.find_cell(_current_id); + while (_current_index < total_size) { + auto cell = _row.find_cell(_columns[_current_index].id); if (cell) { - _current_value = _current_id; - ++_current_id; + _current_value = _current_index; + ++_current_index; return true; } - ++_current_id; + ++_current_index; } } else { assert(_mode == encoding_mode::large_encode_missing); - 
while (_current_id < total_size) { - auto cell = _row.find_cell(_current_id); + while (_current_index < total_size) { + auto cell = _row.find_cell(_columns[_current_index].id); if (!cell) { - _current_value = _current_id; - ++_current_id; + _current_value = _current_index; + ++_current_index; return true; } - ++_current_id; + ++_current_index; } } } @@ -289,12 +285,12 @@ public: explicit operator bool() const { - return (_current_id < _schema.columns_count(_kind)); + return (_current_index < _columns.size()); } }; -void write_missing_columns(file_writer& out, const schema& s, column_kind kind, const row& row) { - for (const auto value: missing_columns_input_range{s, kind, row}) { +void write_missing_columns(file_writer& out, const indexed_columns& columns, const row& row) { + for (const auto value: missing_columns_input_range{columns, row}) { write_vint(out, value); } } diff --git a/sstables/m_format_write_helpers.hh b/sstables/m_format_write_helpers.hh index ff2945ef7e..e1126ca6ee 100644 --- a/sstables/m_format_write_helpers.hh +++ b/sstables/m_format_write_helpers.hh @@ -37,6 +37,16 @@ namespace sstables { class file_writer; +// This structure is used to store references to column_definitions +// along with their respective column_ids. +// This allows us to do shallow re-ordering without changing the original order in schema. 
+struct column_definition_indexed_ref { + column_id id; + std::reference_wrapper<const column_definition> cdef; +}; + +using indexed_columns = std::vector<column_definition_indexed_ref>; + // Utilities for writing integral values in variable-length format // See vint-serialization.hh for more details void write_unsigned_vint(file_writer& out, uint64_t value); @@ -75,7 +85,7 @@ void write_clustering_prefix(file_writer& out, const schema& s, const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full); // Writes encoded information about missing columns in the given row -void write_missing_columns(file_writer& out, const schema& s, column_kind kind, const row& row); +void write_missing_columns(file_writer& out, const indexed_columns& columns, const row& row); // Helper functions for writing delta-encoded time-related values void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 572cc87d0b..5f83446301 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2696,6 +2696,27 @@ GCC6_CONCEPT( }; ) +static indexed_columns get_indexed_regular_columns_partitioned_by_atomicity(const schema& s) { + indexed_columns columns; + columns.reserve(s.regular_columns_count()); + for (const auto& element: s.regular_columns() | boost::adaptors::indexed()) { + columns.push_back({static_cast<column_id>(element.index()), element.value()}); + } + boost::range::stable_partition( + columns, + [](const column_definition_indexed_ref& column) { return column.cdef.get().is_atomic();}); + return columns; +} + +static indexed_columns get_indexed_static_columns(const schema& s) { + indexed_columns columns; + columns.reserve(s.static_columns_count()); + for (const auto& element: s.static_columns() | boost::adaptors::indexed()) { + columns.push_back({static_cast<column_id>(element.index()), element.value()}); + } + return columns; +} + // Used for writing SSTables in 'mc' format. 
class sstable_writer_m : public sstable_writer::writer_impl { private: @@ -2715,6 +2736,21 @@ private: stdx::optional _first_key, _last_key; index_sampling_state _index_sampling_state; range_tombstone_stream _range_tombstones; + // For regular columns, we write all simple columns first followed by collections + // This container has regular columns partitioned by atomicity + const indexed_columns _regular_columns; + // TODO: unlike regular columns, static ones don't need re-ordering because + // they are always all atomic. Perhaps we should do a helper writing missing columns + // that would accept just the schema when writing a static row + const indexed_columns _static_columns; + + struct cdef_and_collection { + const column_definition* cdef; + std::reference_wrapper<const atomic_cell_or_collection> collection; + }; + + // Used to defer writing collections until all atomic cells are written + std::vector<cdef_and_collection> _collections; std::optional _end_open_marker; @@ -2837,6 +2873,8 @@ public: , _enc_stats(enc_stats) , _shard(shard) , _range_tombstones(_schema) + , _regular_columns(get_indexed_regular_columns_partitioned_by_atomicity(s)) + , _static_columns(get_indexed_static_columns(s)) { _sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance()); _sst.write_toc(_pc); @@ -3209,11 +3247,11 @@ void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const // This differs from Origin where all updated columns are tracked and the set of filled columns of a row // is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases // but still valid. - write_missing_columns(writer, _schema, kind, row_body); + write_missing_columns(writer, kind == column_kind::static_column ? 
_static_columns : _regular_columns, row_body); row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) { auto&& column_definition = _schema.column_at(kind, id); if (!column_definition.is_atomic()) { - write_collection(writer, column_definition, c.as_collection_mutation(), properties, has_complex_deletion); + _collections.push_back({&column_definition, c}); return; } atomic_cell_view cell = c.as_atomic_cell(column_definition); @@ -3221,6 +3259,11 @@ void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const ++_c_stats.column_count; write_cell(writer, cell, column_definition, properties); }); + + for (const auto& col: _collections) { + write_collection(writer, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion); + } + _collections.clear(); } void sstable_writer_m::write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion) {