diff --git a/sstables/disk_types.hh b/sstables/disk_types.hh index a58c19fa09..29caaa4c50 100644 --- a/sstables/disk_types.hh +++ b/sstables/disk_types.hh @@ -63,6 +63,20 @@ struct disk_array { utils::chunked_vector elements; }; +// A wrapper struct for integers to be written using variable-length encoding +template +struct vint { + static_assert(std::is_integral_v, "Can only wrap integral types"); + T value; +}; + +// Same as disk_array but with its size serialized as variable-length integer +template +struct disk_array_vint_size { + static_assert(std::is_integral::value, "Length type must be convertible to integer"); + utils::chunked_vector elements; +}; + template struct disk_array_ref { static_assert(std::is_integral::value, "Length type must be convertible to integer"); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 1a80d7231b..200a67f936 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -359,6 +359,11 @@ parse(sstable_version_types v, random_access_reader& in, T& t) { }); } +template +inline void write(sstable_version_types v, file_writer& out, const vint& t) { + write_vint(out, t.value); +} + template inline typename std::enable_if_t::value && !std::is_enum::value, void> write(sstable_version_types v, file_writer& out, const T& t) { @@ -487,6 +492,14 @@ inline void write(sstable_version_types v, file_writer& out, const disk_array +inline void write(sstable_version_types v, file_writer& out, const disk_array_vint_size& arr) { + Size len = 0; + check_truncate_and_assign(len, arr.elements.size()); + write_vint(out, len); + write(v, out, arr.elements); +} + template inline void write(sstable_version_types v, file_writer& out, const disk_array_ref& arr) { Size len = 0; @@ -2056,12 +2069,63 @@ create_sharding_metadata(schema_ptr schema, const dht::decorated_key& first_key, return sm; } +template +static bytes_array_vint_size to_bytes_array_vint_size(const T& t) { + static_assert(sizeof(typename T::value_type) == 1, "Only single-byte char types are allowed"); + bytes_array_vint_size result; + boost::copy(t, std::back_inserter(result.elements)); + return result; +} + +static sstring pk_type_to_string(const schema& s) { + if (s.partition_key_size() == 1) { + return s.partition_key_columns().begin()->type->name(); + } else { + sstring type_params = ::join(",", s.partition_key_columns() + | boost::adaptors::transformed(std::mem_fn(&column_definition::type)) + | boost::adaptors::transformed(std::mem_fn(&abstract_type::name))); + return "org.apache.cassandra.db.marshal.CompositeType(" + type_params + ")"; + } +} + +static serialization_header make_serialization_header(const schema& s, const encoding_stats& enc_stats) { + serialization_header header; + header.min_timestamp.value = enc_stats.min_timestamp - encoding_stats::timestamp_epoch; + header.min_local_deletion_time.value = enc_stats.min_local_deletion_time - encoding_stats::deletion_time_epoch; + header.min_ttl.value = enc_stats.min_ttl - encoding_stats::ttl_epoch; + + header.pk_type_name = to_bytes_array_vint_size(pk_type_to_string(s)); + + header.clustering_key_types_names.elements.reserve(s.clustering_key_size()); + for (const auto& ck_column : s.clustering_key_columns()) { + auto ck_type_name = to_bytes_array_vint_size(ck_column.type->name()); + header.clustering_key_types_names.elements.push_back(std::move(ck_type_name)); + } + + header.static_columns.elements.reserve(s.static_columns_count()); + for (const auto& static_column : s.static_columns()) { + serialization_header::column_desc cd; + cd.name = to_bytes_array_vint_size(static_column.name()); + cd.type_name = to_bytes_array_vint_size(static_column.type->name()); + header.static_columns.elements.push_back(std::move(cd)); + } + + header.regular_columns.elements.reserve(s.regular_columns_count()); + for (const auto& regular_column : s.regular_columns()) { + serialization_header::column_desc cd; + cd.name = to_bytes_array_vint_size(regular_column.name()); + cd.type_name = to_bytes_array_vint_size(regular_column.type->name()); + header.regular_columns.elements.push_back(std::move(cd)); + } + + return header; +} // In the beginning of the statistics file, there is a disk_hash used to // map each metadata type to its correspondent position in the file. static void seal_statistics(sstable_version_types v, statistics& s, metadata_collector& collector, const sstring partitioner, double bloom_filter_fp_chance, schema_ptr schema, - const dht::decorated_key& first_key, const dht::decorated_key& last_key) { + const dht::decorated_key& first_key, const dht::decorated_key& last_key, encoding_stats enc_stats = {}) { validation_metadata validation; compaction_metadata compaction; stats_metadata stats; @@ -2076,6 +2140,11 @@ static void seal_statistics(sstable_version_types v, statistics& s, metadata_col collector.construct_stats(stats); s.contents[metadata_type::Stats] = std::make_unique(std::move(stats)); + if (v == sstable_version_types::mc) { + auto header = make_serialization_header(*schema, enc_stats); + s.contents[metadata_type::Serialization] = std::make_unique(std::move(header)); + } + populate_statistics_offsets(v, s); } @@ -3154,7 +3223,7 @@ void sstable_writer_m::consume_end_of_stream() { _sst.set_first_and_last_keys(); seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(), dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(), - _sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key()); + _sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key(), _enc_stats); _cfg.monitor->on_data_write_completed(); close_data_writer(); _sst.write_summary(_pc); diff --git a/sstables/types.hh b/sstables/types.hh index 2a0c541a8e..a4ffd9e90d 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -372,6 +372,50 @@ struct stats_metadata : public metadata_base { } }; +using bytes_array_vint_size = disk_array_vint_size; + +struct serialization_header : public metadata_base { + vint min_timestamp; + vint min_local_deletion_time; + vint min_ttl; + bytes_array_vint_size pk_type_name; + disk_array_vint_size clustering_key_types_names; + struct column_desc { + bytes_array_vint_size name; + bytes_array_vint_size type_name; + template + auto describe_type(sstable_version_types v, Describer f) { + return f( + name, + type_name + ); + } + }; + disk_array_vint_size static_columns; + disk_array_vint_size regular_columns; + template + auto describe_type(sstable_version_types v, Describer f) { + switch (v) { + case sstable_version_types::mc: + return f( + min_timestamp, + min_local_deletion_time, + min_ttl, + pk_type_name, + clustering_key_types_names, + static_columns, + regular_columns + ); + case sstable_version_types::ka: + case sstable_version_types::la: + throw std::runtime_error( + "Statistics is malformed: SSTable is in 2.x format but contains serialization header."); + } + // Should never reach here - compiler will complain if switch above does not cover all sstable versions + abort(); + } +}; + struct disk_token_bound { uint8_t exclusive; // really a boolean disk_string token;