diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 47680ead4b..25a83d1d2b 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -176,7 +176,7 @@ parse(random_access_reader& in, T& i) { template typename std::enable_if_t::value, future<>> write(output_stream& out, T i) { - return write(out, reinterpret_cast::type>(i)); + return write(out, static_cast::type>(i)); } future<> parse(random_access_reader& in, bool& i) { @@ -402,6 +402,24 @@ future<> parse(random_access_reader& in, disk_hash& h) { }); } +template +future<> write(output_stream& out, std::unordered_map& map) { + return do_for_each(map.begin(), map.end(), [&out, &map] (auto val) { + Key key = val.first; + Value value = val.second; + return write(out, key, value); + }); +} + +template +future<> write(output_stream& out, disk_hash& h) { + Size len = 0; + check_truncate_and_assign(len, h.map.size()); + return write(out, len).then([&out, &h] { + return write(out, h.map); + }); +} + future<> parse(random_access_reader& in, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; @@ -470,30 +488,6 @@ future sstable::read_summary_entry(size_t i) { return make_ready_future(_summary.entries[i]); } -future<> parse(random_access_reader& in, struct replay_position& rp) { - return parse(in, rp.segment, rp.position); -} - -future<> parse(random_access_reader& in, estimated_histogram::eh_elem &e) { - return parse(in, e.offset, e.bucket); -} - -future<> parse(random_access_reader& in, estimated_histogram &e) { - return parse(in, e.elements); -} - -future<> parse(random_access_reader& in, streaming_histogram &h) { - return parse(in, h.max_bin_size, h.hash); -} - -future<> parse(random_access_reader& in, validation_metadata& m) { - return parse(in, m.partitioner, m.filter_chance); -} - -future<> parse(random_access_reader& in, compaction_metadata& m) { - return parse(in, m.ancestors, m.cardinality); -} - future<> parse(random_access_reader& in, index_entry& ie) { return parse(in, ie.key, ie.position, ie.promoted_index); } @@ -508,22 +502,9 @@ future<> parse(random_access_reader& in, std::unique_ptr& p) { return parse(in, *static_cast(p.get())); } -future<> parse(random_access_reader& in, stats_metadata& m) { - return parse(in, - m.estimated_row_size, - m.estimated_column_count, - m.position, - m.min_timestamp, - m.max_timestamp, - m.max_local_deletion_time, - m.compression_ratio, - m.estimated_tombstone_drop_time, - m.sstable_level, - m.repaired_at, - m.min_column_names, - m.max_column_names, - m.has_legacy_counter_shards - ); +template +future<> write(output_stream& out, std::unique_ptr& p) { + return write(out, *static_cast(p.get())); } future<> parse(random_access_reader& in, statistics& s) { @@ -546,6 +527,37 @@ future<> parse(random_access_reader& in, statistics& s) { }); } +future<> write(output_stream& out, statistics& s) { + return write(out, s.hash).then([&out, &s] { + struct kv { + metadata_type key; + uint32_t value; + }; + // sort map by file offset value and store the result into a vector. + // this is indeed needed because output stream cannot afford random writes. + auto v = make_shared>(); + v->reserve(s.hash.map.size()); + for (auto val : s.hash.map) { + kv tmp = { val.first, val.second }; + v->push_back(tmp); + } + std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); + return do_for_each(v->begin(), v->end(), [&out, &s, v] (auto val) mutable { + switch (val.key) { + case metadata_type::Validation: + return write(out, s.contents[val.key]); + case metadata_type::Compaction: + return write(out, s.contents[val.key]); + case metadata_type::Stats: + return write(out, s.contents[val.key]); + default: + sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); + return make_ready_future<>(); + } + }); + }); +} + // This is small enough, and well-defined. Easier to just read it all // at once future<> sstable::read_toc() { @@ -719,6 +731,10 @@ future<> sstable::read_statistics() { return read_simple(); } +future<> sstable::write_statistics() { + return write_simple(); +} + future<> sstable::open_data() { return when_all(engine().open_file_dma(filename(component_type::Index), open_flags::ro), engine().open_file_dma(filename(component_type::Data), open_flags::ro)).then([this] (auto files) { @@ -752,7 +768,9 @@ future<> sstable::load() { future<> sstable::store() { // TODO: write other components as well. - return write_compression().then([this] { + return write_statistics().then([this] { + return write_compression(); + }).then([this] { return write_filter(); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 71f72d422e..57649030a5 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -91,7 +91,10 @@ private: future<> read_summary() { return read_simple(); } + future<> read_statistics(); + future<> write_statistics(); + future<> open_data(); future read_indexes(uint64_t position, uint64_t quantity); diff --git a/sstables/types.hh b/sstables/types.hh index 7f96e61863..0dc5943f71 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -93,19 +93,31 @@ struct estimated_histogram { struct eh_elem { uint64_t offset; uint64_t bucket; + + template + future<> describe_type(Describer f) { return f(offset, bucket); } }; disk_array elements; + + template + future<> describe_type(Describer f) { return f(elements); } }; struct replay_position { uint64_t segment; uint32_t position; + + template + future<> describe_type(Describer f) { return f(segment, position); } }; struct streaming_histogram { uint32_t max_bin_size; disk_hash hash; + + template + future<> describe_type(Describer f) { return f(max_bin_size, hash); } }; struct metadata { @@ -114,11 +126,17 @@ struct metadata { struct validation_metadata : public metadata { disk_string partitioner; double filter_chance; + + template + future<> describe_type(Describer f) { return f(partitioner, filter_chance); } }; struct compaction_metadata : public metadata { disk_array ancestors; disk_array cardinality; + + template + future<> describe_type(Describer f) { return f(ancestors, cardinality); } }; struct la_stats_metadata : public metadata { @@ -135,6 +153,25 @@ struct la_stats_metadata : public metadata { disk_array> min_column_names; disk_array> max_column_names; bool has_legacy_counter_shards; + + template + future<> describe_type(Describer f) { + return f( + estimated_row_size, + estimated_column_count, + position, + min_timestamp, + max_timestamp, + max_local_deletion_time, + compression_ratio, + estimated_tombstone_drop_time, + sstable_level, + repaired_at, + min_column_names, + max_column_names, + has_legacy_counter_shards + ); + } }; using stats_metadata = la_stats_metadata; diff --git a/tests/urchin/sstable_test.cc b/tests/urchin/sstable_test.cc index 591ed562fe..a813bfa3a1 100644 --- a/tests/urchin/sstable_test.cc +++ b/tests/urchin/sstable_test.cc @@ -29,6 +29,14 @@ public: future read_indexes(uint64_t position, uint64_t quantity) { return _sst->read_indexes(position, quantity); } + + future<> read_statistics() { + return _sst->read_statistics(); + } + + statistics& get_statistics() { + return _sst->_statistics; + } }; } @@ -206,16 +214,21 @@ const sstring filename(sstring dir, sstring version, unsigned long generation, s return dir + "/" + version + "-" + to_sstring(generation) + "-" + format + "-" + component; } -static future<> write_sst_info(sstring dir, unsigned long generation) { +static future do_write_sst(sstring dir, unsigned long generation) { auto sst = make_lw_shared(dir, generation, la, big); return sst->load().then([sst, generation] { sst->set_generation(generation + 1); - return sst->store().then([sst] { - return make_ready_future<>(); + auto fut = sst->store(); + return std::move(fut).then([sst = std::move(sst)] { + return make_ready_future(std::move(sst)); }); }); } +static future<> write_sst_info(sstring dir, unsigned long generation) { + return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); }); +} + static future> read_file(sstring file_path) { return engine().open_file_dma(file_path, open_flags::rw).then([] (file f) { @@ -259,6 +272,26 @@ SEASTAR_TEST_CASE(check_filter_func) { return check_component_integrity("Filter.db"); } +SEASTAR_TEST_CASE(check_statistics_func) { + return do_write_sst("tests/urchin/sstables/compressed", 1).then([] (auto sst1) { + auto sst2 = make_lw_shared("tests/urchin/sstables/compressed", 2, la, big); + return sstables::test(sst2).read_statistics().then([sst1, sst2] { + statistics& sst1_s = sstables::test(sst1).get_statistics(); + statistics& sst2_s = sstables::test(sst2).get_statistics(); + + BOOST_REQUIRE(sst1_s.hash.map.size() == sst2_s.hash.map.size()); + BOOST_REQUIRE(sst1_s.contents.size() == sst2_s.contents.size()); + + return do_for_each(sst1_s.hash.map.begin(), sst1_s.hash.map.end(), + [sst1, sst2, &sst1_s, &sst2_s] (auto val) { + BOOST_REQUIRE(val.second == sst2_s.hash.map[val.first]); + return make_ready_future<>(); + }); + // TODO: compare the field contents from both sstables. + }); + }); +} + SEASTAR_TEST_CASE(uncompressed_random_access_read) { return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) { // note: it's important to pass on a shared copy of sstp to prevent its