sstables: Re-order columns (atomic first, then collections) for SSTables 3.x.

In Cassandra, row columns are stored in a BTree that uses the following
ordering on them:
    - all atomic columns go first, then all multi-cell ones
    - within each of the two groups (atomic and multi-cell), columns are
      lexicographically ordered by name

Since schema already has all columns lexicographically sorted by name,
we only need to stably partition them by atomicity for that.

Fixes #3853

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-10-26 15:55:52 -07:00
parent 210507b867
commit 7e56e9fca6
4 changed files with 84 additions and 32 deletions

View File

@@ -114,6 +114,9 @@ private:
type->is_counter()
});
}
if (!is_static) {
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
}
}
return cols;
}

View File

@@ -203,11 +203,10 @@ void write_clustering_prefix(file_writer& out, const schema& s,
class missing_columns_input_range
: public input_range_base<missing_columns_input_range, uint64_t> {
private:
const schema& _schema;
const column_kind _kind;
const indexed_columns& _columns;
const row& _row;
mutable uint64_t _current_value = 0;
mutable column_id _current_id = 0;
mutable size_t _current_index = 0;
mutable bool _large_mode_produced_size = false;
enum class encoding_mode {
@@ -217,38 +216,35 @@ private:
} _mode;
public:
missing_columns_input_range(const schema& s, column_kind kind, const row& row)
: _schema(s)
, _kind(kind)
missing_columns_input_range(const indexed_columns& columns, const row& row)
: _columns(columns)
, _row(row) {
assert(kind == column_kind::static_column || kind == column_kind::regular_column);
auto row_size = _row.size();
auto total_size = _schema.columns_count(_kind);
auto total_size = _columns.size();
_current_id = row_size < total_size ? 0 : total_size;
_current_index = row_size < total_size ? 0 : total_size;
_mode = (total_size < 64) ? encoding_mode::small :
(row_size < total_size / 2) ? encoding_mode::large_encode_present :
encoding_mode::large_encode_missing;
}
bool next() const {
auto total_size = _schema.columns_count(_kind);
if (_current_id == total_size) {
auto total_size = _columns.size();
if (_current_index == total_size) {
// No more values to encode
return false;
}
if (_mode == encoding_mode::small) {
// Set bit for every missing column
for (column_id id = 0; id < total_size; ++id) {
auto cell = _row.find_cell(id);
for (const auto& element: _columns | boost::adaptors::indexed()) {
auto cell = _row.find_cell(element.value().id);
if (!cell) {
_current_value |= (uint64_t(1) << id);
_current_value |= (uint64_t(1) << element.index());
}
}
_current_id = total_size;
_current_index = total_size;
return true;
} else {
// For either of large modes, output the difference between total size and row size first
@@ -259,25 +255,25 @@ public:
}
if (_mode == encoding_mode::large_encode_present) {
while (_current_id < total_size) {
auto cell = _row.find_cell(_current_id);
while (_current_index < total_size) {
auto cell = _row.find_cell(_columns[_current_index].id);
if (cell) {
_current_value = _current_id;
++_current_id;
_current_value = _current_index;
++_current_index;
return true;
}
++_current_id;
++_current_index;
}
} else {
assert(_mode == encoding_mode::large_encode_missing);
while (_current_id < total_size) {
auto cell = _row.find_cell(_current_id);
while (_current_index < total_size) {
auto cell = _row.find_cell(_columns[_current_index].id);
if (!cell) {
_current_value = _current_id;
++_current_id;
_current_value = _current_index;
++_current_index;
return true;
}
++_current_id;
++_current_index;
}
}
}
@@ -289,12 +285,12 @@ public:
explicit operator bool() const
{
return (_current_id < _schema.columns_count(_kind));
return (_current_index < _columns.size());
}
};
void write_missing_columns(file_writer& out, const schema& s, column_kind kind, const row& row) {
for (const auto value: missing_columns_input_range{s, kind, row}) {
void write_missing_columns(file_writer& out, const indexed_columns& columns, const row& row) {
for (const auto value: missing_columns_input_range{columns, row}) {
write_vint(out, value);
}
}

View File

@@ -37,6 +37,16 @@ namespace sstables {
class file_writer;
// This structure is used to store references to column_definitions
// along with their respective column_ids.
// This allows us to do shallow re-ordering without changing the original order in schema:
// a list of these entries can be permuted freely while every entry still
// remembers which id to use when looking its cell up in a row.
// NOTE(review): only a reference is stored, so the referenced column_definition
// presumably has to outlive every copy of this struct - confirm against the schema lifetime.
struct column_definition_indexed_ref {
// Id of the column; this is the value passed to row lookups such as find_cell().
column_id id;
// Non-owning reference to the definition of the same column.
std::reference_wrapper<const column_definition> cdef;
};
// An ordered list of columns; the position of an entry in the vector is
// independent of the column_id stored in that entry.
using indexed_columns = std::vector<column_definition_indexed_ref>;
// Utilities for writing integral values in variable-length format
// See vint-serialization.hh for more details
void write_unsigned_vint(file_writer& out, uint64_t value);
@@ -75,7 +85,7 @@ void write_clustering_prefix(file_writer& out, const schema& s,
const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full);
// Writes encoded information about missing columns in the given row
void write_missing_columns(file_writer& out, const schema& s, column_kind kind, const row& row);
void write_missing_columns(file_writer& out, const indexed_columns& columns, const row& row);
// Helper functions for writing delta-encoded time-related values
void write_delta_timestamp(file_writer& out, api::timestamp_type timestamp, const encoding_stats& enc_stats);

View File

@@ -2696,6 +2696,27 @@ GCC6_CONCEPT(
};
)
// Collects the regular columns of the given schema, tagging each one with its
// position among the regular columns, and then moves every atomic column in
// front of the non-atomic ones.
// The partitioning is stable, so inside each of the two groups the columns keep
// the relative order they have in the schema.
static indexed_columns get_indexed_regular_columns_partitioned_by_atomicity(const schema& s) {
    indexed_columns result;
    result.reserve(s.regular_columns_count());

    // Ids are handed out in schema order, starting from zero.
    column_id next_id = 0;
    for (const column_definition& def : s.regular_columns()) {
        result.push_back({next_id, def});
        ++next_id;
    }

    const auto is_atomic_column = [] (const column_definition_indexed_ref& entry) {
        return entry.cdef.get().is_atomic();
    };
    boost::range::stable_partition(result, is_atomic_column);
    return result;
}
// Collects the static columns of the given schema in schema order, tagging each
// one with its position among the static columns. No re-ordering is applied.
static indexed_columns get_indexed_static_columns(const schema& s) {
    indexed_columns result;
    result.reserve(s.static_columns_count());

    // Ids are handed out in schema order, starting from zero.
    column_id next_id = 0;
    for (const column_definition& def : s.static_columns()) {
        result.push_back({next_id, def});
        ++next_id;
    }
    return result;
}
// Used for writing SSTables in 'mc' format.
class sstable_writer_m : public sstable_writer::writer_impl {
private:
@@ -2715,6 +2736,21 @@ private:
stdx::optional<key> _first_key, _last_key;
index_sampling_state _index_sampling_state;
range_tombstone_stream _range_tombstones;
// For regular columns, we write all simple columns first followed by collections
// This container has regular columns partitioned by atomicity
const indexed_columns _regular_columns;
// TODO: unlike regular columns, static ones don't need re-ordering because
// they are always all atomic. Perhaps we should do a helper writing missing columns
// that would accept just the schema when writing a static row
const indexed_columns _static_columns;
// A collection cell paired with the definition of the column it belongs to.
// Both members are non-owning; entries are only meant to live for the duration
// of writing a single row's cells.
struct cdef_and_collection {
// Definition of the collection column.
const column_definition* cdef;
// The cell holding the collection; it is read back through as_collection_mutation()
// when the deferred collection is finally written.
std::reference_wrapper<const atomic_cell_or_collection> collection;
};
// Used to defer writing collections until all atomic cells are written.
// write_cells() appends to this while iterating over a row and clears it
// once the collected entries have been written out.
std::vector<cdef_and_collection> _collections;
std::optional<rt_marker> _end_open_marker;
@@ -2837,6 +2873,8 @@ public:
, _enc_stats(enc_stats)
, _shard(shard)
, _range_tombstones(_schema)
, _regular_columns(get_indexed_regular_columns_partitioned_by_atomicity(s))
, _static_columns(get_indexed_static_columns(s))
{
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
_sst.write_toc(_pc);
@@ -3209,11 +3247,11 @@ void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const
// This differs from Origin where all updated columns are tracked and the set of filled columns of a row
// is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases
// but still valid.
write_missing_columns(writer, _schema, kind, row_body);
write_missing_columns(writer, kind == column_kind::static_column ? _static_columns : _regular_columns, row_body);
row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) {
auto&& column_definition = _schema.column_at(kind, id);
if (!column_definition.is_atomic()) {
write_collection(writer, column_definition, c.as_collection_mutation(), properties, has_complex_deletion);
_collections.push_back({&column_definition, c});
return;
}
atomic_cell_view cell = c.as_atomic_cell(column_definition);
@@ -3221,6 +3259,11 @@ void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const
++_c_stats.column_count;
write_cell(writer, cell, column_definition, properties);
});
for (const auto& col: _collections) {
write_collection(writer, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion);
}
_collections.clear();
}
void sstable_writer_m::write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion) {