sstables: extend it to support write of components

By the time being, compression info is the unique component being
written by store(). Changes introduced by this patch are generic,
so as to make it easier writing other components as well.

Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
This commit is contained in:
Raphael S. Carvalho
2015-03-24 18:02:36 -03:00
committed by Avi Kivity
parent b4e805e811
commit daaa1a6dcb
3 changed files with 168 additions and 8 deletions

View File

@@ -127,6 +127,15 @@ static void check_buf_size(temporary_buffer<char>& buf, size_t expected) {
}
}
template <typename T, typename U>
static void check_truncate_and_assign(T& to, const U from) {
static_assert(std::is_integral<T>::value && std::is_integral<U>::value, "T and U must be integral");
if (from >= std::numeric_limits<T>::max()) {
throw std::overflow_error("assigning U to T would cause an overflow");
}
to = from;
}
// Base parser, parses an integer type
template <typename T>
typename std::enable_if_t<std::is_integral<T>::value, void>
@@ -146,16 +155,38 @@ parse(random_access_reader& in, T& i) {
});
}
template <typename T>
typename std::enable_if_t<std::is_integral<T>::value, future<>>
write(output_stream<char>& out, T i) {
auto *nr = reinterpret_cast<const net::packed<T> *>(&i);
i = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&i);
return out.write(p, sizeof(T)).then([&out] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
}
template <typename T>
typename std::enable_if_t<std::is_enum<T>::value, future<>>
parse(random_access_reader& in, T& i) {
return parse(in, reinterpret_cast<typename std::underlying_type<T>::type&>(i));
}
template <typename T>
typename std::enable_if_t<std::is_enum<T>::value, future<>>
write(output_stream<char>& out, T i) {
return write(out, reinterpret_cast<typename std::underlying_type<T>::type>(i));
}
future<> parse(random_access_reader& in, bool& i) {
return parse(in, reinterpret_cast<uint8_t&>(i));
}
future<> write(output_stream<char>& out, bool i) {
return write(out, static_cast<uint8_t>(i));
}
template <typename To, typename From>
static inline To convert(From f) {
static_assert(sizeof(To) == sizeof(From), "Sizes must match");
@@ -178,6 +209,16 @@ future<> parse(random_access_reader& in, double& d) {
});
}
future<> write(output_stream<char>& out, double d) {
auto *nr = reinterpret_cast<const net::packed<unsigned long> *>(&d);
auto tmp = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&tmp);
return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
}
template <typename T>
future<> parse(random_access_reader& in, T& len, sstring& s) {
return in.read_exactly(len).then([&s, len] (auto buf) {
@@ -186,6 +227,13 @@ future<> parse(random_access_reader& in, T& len, sstring& s) {
});
}
future<> write(output_stream<char>& out, sstring& s) {
return out.write(s).then([&out, &s] (...) -> future<> {
// TODO: handle result
return make_ready_future<>();
});
}
// All composite parsers must come after this
template<typename First, typename... Rest>
future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
@@ -194,6 +242,30 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
});
}
template<typename First, typename... Rest>
future<> write(output_stream<char>& out, First& first, Rest&&... rest) {
return write(out, first).then([&out, &rest...] {
return write(out, rest...);
});
}
// Intended to be used for a type that describes itself through describe_type().
template <class T>
typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, future<>>
parse(random_access_reader& in, T& t) {
return t.describe_type([&in] (auto&&... what) -> future<> {
return parse(in, what...);
});
}
template <class T>
typename std::enable_if_t<!std::is_integral<T>::value && !std::is_enum<T>::value, future<>>
write(output_stream<char>& out, T& t) {
return t.describe_type([&out] (auto&&... what) -> future<> {
return write(out, what...);
});
}
// For all types that take a size, we provide a template that takes the type
// alone, and another, separate one, that takes a size parameter as well, of
// type Size. This is because although most of the time the size and the data
@@ -208,6 +280,15 @@ future<> parse(random_access_reader& in, disk_string<Size>& s) {
});
}
template <typename Size>
future<> write(output_stream<char>& out, disk_string<Size>& s) {
Size len = 0;
check_truncate_and_assign(len, s.value.size());
return write(out, len).then([&out, &s] {
return write(out, s.value);
});
}
// We cannot simply read the whole array at once, because we don't know its
// full size. We know the number of elements, but if we are talking about
// disk_strings, for instance, we have no idea how much of the stream each
@@ -254,6 +335,45 @@ future<> parse(random_access_reader& in, disk_array<Size, Members>& arr) {
});
}
template <typename Members>
typename std::enable_if_t<!std::is_integral<Members>::value, future<>>
write(output_stream<char>& out, std::vector<Members>& arr) {
auto count = make_lw_shared<size_t>(0);
auto eoarr = [count, &arr] { return *count == arr.size(); };
return do_until(eoarr, [count, &out, &arr] {
return write(out, arr[(*count)++]);
});
}
template <typename Members>
typename std::enable_if_t<std::is_integral<Members>::value, future<>>
write(output_stream<char>& out, std::vector<Members>& arr) {
std::vector<Members> tmp;
tmp.resize(arr.size());
// copy arr into tmp converting each entry into big-endian representation.
auto *nr = reinterpret_cast<const net::packed<Members> *>(arr.data());
for (size_t i = 0; i < arr.size(); i++) {
tmp[i] = net::hton(nr[i]);
}
auto p = reinterpret_cast<const char*>(tmp.data());
auto bytes = tmp.size() * sizeof(Members);
return out.write(p, bytes).then([&out] (...) -> future<> {
return make_ready_future<>();
});
}
template <typename Size, typename Members>
future<> write(output_stream<char>& out, disk_array<Size, Members>& arr) {
Size len = 0;
check_truncate_and_assign(len, arr.elements.size());
return write(out, len).then([&out, &arr] {
return write(out, arr.elements);
});
}
template <typename Size, typename Key, typename Value>
future<> parse(random_access_reader& in, Size& len, std::unordered_map<Key, Value>& map) {
auto count = make_lw_shared<Size>();
@@ -282,14 +402,6 @@ future<> parse(random_access_reader& in, disk_hash<Size, Key, Value>& h) {
});
}
future<> parse(random_access_reader& in, option& op) {
return parse(in, op.key, op.value);
}
future<> parse(random_access_reader& in, compression& c) {
return parse(in, c.name, c.options, c.chunk_len, c.data_len, c.offsets);
}
future<> parse(random_access_reader& in, filter& f) {
return parse(in, f.hashes, f.buckets);
}
@@ -566,6 +678,30 @@ future<> sstable::read_simple() {
});
}
template <typename T, sstable::component_type Type, T sstable::* Comptr>
future<> sstable::write_simple() {
auto file_path = filename(Type);
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) {
auto out = make_file_output_stream(make_lw_shared<file>(std::move(f)), 4096);
auto w = make_shared<output_stream<char>>(std::move(out));
auto fut = write(*w, *this.*Comptr);
return fut.then([w] {
return w->flush().then([w] {
return w->close().then([w] {}); // the underlying file is synced here.
});
});
}).then_wrapped([this, file_path] (future<> f) {
try {
f.get();
} catch (std::system_error& e) {
// TODO: handle exception.
}
});
}
future<> sstable::read_compression() {
// FIXME: If there is no compression, we should expect a CRC file to be present.
if (!has_component(sstable::component_type::CompressionInfo)) {
@@ -575,6 +711,14 @@ future<> sstable::read_compression() {
return read_simple<compression, component_type::CompressionInfo, &sstable::_compression>();
}
future<> sstable::write_compression() {
if (!has_component(sstable::component_type::CompressionInfo)) {
return make_ready_future<>();
}
return write_simple<compression, component_type::CompressionInfo, &sstable::_compression>();
}
future<> sstable::read_statistics() {
return read_simple<statistics, component_type::Statistics, &sstable::_statistics>();
}
@@ -611,6 +755,11 @@ future<> sstable::load() {
});
}
future<> sstable::store() {
// TODO: write other components as well.
return write_compression();
}
const bool sstable::has_component(component_type f) {
return _components.count(f);
}

View File

@@ -75,7 +75,11 @@ private:
template <typename T, sstable::component_type Type, T sstable::* Comptr>
future<> read_simple();
template <typename T, sstable::component_type Type, T sstable::* Comptr>
future<> write_simple();
future<> read_compression();
future<> write_compression();
future<> read_filter() {
return read_simple<filter, component_type::Filter, &sstable::_filter>();
}
@@ -113,6 +117,7 @@ public:
return read_indexes(position, quantity);
}
future<> load();
future<> store();
future<summary_entry&> read_summary_entry(size_t i);

View File

@@ -33,6 +33,9 @@ struct disk_hash {
struct option {
disk_string<uint16_t> key;
disk_string<uint16_t> value;
template <typename Describer>
future<> describe_type(Describer f) { return f(key, value); }
};
struct compression {
@@ -41,6 +44,9 @@ struct compression {
uint32_t chunk_len;
uint64_t data_len;
disk_array<uint32_t, uint64_t> offsets;
template <typename Describer>
future<> describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); }
};
struct filter {