Write serialization header to Statistics.db for SSTables 3.x.

The serialization header is a new component in Statistics.db introduced in
the SSTables 3.0 ('ma') format. It is essential for reading the data file, as
it contains the base values used for delta-encoded values (timestamps,
TTLs, local deletion times) and a description of the column types.

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-05-01 19:22:24 -07:00
parent 6e4601d177
commit 5db6002720
3 changed files with 129 additions and 2 deletions

View File

@@ -63,6 +63,20 @@ struct disk_array {
utils::chunked_vector<Members> elements;
};
// Tags an integral value so that the serializer emits it with
// variable-length encoding instead of a fixed-width representation.
// Only integral T is accepted; anything else fails at compile time.
template <typename T>
struct vint {
    static_assert(std::is_integral<T>::value, "Can only wrap integral types");

    // The wrapped integer itself.
    T value;
};
// Counterpart of disk_array whose element count is serialized as a
// variable-length integer. Size names the integral type the count is
// held in before being written; Members is the element type.
template <typename Size, typename Members>
struct disk_array_vint_size {
    static_assert(std::is_integral_v<Size>, "Length type must be convertible to integer");

    // The array contents.
    utils::chunked_vector<Members> elements;
};
template <typename Size, typename Members>
struct disk_array_ref {
static_assert(std::is_integral<Size>::value, "Length type must be convertible to integer");

View File

@@ -359,6 +359,11 @@ parse(sstable_version_types v, random_access_reader& in, T& t) {
});
}
// Writes a vint-wrapped integer to `out` using variable-length encoding.
// The sstable version `v` is part of the common write() signature but is
// not consulted here.
template <typename T>
inline void write(sstable_version_types v, file_writer& out, const vint<T>& t) {
    const T& wrapped = t.value;
    write_vint(out, wrapped);
}
template <class T>
inline typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, void>
write(sstable_version_types v, file_writer& out, const T& t) {
@@ -487,6 +492,14 @@ inline void write(sstable_version_types v, file_writer& out, const disk_array<Si
write(v, out, arr.elements);
}
// Writes a disk_array_vint_size: first the element count as a
// variable-length integer, then the elements themselves via the
// write() overload for the underlying container.
template <typename Size, typename Members>
inline void write(sstable_version_types v, file_writer& out, const disk_array_vint_size<Size, Members>& arr) {
    const auto& items = arr.elements;

    // The count is narrowed into Size before being encoded.
    // NOTE(review): check_truncate_and_assign presumably rejects counts that
    // do not fit into Size - confirm against its definition.
    Size count = 0;
    check_truncate_and_assign(count, items.size());

    write_vint(out, count);
    write(v, out, items);
}
template <typename Size, typename Members>
inline void write(sstable_version_types v, file_writer& out, const disk_array_ref<Size, Members>& arr) {
Size len = 0;
@@ -2056,12 +2069,63 @@ create_sharding_metadata(schema_ptr schema, const dht::decorated_key& first_key,
return sm;
}
// Copies the contents of a single-byte-character sequence `t` (anything
// iterable that exposes value_type) into a freshly built
// bytes_array_vint_size, preserving element order.
template <typename T>
static bytes_array_vint_size to_bytes_array_vint_size(const T& t) {
    static_assert(sizeof(typename T::value_type) == 1, "Only single-byte char types are allowed");

    bytes_array_vint_size converted;
    for (const auto& unit : t) {
        converted.elements.push_back(unit);
    }
    return converted;
}
// Produces the type name describing the partition key of schema `s`.
// When the key has exactly one column, that column's type name is returned
// as is; otherwise the type names of all partition key columns are joined
// with commas and wrapped into a CompositeType(...) expression.
static sstring pk_type_to_string(const schema& s) {
    if (s.partition_key_size() == 1) {
        return s.partition_key_columns().begin()->type->name();
    }

    // Two-step projection: column definition -> its type -> the type's name.
    const auto to_type = std::mem_fn(&column_definition::type);
    const auto to_name = std::mem_fn(&abstract_type::name);

    sstring component_names = ::join(",", s.partition_key_columns()
            | boost::adaptors::transformed(to_type)
            | boost::adaptors::transformed(to_name));
    return "org.apache.cassandra.db.marshal.CompositeType(" + component_names + ")";
}
// Assembles the serialization header for schema `s`.
//
// The three minimum values are taken from `enc_stats` and stored relative
// to the corresponding encoding_stats epoch constants. The remainder of the
// header records type names: the partition key type, one entry per
// clustering key column, and a (name, type name) pair for every static and
// every regular column, in schema iteration order.
static serialization_header make_serialization_header(const schema& s, const encoding_stats& enc_stats) {
    // Builds the (name, type name) description of a single column.
    const auto describe_column = [] (const auto& column) {
        serialization_header::column_desc desc;
        desc.name = to_bytes_array_vint_size(column.name());
        desc.type_name = to_bytes_array_vint_size(column.type->name());
        return desc;
    };

    serialization_header header;

    // Base values, expressed as offsets from the fixed epochs.
    header.min_timestamp.value = enc_stats.min_timestamp - encoding_stats::timestamp_epoch;
    header.min_local_deletion_time.value = enc_stats.min_local_deletion_time - encoding_stats::deletion_time_epoch;
    header.min_ttl.value = enc_stats.min_ttl - encoding_stats::ttl_epoch;

    header.pk_type_name = to_bytes_array_vint_size(pk_type_to_string(s));

    auto& ck_type_names = header.clustering_key_types_names.elements;
    ck_type_names.reserve(s.clustering_key_size());
    for (const auto& ck_column : s.clustering_key_columns()) {
        ck_type_names.push_back(to_bytes_array_vint_size(ck_column.type->name()));
    }

    auto& static_descs = header.static_columns.elements;
    static_descs.reserve(s.static_columns_count());
    for (const auto& static_column : s.static_columns()) {
        static_descs.push_back(describe_column(static_column));
    }

    auto& regular_descs = header.regular_columns.elements;
    regular_descs.reserve(s.regular_columns_count());
    for (const auto& regular_column : s.regular_columns()) {
        regular_descs.push_back(describe_column(regular_column));
    }

    return header;
}
// At the beginning of the statistics file, there is a disk_hash used to
// map each metadata type to its corresponding position in the file.
static void seal_statistics(sstable_version_types v, statistics& s, metadata_collector& collector,
const sstring partitioner, double bloom_filter_fp_chance, schema_ptr schema,
const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
const dht::decorated_key& first_key, const dht::decorated_key& last_key, encoding_stats enc_stats = {}) {
validation_metadata validation;
compaction_metadata compaction;
stats_metadata stats;
@@ -2076,6 +2140,11 @@ static void seal_statistics(sstable_version_types v, statistics& s, metadata_col
collector.construct_stats(stats);
s.contents[metadata_type::Stats] = std::make_unique<stats_metadata>(std::move(stats));
if (v == sstable_version_types::mc) {
auto header = make_serialization_header(*schema, enc_stats);
s.contents[metadata_type::Serialization] = std::make_unique<serialization_header>(std::move(header));
}
populate_statistics_offsets(v, s);
}
@@ -3154,7 +3223,7 @@ void sstable_writer_m::consume_end_of_stream() {
_sst.set_first_and_last_keys();
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(),
dht::global_partitioner().name(), _schema.bloom_filter_fp_chance(),
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key());
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key(), _enc_stats);
_cfg.monitor->on_data_write_completed();
close_data_writer();
_sst.write_summary(_pc);

View File

@@ -372,6 +372,50 @@ struct stats_metadata : public metadata_base<stats_metadata> {
}
};
// Byte sequence whose length is serialized as a variable-length integer.
using bytes_array_vint_size = disk_array_vint_size<uint32_t, bytes::value_type>;

// Serialization header metadata component. It carries the minimum
// timestamp / local deletion time / TTL values (each written as a
// variable-length integer) together with the type names of the partition
// key, the clustering key columns, and the static and regular columns.
// It can only be described for the 'mc' sstable version.
struct serialization_header : public metadata_base<serialization_header> {
    vint<uint64_t> min_timestamp;
    vint<uint32_t> min_local_deletion_time;
    vint<uint32_t> min_ttl;

    // Type name of the partition key.
    bytes_array_vint_size pk_type_name;

    // Type names of the clustering key columns, one entry per column.
    disk_array_vint_size<uint32_t, bytes_array_vint_size> clustering_key_types_names;

    // Description of a single column: its name and the name of its type.
    struct column_desc {
        bytes_array_vint_size name;
        bytes_array_vint_size type_name;

        // Hands the fields to `f` in their on-disk order; the version is not consulted.
        template <typename Visitor>
        auto describe_type(sstable_version_types v, Visitor f) {
            return f(name, type_name);
        }
    };

    disk_array_vint_size<uint32_t, column_desc> static_columns;
    disk_array_vint_size<uint32_t, column_desc> regular_columns;

    // Hands all header fields to `f` in their on-disk order.
    // Throws std::runtime_error for the 'ka' and 'la' versions, for which
    // a serialization header is not expected to be present.
    template <typename Visitor>
    auto describe_type(sstable_version_types v, Visitor f) {
        switch (v) {
        case sstable_version_types::ka:
        case sstable_version_types::la:
            throw std::runtime_error(
                "Statistics is malformed: SSTable is in 2.x format but contains serialization header.");
        case sstable_version_types::mc:
            return f(
                min_timestamp,
                min_local_deletion_time,
                min_ttl,
                pk_type_name,
                clustering_key_types_names,
                static_columns,
                regular_columns
            );
        }
        // Not expected to be reached: the switch is kept exhaustive so that the
        // compiler complains whenever a new sstable version is left unhandled.
        abort();
    }
};
struct disk_token_bound {
uint8_t exclusive; // really a boolean
disk_string<uint16_t> token;