From cb84ca8abbc8d83ecff75f3e57224406d300e9ed Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 28 Mar 2018 10:48:21 +0200 Subject: [PATCH] Pass sstable_version_types to parse methods Parsing will depend on the sstable version when we have support for both 2_x and 3_x formats. Signed-off-by: Piotr Jastrzebski --- sstables/sstables.cc | 136 +++++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 68 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5495c1548c..c5a1007af1 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -243,7 +243,7 @@ read_integer(temporary_buffer& buf, T& i) { template typename std::enable_if_t::value, future<>> -parse(random_access_reader& in, T& i) { +parse(sstable_version_types v, random_access_reader& in, T& i) { return in.read_exactly(sizeof(T)).then([&i] (auto buf) { check_buf_size(buf, sizeof(T)); @@ -263,8 +263,8 @@ write(file_writer& out, T i) { template typename std::enable_if_t::value, future<>> -parse(random_access_reader& in, T& i) { - return parse(in, reinterpret_cast::type&>(i)); +parse(sstable_version_types v, random_access_reader& in, T& i) { + return parse(v, in, reinterpret_cast::type&>(i)); } template @@ -273,8 +273,8 @@ write(file_writer& out, T i) { write(out, static_cast::type>(i)); } -future<> parse(random_access_reader& in, bool& i) { - return parse(in, reinterpret_cast(i)); +future<> parse(sstable_version_types v, random_access_reader& in, bool& i) { + return parse(v, in, reinterpret_cast(i)); } inline void write(file_writer& out, bool i) { @@ -293,7 +293,7 @@ static inline To convert(From f) { return conv.to; } -future<> parse(random_access_reader& in, double& d) { +future<> parse(sstable_version_types, random_access_reader& in, double& d) { return in.read_exactly(sizeof(double)).then([&d] (auto buf) { check_buf_size(buf, sizeof(double)); @@ -311,7 +311,7 @@ inline void write(file_writer& out, double d) { } template -future<> parse(random_access_reader& in, T& len, bytes& s) { +future<> parse(sstable_version_types, random_access_reader& in, T& len, bytes& s) { return in.read_exactly(len).then([&s, len] (auto buf) { check_buf_size(buf, len); // Likely a different type of char. Most bufs are unsigned, whereas the bytes type is signed. @@ -335,9 +335,9 @@ inline void write(file_writer& out, bytes_ostream s) { // All composite parsers must come after this template -future<> parse(random_access_reader& in, First& first, Rest&&... rest) { - return parse(in, first).then([&in, &rest...] { - return parse(in, std::forward(rest)...); +future<> parse(sstable_version_types v, random_access_reader& in, First& first, Rest&&... rest) { + return parse(v, in, first).then([v, &in, &rest...] { + return parse(v, in, std::forward(rest)...); }); } @@ -350,9 +350,9 @@ inline void write(file_writer& out, const First& first, Rest&&... rest) { // Intended to be used for a type that describes itself through describe_type(). template typename std::enable_if_t::value && !std::is_enum::value, future<>> -parse(random_access_reader& in, T& t) { - return t.describe_type([&in] (auto&&... what) -> future<> { - return parse(in, what...); +parse(sstable_version_types v, random_access_reader& in, T& t) { + return t.describe_type([v, &in] (auto&&... what) -> future<> { + return parse(v, in, what...); }); } @@ -371,11 +371,11 @@ write(file_writer& out, const T& t) { // are contiguous, it is not always the case. So we want to have the // flexibility of parsing them separately. template -future<> parse(random_access_reader& in, disk_string& s) { +future<> parse(sstable_version_types v, random_access_reader& in, disk_string& s) { auto len = std::make_unique(); - auto f = parse(in, *len); - return f.then([&in, &s, len = std::move(len)] { - return parse(in, *len, s.value); + auto f = parse(v, in, *len); + return f.then([v, &in, &s, len = std::move(len)] { + return parse(v, in, *len, s.value); }); } @@ -404,21 +404,21 @@ inline void write(file_writer& out, const disk_string_view& s) { // We'll offer a specialization for that case below. template typename std::enable_if_t::value, future<>> -parse(random_access_reader& in, Size& len, utils::chunked_vector& arr) { +parse(sstable_version_types v, random_access_reader& in, Size& len, utils::chunked_vector& arr) { auto count = make_lw_shared(0); auto eoarr = [count, len] { return *count == len; }; - return do_until(eoarr, [count, &in, &arr] { + return do_until(eoarr, [v, count, &in, &arr] { arr.emplace_back(); (*count)++; - return parse(in, arr.back()); + return parse(v, in, arr.back()); }); } template typename std::enable_if_t::value, future<>> -parse(random_access_reader& in, Size& len, utils::chunked_vector& arr) { +parse(sstable_version_types, random_access_reader& in, Size& len, utils::chunked_vector& arr) { auto done = make_lw_shared(0); return repeat([&in, &len, &arr, done] { auto now = std::min(len - *done, 100000 / sizeof(Members)); @@ -438,12 +438,12 @@ parse(random_access_reader& in, Size& len, utils::chunked_vector& arr) // We resize the array here, before we pass it to the integer / non-integer // specializations template -future<> parse(random_access_reader& in, disk_array& arr) { +future<> parse(sstable_version_types v, random_access_reader& in, disk_array& arr) { auto len = make_lw_shared(); - auto f = parse(in, *len); - return f.then([&in, &arr, len] { + auto f = parse(v, in, *len); + return f.then([v, &in, &arr, len] { arr.elements.reserve(*len); - return parse(in, *len, arr.elements); + return parse(v, in, *len, arr.elements); }).finally([len] {}); } @@ -493,17 +493,17 @@ inline void write(file_writer& out, const disk_array_ref& arr) { } template -future<> parse(random_access_reader& in, Size& len, std::unordered_map& map) { - return do_with(Size(), [&in, len, &map] (Size& count) { +future<> parse(sstable_version_types v, random_access_reader& in, Size& len, std::unordered_map& map) { + return do_with(Size(), [v, &in, len, &map] (Size& count) { auto eos = [len, &count] { return len == count++; }; - return do_until(eos, [len, &in, &map] { + return do_until(eos, [v, len, &in, &map] { struct kv { Key key; Value value; }; - return do_with(kv(), [&in, &map] (auto& el) { - return parse(in, el.key, el.value).then([&el, &map] { + return do_with(kv(), [v, &in, &map] (auto& el) { + return parse(v, in, el.key, el.value).then([&el, &map] { map.emplace(el.key, el.value); }); }); @@ -512,11 +512,11 @@ future<> parse(random_access_reader& in, Size& len, std::unordered_map -future<> parse(random_access_reader& in, disk_hash& h) { +future<> parse(sstable_version_types v, random_access_reader& in, disk_hash& h) { auto w = std::make_unique(); - auto f = parse(in, *w); - return f.then([&in, &h, w = std::move(w)] { - return parse(in, *w, h.map); + auto f = parse(v, in, *w); + return f.then([v, &in, &h, w = std::move(w)] { + return parse(v, in, *w, h.map); }); } @@ -540,7 +540,7 @@ template struct single_tagged_union_member_serdes { using value_type = typename DiskSetOfTaggedUnion::value_type; virtual ~single_tagged_union_member_serdes() {} - virtual future<> do_parse(random_access_reader& in, value_type& v) const = 0; + virtual future<> do_parse(sstable_version_types version, random_access_reader& in, value_type& v) const = 0; virtual uint32_t do_size(const value_type& v) const = 0; virtual void do_write(file_writer& out, const value_type& v) const = 0; }; @@ -550,9 +550,9 @@ template struct single_tagged_union_member_serdes_for final : single_tagged_union_member_serdes { using base = single_tagged_union_member_serdes; using value_type = typename base::value_type; - virtual future<> do_parse(random_access_reader& in, value_type& v) const { + virtual future<> do_parse(sstable_version_types version, random_access_reader& in, value_type& v) const { v = Member(); - return parse(in, boost::get(v).value); + return parse(version, in, boost::get(v).value); } virtual uint32_t do_size(const value_type& v) const override { return serialized_size(boost::get(v).value); @@ -571,12 +571,12 @@ struct disk_set_of_tagged_union::serdes { serdes_map_type map = { {Members::tag(), make_shared>()}... }; - future<> lookup_and_parse(random_access_reader& in, TagType tag, uint32_t& size, disk_set& s, value_type& value) const { + future<> lookup_and_parse(sstable_version_types v, random_access_reader& in, TagType tag, uint32_t& size, disk_set& s, value_type& value) const { auto i = map.find(tag); if (i == map.end()) { return in.read_exactly(size).discard_result(); } else { - return i->second->do_parse(in, value).then([tag, &s, &value] () mutable { + return i->second->do_parse(v, in, value).then([tag, &s, &value] () mutable { s.data.emplace(tag, std::move(value)); }); } @@ -594,17 +594,17 @@ typename disk_set_of_tagged_union::serdes disk_set_of_tagge template future<> -parse(random_access_reader& in, disk_set_of_tagged_union& s) { +parse(sstable_version_types v, random_access_reader& in, disk_set_of_tagged_union& s) { using disk_set = disk_set_of_tagged_union; using key_type = typename disk_set::key_type; using value_type = typename disk_set::value_type; return do_with(0u, 0u, 0u, value_type{}, [&] (key_type& nr_elements, key_type& new_key, unsigned& new_size, value_type& new_value) { - return parse(in, nr_elements).then([&] { + return parse(v, in, nr_elements).then([&] { auto rng = boost::irange(0, nr_elements); // do_for_each doesn't like an rvalue range return do_for_each(rng.begin(), rng.end(), [&] (key_type ignore) { - return parse(in, new_key).then([&] { - return parse(in, new_size).then([&] { - return disk_set::s_serdes.lookup_and_parse(in, TagType(new_key), new_size, s, new_value); + return parse(v, in, new_key).then([&] { + return parse(v, in, new_size).then([&] { + return disk_set::s_serdes.lookup_and_parse(v, in, TagType(new_key), new_size, s, new_value); }); }); }); @@ -625,14 +625,14 @@ void write(file_writer& out, const disk_set_of_tagged_union } } -future<> parse(random_access_reader& in, summary& s) { +future<> parse(sstable_version_types v, random_access_reader& in, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; - return parse(in, s.header.min_index_interval, + return parse(v, in, s.header.min_index_interval, s.header.size, s.header.memory_size, s.header.sampling_level, - s.header.size_at_full_sampling).then([&in, &s] { + s.header.size_at_full_sampling).then([v, &in, &s] { return in.read_exactly(s.header.size * sizeof(pos_type)).then([&in, &s] (auto buf) { auto len = s.header.size * sizeof(pos_type); check_buf_size(buf, len); @@ -654,9 +654,9 @@ future<> parse(random_access_reader& in, summary& s) { s.positions.push_back(s.header.memory_size); return make_ready_future<>(); }); - }).then([&in, &s] { + }).then([v, &in, &s] { in.seek(sizeof(summary::header) + s.header.memory_size); - return parse(in, s.first_key, s.last_key); + return parse(v, in, s.first_key, s.last_key); }).then([&in, &s] { in.seek(s.positions[0] + sizeof(summary::header)); s.entries.reserve(s.header.size); @@ -723,14 +723,14 @@ future sstable::read_summary_entry(size_t i) { return make_ready_future(_components->summary.entries[i]); } -future<> parse(random_access_reader& in, deletion_time& d) { - return parse(in, d.local_deletion_time, d.marked_for_delete_at); +future<> parse(sstable_version_types v, random_access_reader& in, deletion_time& d) { + return parse(v, in, d.local_deletion_time, d.marked_for_delete_at); } template -future<> parse(random_access_reader& in, std::unique_ptr& p) { +future<> parse(sstable_version_types v, random_access_reader& in, std::unique_ptr& p) { p.reset(new Child); - return parse(in, *static_cast(p.get())); + return parse(v, in, *static_cast(p.get())); } template @@ -738,18 +738,18 @@ inline void write(file_writer& out, const std::unique_ptr& p) { write(out, *static_cast(p.get())); } -future<> parse(random_access_reader& in, statistics& s) { - return parse(in, s.hash).then([&in, &s] { - return do_for_each(s.hash.map.begin(), s.hash.map.end(), [&in, &s] (auto val) mutable { +future<> parse(sstable_version_types v, random_access_reader& in, statistics& s) { + return parse(v, in, s.hash).then([v, &in, &s] { + return do_for_each(s.hash.map.begin(), s.hash.map.end(), [v, &in, &s] (auto val) mutable { in.seek(val.second); switch (val.first) { case metadata_type::Validation: - return parse(in, s.contents[val.first]); + return parse(v, in, s.contents[val.first]); case metadata_type::Compaction: - return parse(in, s.contents[val.first]); + return parse(v, in, s.contents[val.first]); case metadata_type::Stats: - return parse(in, s.contents[val.first]); + return parse(v, in, s.contents[val.first]); default: sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.first)); return make_ready_future<>(); @@ -768,10 +768,10 @@ inline void write(file_writer& out, const statistics& s) { } } -future<> parse(random_access_reader& in, utils::estimated_histogram& eh) { +future<> parse(sstable_version_types v, random_access_reader& in, utils::estimated_histogram& eh) { auto len = std::make_unique(); - auto f = parse(in, *len); + auto f = parse(v, in, *len); return f.then([&in, &eh, len = std::move(len)] { uint32_t length = *len; @@ -839,10 +839,10 @@ struct streaming_histogram_element { auto describe_type(Describer f) { return f(key, value); } }; -future<> parse(random_access_reader& in, utils::streaming_histogram& sh) { +future<> parse(sstable_version_types v, random_access_reader& in, utils::streaming_histogram& sh) { auto a = std::make_unique>(); - auto f = parse(in, sh.max_bin_size, *a); + auto f = parse(v, in, sh.max_bin_size, *a); return f.then([&sh, a = std::move(a)] { auto length = a->elements.size(); if (length > sh.max_bin_size) { @@ -880,16 +880,16 @@ inline void write(file_writer& out, const utils::streaming_histogram& sh) { write(out, max_bin_size, a); } -future<> parse(random_access_reader& in, compression& c) { +future<> parse(sstable_version_types v, random_access_reader& in, compression& c) { auto data_len_ptr = make_lw_shared(0); auto chunk_len_ptr = make_lw_shared(0); - return parse(in, c.name, c.options, *chunk_len_ptr, *data_len_ptr).then([&in, &c, chunk_len_ptr, data_len_ptr] { + return parse(v, in, c.name, c.options, *chunk_len_ptr, *data_len_ptr).then([v, &in, &c, chunk_len_ptr, data_len_ptr] { c.set_uncompressed_chunk_length(*chunk_len_ptr); c.set_uncompressed_file_length(*data_len_ptr); - return do_with(uint32_t(), c.offsets.get_writer(), [&in, &c] (uint32_t& len, compression::segmented_offsets::writer& offsets) { - return parse(in, len).then([&in, &c, &len, &offsets] { + return do_with(uint32_t(), c.offsets.get_writer(), [v, &in, &c] (uint32_t& len, compression::segmented_offsets::writer& offsets) { + return parse(v, in, len).then([&in, &c, &len, &offsets] { auto eoarr = [&c, &len] { return c.offsets.size() == len; }; return do_until(eoarr, [&in, &c, &len, &offsets] () { @@ -1116,7 +1116,7 @@ future<> sstable::read_simple(T& component, const io_priority_class& pc) { return fut.then([this, &component, fi = std::move(fi)] (uint64_t size) { auto f = make_checked_file(_read_error_handler, fi); auto r = make_lw_shared(std::move(f), size, sstable_buffer_size); - auto fut = parse(*r, component); + auto fut = parse(_version, *r, component); return fut.finally([r] { return r->close(); }).then([r] {});