diff --git a/sstables/sstables.cc b/sstables/sstables.cc index b06d1d7ed1..d1089afb6c 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -127,6 +127,15 @@ static void check_buf_size(temporary_buffer& buf, size_t expected) { } } +template +static void check_truncate_and_assign(T& to, const U from) { + static_assert(std::is_integral::value && std::is_integral::value, "T and U must be integral"); + if (from >= std::numeric_limits::max()) { + throw std::overflow_error("assigning U to T would cause an overflow"); + } + to = from; +} + // Base parser, parses an integer type template typename std::enable_if_t::value, void> @@ -146,16 +155,38 @@ parse(random_access_reader& in, T& i) { }); } +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, T i) { + auto *nr = reinterpret_cast *>(&i); + i = net::hton(*nr); + auto p = reinterpret_cast(&i); + return out.write(p, sizeof(T)).then([&out] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + template typename std::enable_if_t::value, future<>> parse(random_access_reader& in, T& i) { return parse(in, reinterpret_cast::type&>(i)); } +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, T i) { + return write(out, reinterpret_cast::type>(i)); +} + future<> parse(random_access_reader& in, bool& i) { return parse(in, reinterpret_cast(i)); } +future<> write(output_stream& out, bool i) { + return write(out, static_cast(i)); +} + template static inline To convert(From f) { static_assert(sizeof(To) == sizeof(From), "Sizes must match"); @@ -178,6 +209,16 @@ future<> parse(random_access_reader& in, double& d) { }); } +future<> write(output_stream& out, double d) { + auto *nr = reinterpret_cast *>(&d); + auto tmp = net::hton(*nr); + auto p = reinterpret_cast(&tmp); + return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + template future<> parse(random_access_reader& in, T& len, sstring& s) { return in.read_exactly(len).then([&s, len] (auto buf) { @@ -186,6 +227,13 @@ future<> parse(random_access_reader& in, T& len, sstring& s) { }); } +future<> write(output_stream& out, sstring& s) { + return out.write(s).then([&out, &s] (...) -> future<> { + // TODO: handle result + return make_ready_future<>(); + }); +} + // All composite parsers must come after this template future<> parse(random_access_reader& in, First& first, Rest&&... rest) { @@ -194,6 +242,30 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) { }); } +template +future<> write(output_stream& out, First& first, Rest&&... rest) { + return write(out, first).then([&out, &rest...] { + return write(out, rest...); + }); +} + +// Intended to be used for a type that describes itself through describe_type(). +template +typename std::enable_if_t::value && !std::is_enum::value, future<>> +parse(random_access_reader& in, T& t) { + return t.describe_type([&in] (auto&&... what) -> future<> { + return parse(in, what...); + }); +} + +template +typename std::enable_if_t::value && !std::is_enum::value, future<>> +write(output_stream& out, T& t) { + return t.describe_type([&out] (auto&&... what) -> future<> { + return write(out, what...); + }); +} + // For all types that take a size, we provide a template that takes the type // alone, and another, separate one, that takes a size parameter as well, of // type Size. This is because although most of the time the size and the data @@ -208,6 +280,15 @@ future<> parse(random_access_reader& in, disk_string& s) { }); } +template +future<> write(output_stream& out, disk_string& s) { + Size len = 0; + check_truncate_and_assign(len, s.value.size()); + return write(out, len).then([&out, &s] { + return write(out, s.value); + }); +} + // We cannot simply read the whole array at once, because we don't know its // full size. We know the number of elements, but if we are talking about // disk_strings, for instance, we have no idea how much of the stream each @@ -254,6 +335,45 @@ future<> parse(random_access_reader& in, disk_array& arr) { }); } + +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, std::vector& arr) { + + auto count = make_lw_shared(0); + auto eoarr = [count, &arr] { return *count == arr.size(); }; + + return do_until(eoarr, [count, &out, &arr] { + return write(out, arr[(*count)++]); + }); +} + +template +typename std::enable_if_t::value, future<>> +write(output_stream& out, std::vector& arr) { + std::vector tmp; + tmp.resize(arr.size()); + // copy arr into tmp converting each entry into big-endian representation. + auto *nr = reinterpret_cast *>(arr.data()); + for (size_t i = 0; i < arr.size(); i++) { + tmp[i] = net::hton(nr[i]); + } + auto p = reinterpret_cast(tmp.data()); + auto bytes = tmp.size() * sizeof(Members); + return out.write(p, bytes).then([&out] (...) -> future<> { + return make_ready_future<>(); + }); +} + +template +future<> write(output_stream& out, disk_array& arr) { + Size len = 0; + check_truncate_and_assign(len, arr.elements.size()); + return write(out, len).then([&out, &arr] { + return write(out, arr.elements); + }); +} + template future<> parse(random_access_reader& in, Size& len, std::unordered_map& map) { auto count = make_lw_shared(); @@ -282,14 +402,6 @@ future<> parse(random_access_reader& in, disk_hash& h) { }); } -future<> parse(random_access_reader& in, option& op) { - return parse(in, op.key, op.value); -} - -future<> parse(random_access_reader& in, compression& c) { - return parse(in, c.name, c.options, c.chunk_len, c.data_len, c.offsets); -} - future<> parse(random_access_reader& in, filter& f) { return parse(in, f.hashes, f.buckets); } @@ -566,6 +678,30 @@ future<> sstable::read_simple() { }); } +template +future<> sstable::write_simple() { + + auto file_path = filename(Type); + sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path); + return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) { + + auto out = make_file_output_stream(make_lw_shared(std::move(f)), 4096); + auto w = make_shared>(std::move(out)); + auto fut = write(*w, *this.*Comptr); + return fut.then([w] { + return w->flush().then([w] { + return w->close().then([w] {}); // the underlying file is synced here. + }); + }); + }).then_wrapped([this, file_path] (future<> f) { + try { + f.get(); + } catch (std::system_error& e) { + // TODO: handle exception. + } + }); +} + future<> sstable::read_compression() { // FIXME: If there is no compression, we should expect a CRC file to be present. if (!has_component(sstable::component_type::CompressionInfo)) { @@ -575,6 +711,14 @@ future<> sstable::read_compression() { return read_simple(); } +future<> sstable::write_compression() { + if (!has_component(sstable::component_type::CompressionInfo)) { + return make_ready_future<>(); + } + + return write_simple(); +} + future<> sstable::read_statistics() { return read_simple(); } @@ -611,6 +755,11 @@ future<> sstable::load() { }); } +future<> sstable::store() { + // TODO: write other components as well. + return write_compression(); +} + const bool sstable::has_component(component_type f) { return _components.count(f); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 787955dbed..3eca259267 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -75,7 +75,11 @@ private: template future<> read_simple(); + template + future<> write_simple(); + future<> read_compression(); + future<> write_compression(); future<> read_filter() { return read_simple(); } @@ -113,6 +117,7 @@ public: return read_indexes(position, quantity); } future<> load(); + future<> store(); future read_summary_entry(size_t i); diff --git a/sstables/types.hh b/sstables/types.hh index 53f4381d18..5f45b6a806 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -33,6 +33,9 @@ struct disk_hash { struct option { disk_string key; disk_string value; + + template + future<> describe_type(Describer f) { return f(key, value); } }; struct compression { @@ -41,6 +44,9 @@ struct compression { uint32_t chunk_len; uint64_t data_len; disk_array offsets; + + template + future<> describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); } }; struct filter {