sstables: add support to write the component statistics
This code adds the ability to write statistics to disk.
On-disk format:
uint32_t Size;
struct {
uint32_t metadata_type;
uint32_t offset; /* offset into this file */
} metadata_metadata[Size];
* each metadata_metadata entry corresponds to one metadata component
stored in the file, located at the offset given by its offset field.
Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
This commit is contained in:
committed by
Avi Kivity
parent
6636a751e2
commit
08e5d3ca8b
@@ -176,7 +176,7 @@ parse(random_access_reader& in, T& i) {
|
||||
template <typename T>
|
||||
typename std::enable_if_t<std::is_enum<T>::value, future<>>
|
||||
write(output_stream<char>& out, T i) {
|
||||
return write(out, reinterpret_cast<typename std::underlying_type<T>::type>(i));
|
||||
return write(out, static_cast<typename std::underlying_type<T>::type>(i));
|
||||
}
|
||||
|
||||
future<> parse(random_access_reader& in, bool& i) {
|
||||
@@ -402,6 +402,24 @@ future<> parse(random_access_reader& in, disk_hash<Size, Key, Value>& h) {
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Key, typename Value>
|
||||
future<> write(output_stream<char>& out, std::unordered_map<Key, Value>& map) {
|
||||
return do_for_each(map.begin(), map.end(), [&out, &map] (auto val) {
|
||||
Key key = val.first;
|
||||
Value value = val.second;
|
||||
return write(out, key, value);
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Size, typename Key, typename Value>
|
||||
future<> write(output_stream<char>& out, disk_hash<Size, Key, Value>& h) {
|
||||
Size len = 0;
|
||||
check_truncate_and_assign(len, h.map.size());
|
||||
return write(out, len).then([&out, &h] {
|
||||
return write(out, h.map);
|
||||
});
|
||||
}
|
||||
|
||||
future<> parse(random_access_reader& in, summary& s) {
|
||||
using pos_type = typename decltype(summary::positions)::value_type;
|
||||
|
||||
@@ -470,30 +488,6 @@ future<summary_entry&> sstable::read_summary_entry(size_t i) {
|
||||
return make_ready_future<summary_entry&>(_summary.entries[i]);
|
||||
}
|
||||
|
||||
// Parse a replay_position from the stream: segment, then position.
future<> parse(random_access_reader& in, struct replay_position& rp) {
|
||||
return parse(in, rp.segment, rp.position);
|
||||
}
|
||||
|
||||
// Parse one estimated_histogram element: offset, then bucket.
future<> parse(random_access_reader& in, estimated_histogram::eh_elem &e) {
|
||||
return parse(in, e.offset, e.bucket);
|
||||
}
|
||||
|
||||
// Parse an estimated_histogram: a disk_array of its elements.
future<> parse(random_access_reader& in, estimated_histogram &e) {
|
||||
return parse(in, e.elements);
|
||||
}
|
||||
|
||||
// Parse a streaming_histogram: max_bin_size followed by the bin hash.
future<> parse(random_access_reader& in, streaming_histogram &h) {
|
||||
return parse(in, h.max_bin_size, h.hash);
|
||||
}
|
||||
|
||||
// Parse a validation_metadata component: partitioner name, then filter_chance.
future<> parse(random_access_reader& in, validation_metadata& m) {
|
||||
return parse(in, m.partitioner, m.filter_chance);
|
||||
}
|
||||
|
||||
// Parse a compaction_metadata component: ancestors array, then cardinality array.
future<> parse(random_access_reader& in, compaction_metadata& m) {
|
||||
return parse(in, m.ancestors, m.cardinality);
|
||||
}
|
||||
|
||||
// Parse an index_entry: key, data-file position, then promoted index.
future<> parse(random_access_reader& in, index_entry& ie) {
|
||||
return parse(in, ie.key, ie.position, ie.promoted_index);
|
||||
}
|
||||
@@ -508,22 +502,9 @@ future<> parse(random_access_reader& in, std::unique_ptr<metadata>& p) {
|
||||
return parse(in, *static_cast<Child *>(p.get()));
|
||||
}
|
||||
|
||||
future<> parse(random_access_reader& in, stats_metadata& m) {
|
||||
return parse(in,
|
||||
m.estimated_row_size,
|
||||
m.estimated_column_count,
|
||||
m.position,
|
||||
m.min_timestamp,
|
||||
m.max_timestamp,
|
||||
m.max_local_deletion_time,
|
||||
m.compression_ratio,
|
||||
m.estimated_tombstone_drop_time,
|
||||
m.sstable_level,
|
||||
m.repaired_at,
|
||||
m.min_column_names,
|
||||
m.max_column_names,
|
||||
m.has_legacy_counter_shards
|
||||
);
|
||||
template <typename Child>
|
||||
future<> write(output_stream<char>& out, std::unique_ptr<metadata>& p) {
|
||||
return write(out, *static_cast<Child *>(p.get()));
|
||||
}
|
||||
|
||||
future<> parse(random_access_reader& in, statistics& s) {
|
||||
@@ -546,6 +527,37 @@ future<> parse(random_access_reader& in, statistics& s) {
|
||||
});
|
||||
}
|
||||
|
||||
// Write the Statistics component to disk: first the toc (metadata_type ->
// file offset map, via the disk_hash overload), then each metadata blob.
//
// Because an output stream cannot seek, the metadata entries must be
// emitted in ascending file-offset order; the toc map is therefore
// flattened into a vector and sorted by offset first.
future<> write(output_stream<char>& out, statistics& s) {
    return write(out, s.hash).then([&out, &s] {
        struct kv {
            metadata_type key;
            uint32_t value; // file offset recorded in the toc
        };
        // The vector is kept alive across the asynchronous iteration by
        // capturing the shared pointer by value in the lambda below.
        auto v = make_shared<std::vector<kv>>();
        v->reserve(s.hash.map.size());
        for (const auto& val : s.hash.map) {
            v->push_back(kv{ val.first, val.second });
        }
        std::sort(v->begin(), v->end(), [] (const kv& i, const kv& j) { return i.value < j.value; });
        return do_for_each(v->begin(), v->end(), [&out, &s, v] (const kv& val) {
            switch (val.key) {
            case metadata_type::Validation:
                return write<validation_metadata>(out, s.contents[val.key]);
            case metadata_type::Compaction:
                return write<compaction_metadata>(out, s.contents[val.key]);
            case metadata_type::Stats:
                return write<stats_metadata>(out, s.contents[val.key]);
            default:
                // Unknown entry in the toc: skip it rather than corrupt the file.
                sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key));
                return make_ready_future<>();
            }
        });
    });
}
|
||||
|
||||
// This is small enough, and well-defined. Easier to just read it all
|
||||
// at once
|
||||
future<> sstable::read_toc() {
|
||||
@@ -719,6 +731,10 @@ future<> sstable::read_statistics() {
|
||||
return read_simple<statistics, component_type::Statistics, &sstable::_statistics>();
|
||||
}
|
||||
|
||||
// Write the in-memory _statistics component out to the Statistics file
// via the generic single-component writer.
future<> sstable::write_statistics() {
|
||||
return write_simple<statistics, component_type::Statistics, &sstable::_statistics>();
|
||||
}
|
||||
|
||||
future<> sstable::open_data() {
|
||||
return when_all(engine().open_file_dma(filename(component_type::Index), open_flags::ro),
|
||||
engine().open_file_dma(filename(component_type::Data), open_flags::ro)).then([this] (auto files) {
|
||||
@@ -752,7 +768,9 @@ future<> sstable::load() {
|
||||
|
||||
// Store the sstable's in-memory components back to disk, sequentially:
// statistics, then compression info, then the filter.
// NOTE(review): `this` is captured raw in the continuations — the caller
// must keep the sstable alive until the returned future resolves.
future<> sstable::store() {
    // TODO: write other components as well.
    return write_statistics().then([this] {
        return write_compression();
    }).then([this] {
        return write_filter();
    });
}
|
||||
|
||||
@@ -91,7 +91,10 @@ private:
|
||||
// Read the Summary component into _summary via the generic single-component reader.
future<> read_summary() {
|
||||
return read_simple<summary, component_type::Summary, &sstable::_summary>();
|
||||
}
|
||||
|
||||
future<> read_statistics();
|
||||
future<> write_statistics();
|
||||
|
||||
future<> open_data();
|
||||
|
||||
future<index_list> read_indexes(uint64_t position, uint64_t quantity);
|
||||
|
||||
@@ -93,19 +93,31 @@ struct estimated_histogram {
|
||||
// One element of the estimated histogram; presumably `offset` is the
// bucket boundary and `bucket` the count — TODO confirm against
// Cassandra's EstimatedHistogram layout.
struct eh_elem {
|
||||
uint64_t offset;
|
||||
uint64_t bucket;
|
||||
|
||||
// Enumerates fields in on-disk order for the generic parse()/write() helpers.
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(offset, bucket); }
|
||||
};
|
||||
|
||||
disk_array<uint32_t, eh_elem> elements;
|
||||
|
||||
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(elements); }
|
||||
};
|
||||
|
||||
// Position within the commit log (segment id + offset), persisted in the
// stats metadata; presumably marks the replay point covered by this
// sstable — TODO confirm against the Cassandra format.
struct replay_position {
|
||||
uint64_t segment;
|
||||
uint32_t position;
|
||||
|
||||
// Enumerates fields in on-disk order for the generic parse()/write() helpers.
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(segment, position); }
|
||||
};
|
||||
|
||||
// Histogram with a bounded number of bins (max_bin_size) backed by a
// disk_hash of double keys to uint64_t counts; presumably mirrors
// Cassandra's StreamingHistogram — TODO confirm.
struct streaming_histogram {
|
||||
uint32_t max_bin_size;
|
||||
disk_hash<uint32_t, double, uint64_t> hash;
|
||||
|
||||
// Enumerates fields in on-disk order for the generic parse()/write() helpers.
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(max_bin_size, hash); }
|
||||
};
|
||||
|
||||
struct metadata {
|
||||
@@ -114,11 +126,17 @@ struct metadata {
|
||||
// Validation metadata component: the partitioner class name and
// filter_chance (presumably the bloom-filter false-positive chance —
// TODO confirm).
struct validation_metadata : public metadata {
|
||||
disk_string<uint16_t> partitioner;
|
||||
double filter_chance;
|
||||
|
||||
// Enumerates fields in on-disk order for the generic parse()/write() helpers.
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(partitioner, filter_chance); }
|
||||
};
|
||||
|
||||
// Compaction metadata component: ancestor sstable generations and a
// cardinality byte array (presumably a serialized cardinality estimator —
// TODO confirm).
struct compaction_metadata : public metadata {
|
||||
disk_array<uint32_t, uint32_t> ancestors;
|
||||
disk_array<uint32_t, uint8_t> cardinality;
|
|
||||
// Enumerates fields in on-disk order for the generic parse()/write() helpers.
template <typename Describer>
|
||||
future<> describe_type(Describer f) { return f(ancestors, cardinality); }
|
||||
};
|
||||
|
||||
struct la_stats_metadata : public metadata {
|
||||
@@ -135,6 +153,25 @@ struct la_stats_metadata : public metadata {
|
||||
disk_array<uint32_t, disk_string<uint16_t>> min_column_names;
|
||||
disk_array<uint32_t, disk_string<uint16_t>> max_column_names;
|
||||
bool has_legacy_counter_shards;
|
||||
|
||||
// Enumerates all stats fields in their exact on-disk order for the
// generic parse()/write() helpers; the order here defines the la-format
// Stats layout and must not change.
template <typename Describer>
|
||||
future<> describe_type(Describer f) {
|
||||
return f(
|
||||
estimated_row_size,
|
||||
estimated_column_count,
|
||||
position,
|
||||
min_timestamp,
|
||||
max_timestamp,
|
||||
max_local_deletion_time,
|
||||
compression_ratio,
|
||||
estimated_tombstone_drop_time,
|
||||
sstable_level,
|
||||
repaired_at,
|
||||
min_column_names,
|
||||
max_column_names,
|
||||
has_legacy_counter_shards
|
||||
);
|
||||
}
|
||||
};
|
||||
using stats_metadata = la_stats_metadata;
|
||||
|
||||
|
||||
@@ -29,6 +29,14 @@ public:
|
||||
// Test-only wrapper: forwards to the wrapped sstable's read_indexes().
future<index_list> read_indexes(uint64_t position, uint64_t quantity) {
|
||||
return _sst->read_indexes(position, quantity);
|
||||
}
|
||||
|
||||
// Test-only wrapper: forwards to the wrapped sstable's read_statistics().
future<> read_statistics() {
|
||||
return _sst->read_statistics();
|
||||
}
|
||||
|
||||
// Test-only accessor exposing the wrapped sstable's _statistics member
// so tests can compare statistics between sstables.
statistics& get_statistics() {
|
||||
return _sst->_statistics;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -206,16 +214,21 @@ const sstring filename(sstring dir, sstring version, unsigned long generation, s
|
||||
return dir + "/" + version + "-" + to_sstring(generation) + "-" + format + "-" + component;
|
||||
}
|
||||
|
||||
// Load the sstable with the given generation from `dir`, bump its
// generation by one, and store it back to disk.  Returns the written
// sstable so callers (tests) can inspect its in-memory components.
static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
    auto sst = make_lw_shared<sstable>(dir, generation, la, big);
    return sst->load().then([sst, generation] {
        sst->set_generation(generation + 1);
        // store() must be invoked before `sst` is moved into the
        // continuation's capture below, hence the two-step form.
        auto fut = sst->store();
        return std::move(fut).then([sst = std::move(sst)] {
            return make_ready_future<sstable_ptr>(std::move(sst));
        });
    });
}
|
||||
|
||||
// Backwards-compatible wrapper: write the sstable and discard the handle.
static future<> write_sst_info(sstring dir, unsigned long generation) {
|
||||
return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
|
||||
}
|
||||
|
||||
static future<std::pair<char*, size_t>> read_file(sstring file_path)
|
||||
{
|
||||
return engine().open_file_dma(file_path, open_flags::rw).then([] (file f) {
|
||||
@@ -259,6 +272,26 @@ SEASTAR_TEST_CASE(check_filter_func) {
|
||||
return check_component_integrity("Filter.db");
|
||||
}
|
||||
|
||||
// Round-trip test for the Statistics component: rewrite sstable
// generation 1 as generation 2, read the new file's statistics back,
// and verify its toc (metadata_type -> offset map) matches the original.
SEASTAR_TEST_CASE(check_statistics_func) {
    return do_write_sst("tests/urchin/sstables/compressed", 1).then([] (auto sst1) {
        auto sst2 = make_lw_shared<sstable>("tests/urchin/sstables/compressed", 2, la, big);
        return sstables::test(sst2).read_statistics().then([sst1, sst2] {
            statistics& sst1_s = sstables::test(sst1).get_statistics();
            statistics& sst2_s = sstables::test(sst2).get_statistics();

            BOOST_REQUIRE(sst1_s.hash.map.size() == sst2_s.hash.map.size());
            BOOST_REQUIRE(sst1_s.contents.size() == sst2_s.contents.size());

            // sst1/sst2 are captured to keep the statistics references
            // valid for the duration of the asynchronous loop.
            return do_for_each(sst1_s.hash.map.begin(), sst1_s.hash.map.end(),
                    [sst1, sst2, &sst1_s, &sst2_s] (auto entry) {
                BOOST_REQUIRE(entry.second == sst2_s.hash.map[entry.first]);
                return make_ready_future<>();
            });
            // TODO: compare the field contents from both sstables.
        });
    });
}
|
||||
|
||||
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
|
||||
return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) {
|
||||
// note: it's important to pass on a shared copy of sstp to prevent its
|
||||
|
||||
Reference in New Issue
Block a user