From daaa1a6dcb577189f4ab1c3dbe5e7406aa09f6ec Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 24 Mar 2015 18:02:36 -0300 Subject: [PATCH] sstables: extend it to support write of components By the time being, compression info is the unique component being written by store(). Changes introduced by this patch are generic, so as to make it easier writing other components as well. Signed-off-by: Raphael S. Carvalho --- sstables/sstables.cc | 165 ++++++++++++++++++++++++++++++++++++++++--- sstables/sstables.hh | 5 ++ sstables/types.hh | 6 ++ 3 files changed, 168 insertions(+), 8 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index b06d1d7ed1..d1089afb6c 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -127,6 +127,15 @@ static void check_buf_size(temporary_buffer& buf, size_t expected) { } } +template +static void check_truncate_and_assign(T& to, const U from) { + static_assert(std::is_integral::value && std::is_integral::value, "T and U must be integral"); + if (from >= std::numeric_limits::max()) { + throw std::overflow_error("assigning U to T would cause an overflow"); + } + to = from; +} + // Base parser, parses an integer type template typename std::enable_if_t::value, void> @@ -146,16 +155,38 @@ parse(random_access_reader& in, T& i) { }); } +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, T i) { + auto *nr = reinterpret_cast *>(&i); + i = net::hton(*nr); + auto p = reinterpret_cast(&i); + return out.write(p, sizeof(T)).then([&out] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + template typename std::enable_if_t::value, future<>> parse(random_access_reader& in, T& i) { return parse(in, reinterpret_cast::type&>(i)); } +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, T i) { + return write(out, reinterpret_cast::type>(i)); +} + future<> parse(random_access_reader& in, bool& i) { return parse(in, reinterpret_cast(i)); } +future<> write(output_stream& out, bool i) { + return write(out, static_cast(i)); +} + template static inline To convert(From f) { static_assert(sizeof(To) == sizeof(From), "Sizes must match"); @@ -178,6 +209,16 @@ future<> parse(random_access_reader& in, double& d) { }); } +future<> write(output_stream& out, double d) { + auto *nr = reinterpret_cast *>(&d); + auto tmp = net::hton(*nr); + auto p = reinterpret_cast(&tmp); + return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + template future<> parse(random_access_reader& in, T& len, sstring& s) { return in.read_exactly(len).then([&s, len] (auto buf) { @@ -186,6 +227,13 @@ future<> parse(random_access_reader& in, T& len, sstring& s) { }); } +future<> write(output_stream& out, sstring& s) { + return out.write(s).then([&out, &s] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + // All composite parsers must come after this template future<> parse(random_access_reader& in, First& first, Rest&&... rest) { @@ -194,6 +242,30 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) { }); } +template +future<> write(output_stream& out, First& first, Rest&&... rest) { + return write(out, first).then([&out, &rest...] { + return write(out, rest...); + }); +} + +// Intended to be used for a type that describes itself through describe_type(). +template +typename std::enable_if_t::value && !std::is_enum::value, future<>> +parse(random_access_reader& in, T& t) { + return t.describe_type([&in] (auto&&... what) -> future<> { + return parse(in, what...); + }); +} + +template +typename std::enable_if_t::value && !std::is_enum::value, future<>> +write(output_stream& out, T& t) { + return t.describe_type([&out] (auto&&... what) -> future<> { + return write(out, what...); + }); +} + // For all types that take a size, we provide a template that takes the type // alone, and another, separate one, that takes a size parameter as well, of // type Size. This is because although most of the time the size and the data @@ -208,6 +280,15 @@ future<> parse(random_access_reader& in, disk_string& s) { }); } +template +future<> write(output_stream& out, disk_string& s) { + Size len = 0; + check_truncate_and_assign(len, s.value.size()); + return write(out, len).then([&out, &s] { + return write(out, s.value); + }); +} + // We cannot simply read the whole array at once, because we don't know its // full size. We know the number of elements, but if we are talking about // disk_strings, for instance, we have no idea how much of the stream each @@ -254,6 +335,45 @@ future<> parse(random_access_reader& in, disk_array& arr) { }); } + +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, std::vector& arr) { + + auto count = make_lw_shared(0); + auto eoarr = [count, &arr] { return *count == arr.size(); }; + + return do_until(eoarr, [count, &out, &arr] { + return write(out, arr[(*count)++]); + }); +} + +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, std::vector& arr) { + std::vector tmp; + tmp.resize(arr.size()); + // copy arr into tmp converting each entry into big-endian representation. + auto *nr = reinterpret_cast *>(arr.data()); + for (size_t i = 0; i < arr.size(); i++) { + tmp[i] = net::hton(nr[i]); + } + auto p = reinterpret_cast(tmp.data()); + auto bytes = tmp.size() * sizeof(Members); + return out.write(p, bytes).then([&out] (...) -> future<> { + return make_ready_future<>(); + }); +} + +template +future<> write(output_stream& out, disk_array& arr) { + Size len = 0; + check_truncate_and_assign(len, arr.elements.size()); + return write(out, len).then([&out, &arr] { + return write(out, arr.elements); + }); +} + template future<> parse(random_access_reader& in, Size& len, std::unordered_map& map) { auto count = make_lw_shared(); @@ -282,14 +402,6 @@ future<> parse(random_access_reader& in, disk_hash& h) { }); } -future<> parse(random_access_reader& in, option& op) { - return parse(in, op.key, op.value); -} - -future<> parse(random_access_reader& in, compression& c) { - return parse(in, c.name, c.options, c.chunk_len, c.data_len, c.offsets); -} - future<> parse(random_access_reader& in, filter& f) { return parse(in, f.hashes, f.buckets); } @@ -566,6 +678,30 @@ future<> sstable::read_simple() { }); } +template +future<> sstable::write_simple() { + + auto file_path = filename(Type); + sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path); + return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) { + + auto out = make_file_output_stream(make_lw_shared(std::move(f)), 4096); + auto w = make_shared>(std::move(out)); + auto fut = write(*w, *this.*Comptr); + return fut.then([w] { + return w->flush().then([w] { + return w->close().then([w] {}); // the underlying file is synced here. + }); + }); + }).then_wrapped([this, file_path] (future<> f) { + try { + f.get(); + } catch (std::system_error& e) { + // TODO: handle exception. + } + }); +} + future<> sstable::read_compression() { // FIXME: If there is no compression, we should expect a CRC file to be present. if (!has_component(sstable::component_type::CompressionInfo)) { @@ -575,6 +711,14 @@ future<> sstable::read_compression() { return read_simple(); } +future<> sstable::write_compression() { + if (!has_component(sstable::component_type::CompressionInfo)) { + return make_ready_future<>(); + } + + return write_simple(); +} + future<> sstable::read_statistics() { return read_simple(); } @@ -611,6 +755,11 @@ future<> sstable::load() { }); } +future<> sstable::store() { + // TODO: write other components as well. + return write_compression(); +} + const bool sstable::has_component(component_type f) { return _components.count(f); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 787955dbed..3eca259267 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -75,7 +75,11 @@ private: template future<> read_simple(); + template + future<> write_simple(); + future<> read_compression(); + future<> write_compression(); future<> read_filter() { return read_simple(); } @@ -113,6 +117,7 @@ public: return read_indexes(position, quantity); } future<> load(); + future<> store(); future read_summary_entry(size_t i); diff --git a/sstables/types.hh b/sstables/types.hh index 53f4381d18..5f45b6a806 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -33,6 +33,9 @@ struct disk_hash { struct option { disk_string key; disk_string value; + + template + future<> describe_type(Describer f) { return f(key, value); } }; struct compression { @@ -41,6 +44,9 @@ struct compression { uint32_t chunk_len; uint64_t data_len; disk_array offsets; + + template + future<> describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); } }; struct filter {