Pass sstable_version_types to parse methods

Parsing will depend on the sstable version when
we have support for both 2_x and 3_x formats.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2018-03-28 10:48:21 +02:00
parent 444b468d46
commit cb84ca8abb

View File

@@ -243,7 +243,7 @@ read_integer(temporary_buffer<char>& buf, T& i) {
template <typename T>
typename std::enable_if_t<std::is_integral<T>::value, future<>>
parse(random_access_reader& in, T& i) {
parse(sstable_version_types v, random_access_reader& in, T& i) {
return in.read_exactly(sizeof(T)).then([&i] (auto buf) {
check_buf_size(buf, sizeof(T));
@@ -263,8 +263,8 @@ write(file_writer& out, T i) {
template <typename T>
typename std::enable_if_t<std::is_enum<T>::value, future<>>
parse(random_access_reader& in, T& i) {
return parse(in, reinterpret_cast<typename std::underlying_type<T>::type&>(i));
parse(sstable_version_types v, random_access_reader& in, T& i) {
return parse(v, in, reinterpret_cast<typename std::underlying_type<T>::type&>(i));
}
template <typename T>
@@ -273,8 +273,8 @@ write(file_writer& out, T i) {
write(out, static_cast<typename std::underlying_type<T>::type>(i));
}
future<> parse(random_access_reader& in, bool& i) {
return parse(in, reinterpret_cast<uint8_t&>(i));
future<> parse(sstable_version_types v, random_access_reader& in, bool& i) {
return parse(v, in, reinterpret_cast<uint8_t&>(i));
}
inline void write(file_writer& out, bool i) {
@@ -293,7 +293,7 @@ static inline To convert(From f) {
return conv.to;
}
future<> parse(random_access_reader& in, double& d) {
future<> parse(sstable_version_types, random_access_reader& in, double& d) {
return in.read_exactly(sizeof(double)).then([&d] (auto buf) {
check_buf_size(buf, sizeof(double));
@@ -311,7 +311,7 @@ inline void write(file_writer& out, double d) {
}
template <typename T>
future<> parse(random_access_reader& in, T& len, bytes& s) {
future<> parse(sstable_version_types, random_access_reader& in, T& len, bytes& s) {
return in.read_exactly(len).then([&s, len] (auto buf) {
check_buf_size(buf, len);
// Likely a different type of char. Most bufs are unsigned, whereas the bytes type is signed.
@@ -335,9 +335,9 @@ inline void write(file_writer& out, bytes_ostream s) {
// All composite parsers must come after this
template<typename First, typename... Rest>
future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
return parse(in, first).then([&in, &rest...] {
return parse(in, std::forward<Rest>(rest)...);
future<> parse(sstable_version_types v, random_access_reader& in, First& first, Rest&&... rest) {
return parse(v, in, first).then([v, &in, &rest...] {
return parse(v, in, std::forward<Rest>(rest)...);
});
}
@@ -350,9 +350,9 @@ inline void write(file_writer& out, const First& first, Rest&&... rest) {
// Intended to be used for a type that describes itself through describe_type().
template <class T>
typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, future<>>
parse(random_access_reader& in, T& t) {
return t.describe_type([&in] (auto&&... what) -> future<> {
return parse(in, what...);
parse(sstable_version_types v, random_access_reader& in, T& t) {
return t.describe_type([v, &in] (auto&&... what) -> future<> {
return parse(v, in, what...);
});
}
@@ -371,11 +371,11 @@ write(file_writer& out, const T& t) {
// are contiguous, it is not always the case. So we want to have the
// flexibility of parsing them separately.
template <typename Size>
future<> parse(random_access_reader& in, disk_string<Size>& s) {
future<> parse(sstable_version_types v, random_access_reader& in, disk_string<Size>& s) {
auto len = std::make_unique<Size>();
auto f = parse(in, *len);
return f.then([&in, &s, len = std::move(len)] {
return parse(in, *len, s.value);
auto f = parse(v, in, *len);
return f.then([v, &in, &s, len = std::move(len)] {
return parse(v, in, *len, s.value);
});
}
@@ -404,21 +404,21 @@ inline void write(file_writer& out, const disk_string_view<Size>& s) {
// We'll offer a specialization for that case below.
template <typename Size, typename Members>
typename std::enable_if_t<!std::is_integral<Members>::value, future<>>
parse(random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
parse(sstable_version_types v, random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
auto count = make_lw_shared<size_t>(0);
auto eoarr = [count, len] { return *count == len; };
return do_until(eoarr, [count, &in, &arr] {
return do_until(eoarr, [v, count, &in, &arr] {
arr.emplace_back();
(*count)++;
return parse(in, arr.back());
return parse(v, in, arr.back());
});
}
template <typename Size, typename Members>
typename std::enable_if_t<std::is_integral<Members>::value, future<>>
parse(random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
parse(sstable_version_types, random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
auto done = make_lw_shared<size_t>(0);
return repeat([&in, &len, &arr, done] {
auto now = std::min(len - *done, 100000 / sizeof(Members));
@@ -438,12 +438,12 @@ parse(random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr)
// We resize the array here, before we pass it to the integer / non-integer
// specializations
template <typename Size, typename Members>
future<> parse(random_access_reader& in, disk_array<Size, Members>& arr) {
future<> parse(sstable_version_types v, random_access_reader& in, disk_array<Size, Members>& arr) {
auto len = make_lw_shared<Size>();
auto f = parse(in, *len);
return f.then([&in, &arr, len] {
auto f = parse(v, in, *len);
return f.then([v, &in, &arr, len] {
arr.elements.reserve(*len);
return parse(in, *len, arr.elements);
return parse(v, in, *len, arr.elements);
}).finally([len] {});
}
@@ -493,17 +493,17 @@ inline void write(file_writer& out, const disk_array_ref<Size, Members>& arr) {
}
template <typename Size, typename Key, typename Value>
future<> parse(random_access_reader& in, Size& len, std::unordered_map<Key, Value>& map) {
return do_with(Size(), [&in, len, &map] (Size& count) {
future<> parse(sstable_version_types v, random_access_reader& in, Size& len, std::unordered_map<Key, Value>& map) {
return do_with(Size(), [v, &in, len, &map] (Size& count) {
auto eos = [len, &count] { return len == count++; };
return do_until(eos, [len, &in, &map] {
return do_until(eos, [v, len, &in, &map] {
struct kv {
Key key;
Value value;
};
return do_with(kv(), [&in, &map] (auto& el) {
return parse(in, el.key, el.value).then([&el, &map] {
return do_with(kv(), [v, &in, &map] (auto& el) {
return parse(v, in, el.key, el.value).then([&el, &map] {
map.emplace(el.key, el.value);
});
});
@@ -512,11 +512,11 @@ future<> parse(random_access_reader& in, Size& len, std::unordered_map<Key, Valu
}
template <typename Size, typename Key, typename Value>
future<> parse(random_access_reader& in, disk_hash<Size, Key, Value>& h) {
future<> parse(sstable_version_types v, random_access_reader& in, disk_hash<Size, Key, Value>& h) {
auto w = std::make_unique<Size>();
auto f = parse(in, *w);
return f.then([&in, &h, w = std::move(w)] {
return parse(in, *w, h.map);
auto f = parse(v, in, *w);
return f.then([v, &in, &h, w = std::move(w)] {
return parse(v, in, *w, h.map);
});
}
@@ -540,7 +540,7 @@ template <typename DiskSetOfTaggedUnion>
struct single_tagged_union_member_serdes {
using value_type = typename DiskSetOfTaggedUnion::value_type;
virtual ~single_tagged_union_member_serdes() {}
virtual future<> do_parse(random_access_reader& in, value_type& v) const = 0;
virtual future<> do_parse(sstable_version_types version, random_access_reader& in, value_type& v) const = 0;
virtual uint32_t do_size(const value_type& v) const = 0;
virtual void do_write(file_writer& out, const value_type& v) const = 0;
};
@@ -550,9 +550,9 @@ template <typename DiskSetOfTaggedUnion, typename Member>
struct single_tagged_union_member_serdes_for final : single_tagged_union_member_serdes<DiskSetOfTaggedUnion> {
using base = single_tagged_union_member_serdes<DiskSetOfTaggedUnion>;
using value_type = typename base::value_type;
virtual future<> do_parse(random_access_reader& in, value_type& v) const {
virtual future<> do_parse(sstable_version_types version, random_access_reader& in, value_type& v) const {
v = Member();
return parse(in, boost::get<Member>(v).value);
return parse(version, in, boost::get<Member>(v).value);
}
virtual uint32_t do_size(const value_type& v) const override {
return serialized_size(boost::get<Member>(v).value);
@@ -571,12 +571,12 @@ struct disk_set_of_tagged_union<TagType, Members...>::serdes {
serdes_map_type map = {
{Members::tag(), make_shared<single_tagged_union_member_serdes_for<disk_set, Members>>()}...
};
future<> lookup_and_parse(random_access_reader& in, TagType tag, uint32_t& size, disk_set& s, value_type& value) const {
future<> lookup_and_parse(sstable_version_types v, random_access_reader& in, TagType tag, uint32_t& size, disk_set& s, value_type& value) const {
auto i = map.find(tag);
if (i == map.end()) {
return in.read_exactly(size).discard_result();
} else {
return i->second->do_parse(in, value).then([tag, &s, &value] () mutable {
return i->second->do_parse(v, in, value).then([tag, &s, &value] () mutable {
s.data.emplace(tag, std::move(value));
});
}
@@ -594,17 +594,17 @@ typename disk_set_of_tagged_union<TagType, Members...>::serdes disk_set_of_tagge
template <typename TagType, typename... Members>
future<>
parse(random_access_reader& in, disk_set_of_tagged_union<TagType, Members...>& s) {
parse(sstable_version_types v, random_access_reader& in, disk_set_of_tagged_union<TagType, Members...>& s) {
using disk_set = disk_set_of_tagged_union<TagType, Members...>;
using key_type = typename disk_set::key_type;
using value_type = typename disk_set::value_type;
return do_with(0u, 0u, 0u, value_type{}, [&] (key_type& nr_elements, key_type& new_key, unsigned& new_size, value_type& new_value) {
return parse(in, nr_elements).then([&] {
return parse(v, in, nr_elements).then([&] {
auto rng = boost::irange<key_type>(0, nr_elements); // do_for_each doesn't like an rvalue range
return do_for_each(rng.begin(), rng.end(), [&] (key_type ignore) {
return parse(in, new_key).then([&] {
return parse(in, new_size).then([&] {
return disk_set::s_serdes.lookup_and_parse(in, TagType(new_key), new_size, s, new_value);
return parse(v, in, new_key).then([&] {
return parse(v, in, new_size).then([&] {
return disk_set::s_serdes.lookup_and_parse(v, in, TagType(new_key), new_size, s, new_value);
});
});
});
@@ -625,14 +625,14 @@ void write(file_writer& out, const disk_set_of_tagged_union<TagType, Members...>
}
}
future<> parse(random_access_reader& in, summary& s) {
future<> parse(sstable_version_types v, random_access_reader& in, summary& s) {
using pos_type = typename decltype(summary::positions)::value_type;
return parse(in, s.header.min_index_interval,
return parse(v, in, s.header.min_index_interval,
s.header.size,
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling).then([&in, &s] {
s.header.size_at_full_sampling).then([v, &in, &s] {
return in.read_exactly(s.header.size * sizeof(pos_type)).then([&in, &s] (auto buf) {
auto len = s.header.size * sizeof(pos_type);
check_buf_size(buf, len);
@@ -654,9 +654,9 @@ future<> parse(random_access_reader& in, summary& s) {
s.positions.push_back(s.header.memory_size);
return make_ready_future<>();
});
}).then([&in, &s] {
}).then([v, &in, &s] {
in.seek(sizeof(summary::header) + s.header.memory_size);
return parse(in, s.first_key, s.last_key);
return parse(v, in, s.first_key, s.last_key);
}).then([&in, &s] {
in.seek(s.positions[0] + sizeof(summary::header));
s.entries.reserve(s.header.size);
@@ -723,14 +723,14 @@ future<summary_entry&> sstable::read_summary_entry(size_t i) {
return make_ready_future<summary_entry&>(_components->summary.entries[i]);
}
future<> parse(random_access_reader& in, deletion_time& d) {
return parse(in, d.local_deletion_time, d.marked_for_delete_at);
future<> parse(sstable_version_types v, random_access_reader& in, deletion_time& d) {
return parse(v, in, d.local_deletion_time, d.marked_for_delete_at);
}
template <typename Child>
future<> parse(random_access_reader& in, std::unique_ptr<metadata>& p) {
future<> parse(sstable_version_types v, random_access_reader& in, std::unique_ptr<metadata>& p) {
p.reset(new Child);
return parse(in, *static_cast<Child *>(p.get()));
return parse(v, in, *static_cast<Child *>(p.get()));
}
template <typename Child>
@@ -738,18 +738,18 @@ inline void write(file_writer& out, const std::unique_ptr<metadata>& p) {
write(out, *static_cast<Child *>(p.get()));
}
future<> parse(random_access_reader& in, statistics& s) {
return parse(in, s.hash).then([&in, &s] {
return do_for_each(s.hash.map.begin(), s.hash.map.end(), [&in, &s] (auto val) mutable {
future<> parse(sstable_version_types v, random_access_reader& in, statistics& s) {
return parse(v, in, s.hash).then([v, &in, &s] {
return do_for_each(s.hash.map.begin(), s.hash.map.end(), [v, &in, &s] (auto val) mutable {
in.seek(val.second);
switch (val.first) {
case metadata_type::Validation:
return parse<validation_metadata>(in, s.contents[val.first]);
return parse<validation_metadata>(v, in, s.contents[val.first]);
case metadata_type::Compaction:
return parse<compaction_metadata>(in, s.contents[val.first]);
return parse<compaction_metadata>(v, in, s.contents[val.first]);
case metadata_type::Stats:
return parse<stats_metadata>(in, s.contents[val.first]);
return parse<stats_metadata>(v, in, s.contents[val.first]);
default:
sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.first));
return make_ready_future<>();
@@ -768,10 +768,10 @@ inline void write(file_writer& out, const statistics& s) {
}
}
future<> parse(random_access_reader& in, utils::estimated_histogram& eh) {
future<> parse(sstable_version_types v, random_access_reader& in, utils::estimated_histogram& eh) {
auto len = std::make_unique<uint32_t>();
auto f = parse(in, *len);
auto f = parse(v, in, *len);
return f.then([&in, &eh, len = std::move(len)] {
uint32_t length = *len;
@@ -839,10 +839,10 @@ struct streaming_histogram_element {
auto describe_type(Describer f) { return f(key, value); }
};
future<> parse(random_access_reader& in, utils::streaming_histogram& sh) {
future<> parse(sstable_version_types v, random_access_reader& in, utils::streaming_histogram& sh) {
auto a = std::make_unique<disk_array<uint32_t, streaming_histogram_element>>();
auto f = parse(in, sh.max_bin_size, *a);
auto f = parse(v, in, sh.max_bin_size, *a);
return f.then([&sh, a = std::move(a)] {
auto length = a->elements.size();
if (length > sh.max_bin_size) {
@@ -880,16 +880,16 @@ inline void write(file_writer& out, const utils::streaming_histogram& sh) {
write(out, max_bin_size, a);
}
future<> parse(random_access_reader& in, compression& c) {
future<> parse(sstable_version_types v, random_access_reader& in, compression& c) {
auto data_len_ptr = make_lw_shared<uint64_t>(0);
auto chunk_len_ptr = make_lw_shared<uint32_t>(0);
return parse(in, c.name, c.options, *chunk_len_ptr, *data_len_ptr).then([&in, &c, chunk_len_ptr, data_len_ptr] {
return parse(v, in, c.name, c.options, *chunk_len_ptr, *data_len_ptr).then([v, &in, &c, chunk_len_ptr, data_len_ptr] {
c.set_uncompressed_chunk_length(*chunk_len_ptr);
c.set_uncompressed_file_length(*data_len_ptr);
return do_with(uint32_t(), c.offsets.get_writer(), [&in, &c] (uint32_t& len, compression::segmented_offsets::writer& offsets) {
return parse(in, len).then([&in, &c, &len, &offsets] {
return do_with(uint32_t(), c.offsets.get_writer(), [v, &in, &c] (uint32_t& len, compression::segmented_offsets::writer& offsets) {
return parse(v, in, len).then([&in, &c, &len, &offsets] {
auto eoarr = [&c, &len] { return c.offsets.size() == len; };
return do_until(eoarr, [&in, &c, &len, &offsets] () {
@@ -1116,7 +1116,7 @@ future<> sstable::read_simple(T& component, const io_priority_class& pc) {
return fut.then([this, &component, fi = std::move(fi)] (uint64_t size) {
auto f = make_checked_file(_read_error_handler, fi);
auto r = make_lw_shared<file_random_access_reader>(std::move(f), size, sstable_buffer_size);
auto fut = parse(*r, component);
auto fut = parse(_version, *r, component);
return fut.finally([r] {
return r->close();
}).then([r] {});