From a076ea563bb12ba8bc59a803ccb7c6f4d7331b47 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Jun 2015 12:39:45 +0300 Subject: [PATCH] rework write functions Up until now, we were still generating one future per element that we write. Now that we have new infrastructure, we can avoid that, and generate only the ones we really need to. This has the added advantage of lifting the need to do lambda captures and allowing for a more straightforward forwarding of rest... parameters Signed-off-by: Glauber Costa --- sstables/sstables.cc | 280 ++++++++++++++++++++----------------- 1 file changed, 127 insertions(+), 153 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 8b38a0b33c..28ac6a5e4b 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -158,15 +158,12 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { auto *nr = reinterpret_cast *>(&i); i = net::hton(*nr); auto p = reinterpret_cast(&i); - return out.write(p, sizeof(T)).then([&out] (...) 
-> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(T)).get(); } template @@ -176,17 +173,17 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { - return write(out, static_cast::type>(i)); + write(out, static_cast::type>(i)); } future<> parse(random_access_reader& in, bool& i) { return parse(in, reinterpret_cast(i)); } -future<> write(file_writer& out, bool i) { - return write(out, static_cast(i)); +inline void write(file_writer& out, bool i) { + write(out, static_cast(i)); } template @@ -211,14 +208,11 @@ future<> parse(random_access_reader& in, double& d) { }); } -future<> write(file_writer& out, double d) { +inline void write(file_writer& out, double d) { auto *nr = reinterpret_cast *>(&d); auto tmp = net::hton(*nr); auto p = reinterpret_cast(&tmp); - return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(unsigned long)).get(); } template @@ -230,15 +224,12 @@ future<> parse(random_access_reader& in, T& len, bytes& s) { }); } -future<> write(file_writer& out, bytes& s) { - return out.write(s).then([&out, &s] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); +inline void write(file_writer& out, bytes& s) { + out.write(s).get(); } -future<> write(file_writer& out, bytes_view s) { - return out.write(reinterpret_cast(s.data()), s.size()); +inline void write(file_writer& out, bytes_view s) { + out.write(reinterpret_cast(s.data()), s.size()).get(); } // All composite parsers must come after this @@ -250,10 +241,9 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) { } template -future<> write(file_writer& out, First& first, Rest&&... rest) { - return write(out, first).then([&out, &rest...] 
{ - return write(out, rest...); - }); +inline void write(file_writer& out, First& first, Rest&&... rest) { + write(out, first); + write(out, std::forward(rest)...); } // Intended to be used for a type that describes itself through describe_type(). @@ -266,10 +256,10 @@ parse(random_access_reader& in, T& t) { } template -typename std::enable_if_t::value && !std::is_enum::value, future<>> +inline typename std::enable_if_t::value && !std::is_enum::value, void> write(file_writer& out, T& t) { - return t.describe_type([&out] (auto&&... what) -> future<> { - return write(out, what...); + t.describe_type([&out] (auto&&... what) -> void { + write(out, std::forward(what)...); }); } @@ -288,19 +278,18 @@ future<> parse(random_access_reader& in, disk_string& s) { } template -future<> write(file_writer& out, disk_string& s) { +inline void write(file_writer& out, disk_string& s) { Size len = 0; check_truncate_and_assign(len, s.value.size()); - return write(out, len).then([&out, &s] { - return write(out, s.value); - }); + write(out, len); + write(out, s.value); } template -future<> write(file_writer& out, disk_string_view& s) { +inline void write(file_writer& out, disk_string_view& s) { Size len; check_truncate_and_assign(len, s.value.size()); - return write(out, len, s.value); + write(out, len, s.value); } // We cannot simply read the whole array at once, because we don't know its @@ -350,42 +339,34 @@ future<> parse(random_access_reader& in, disk_array& arr) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& arr) { - - auto count = make_lw_shared(0); - auto eoarr = [count, &arr] { return *count == arr.size(); }; - - return do_until(eoarr, [count, &out, &arr] { - return write(out, arr[(*count)++]); - }); + for (auto& a : arr) { + write(out, a); + } } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& 
arr) { - return do_with(std::vector(), [&out, &arr] (auto& tmp) { - tmp.resize(arr.size()); - // copy arr into tmp converting each entry into big-endian representation. - auto *nr = reinterpret_cast *>(arr.data()); - for (size_t i = 0; i < arr.size(); i++) { - tmp[i] = net::hton(nr[i]); - } - auto p = reinterpret_cast(tmp.data()); - auto bytes = tmp.size() * sizeof(Members); - return out.write(p, bytes).then([&out] (...) -> future<> { - return make_ready_future<>(); - }); - }); + std::vector tmp; + tmp.resize(arr.size()); + // copy arr into tmp converting each entry into big-endian representation. + auto *nr = reinterpret_cast *>(arr.data()); + for (size_t i = 0; i < arr.size(); i++) { + tmp[i] = net::hton(nr[i]); + } + auto p = reinterpret_cast(tmp.data()); + auto bytes = tmp.size() * sizeof(Members); + out.write(p, bytes).get(); } template -future<> write(file_writer& out, disk_array& arr) { +inline void write(file_writer& out, disk_array& arr) { Size len = 0; check_truncate_and_assign(len, arr.elements.size()); - return write(out, len).then([&out, &arr] { - return write(out, arr.elements); - }); + write(out, len); + write(out, arr.elements); } template @@ -417,19 +398,18 @@ future<> parse(random_access_reader& in, disk_hash& h) { } template -future<> write(file_writer& out, std::unordered_map& map) { - return do_for_each(map.begin(), map.end(), [&out, &map] (auto& val) { - return write(out, val.first, val.second); - }); +inline void write(file_writer& out, std::unordered_map& map) { + for (auto& val: map) { + write(out, val.first, val.second); + }; } template -future<> write(file_writer& out, disk_hash& h) { +inline void write(file_writer& out, disk_hash& h) { Size len = 0; check_truncate_and_assign(len, h.map.size()); - return write(out, len).then([&out, &h] { - return write(out, h.map); - }); + write(out, len); + write(out, h.map); } future<> parse(random_access_reader& in, summary& s) { @@ -491,33 +471,28 @@ future<> parse(random_access_reader& in, summary& 
s) { }); } -future<> write(file_writer& out, summary_entry& entry) -{ +inline void write(file_writer& out, summary_entry& entry) { // FIXME: summary entry is supposedly written in memory order, but that // would prevent portability of summary file between machines of different // endianness. We can treat it as little endian to preserve portability. - return write(out, entry.key).then([&out, &entry] { - auto p = reinterpret_cast(&entry.position); - return out.write(p, sizeof(uint64_t)); - }); + write(out, entry.key); + auto p = reinterpret_cast(&entry.position); + out.write(p, sizeof(uint64_t)).get(); } -future<> write(file_writer& out, summary& s) { +inline void write(file_writer& out, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; // NOTE: positions and entries must be stored in NATIVE BYTE ORDER, not BIG-ENDIAN. - return write(out, s.header.min_index_interval, - s.header.size, - s.header.memory_size, - s.header.sampling_level, - s.header.size_at_full_sampling).then([&out, &s] { - auto p = reinterpret_cast(s.positions.data()); - return out.write(p, sizeof(pos_type) * s.positions.size()); - }).then([&out, &s] { - return write(out, s.entries); - }).then([&out, &s] { - return write(out, s.first_key, s.last_key); - }); + write(out, s.header.min_index_interval, + s.header.size, + s.header.memory_size, + s.header.sampling_level, + s.header.size_at_full_sampling); + auto p = reinterpret_cast(s.positions.data()); + out.write(p, sizeof(pos_type) * s.positions.size()).get(); + write(out, s.entries); + write(out, s.first_key, s.last_key); } future sstable::read_summary_entry(size_t i) { @@ -544,8 +519,8 @@ future<> parse(random_access_reader& in, std::unique_ptr& p) { } template -future<> write(file_writer& out, std::unique_ptr& p) { - return write(out, *static_cast(p.get())); +inline void write(file_writer& out, std::unique_ptr& p) { + write(out, *static_cast(p.get())); } future<> parse(random_access_reader& in, statistics& s) { @@ -568,35 
+543,37 @@ future<> parse(random_access_reader& in, statistics& s) { }); } -future<> write(file_writer& out, statistics& s) { - return write(out, s.hash).then([&out, &s] { - struct kv { - metadata_type key; - uint32_t value; - }; - // sort map by file offset value and store the result into a vector. - // this is indeed needed because output stream cannot afford random writes. - auto v = make_shared>(); - v->reserve(s.hash.map.size()); - for (auto val : s.hash.map) { - kv tmp = { val.first, val.second }; - v->push_back(tmp); - } - std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); - return do_for_each(v->begin(), v->end(), [&out, &s, v] (auto val) mutable { - switch (val.key) { - case metadata_type::Validation: - return write(out, s.contents[val.key]); - case metadata_type::Compaction: - return write(out, s.contents[val.key]); - case metadata_type::Stats: - return write(out, s.contents[val.key]); - default: - sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); - return make_ready_future<>(); - } - }); - }); +inline void write(file_writer& out, statistics& s) { + write(out, s.hash); + struct kv { + metadata_type key; + uint32_t value; + }; + // sort map by file offset value and store the result into a vector. + // this is indeed needed because output stream cannot afford random writes. 
+ auto v = make_shared>(); + v->reserve(s.hash.map.size()); + for (auto val : s.hash.map) { + kv tmp = { val.first, val.second }; + v->push_back(tmp); + } + std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); + for (auto& val: *v) { + switch (val.key) { + case metadata_type::Validation: + write(out, s.contents[val.key]); + break; + case metadata_type::Compaction: + write(out, s.contents[val.key]); + break; + case metadata_type::Stats: + write(out, s.contents[val.key]); + break; + default: + sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); + return; // FIXME: should throw + } + } } future<> parse(random_access_reader& in, estimated_histogram& eh) { @@ -625,31 +602,28 @@ future<> parse(random_access_reader& in, estimated_histogram& eh) { }); } -future<> write(file_writer& out, estimated_histogram& eh) { +inline void write(file_writer& out, estimated_histogram& eh) { uint32_t len = 0; check_truncate_and_assign(len, eh.buckets.size()); - return write(out, len).then([&out, &eh] { - struct element { - uint64_t offsets; - uint64_t buckets; - }; - std::vector elements; - elements.resize(eh.buckets.size()); + write(out, len); + struct element { + uint64_t offsets; + uint64_t buckets; + }; + std::vector elements; + elements.resize(eh.buckets.size()); - auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); - auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); - for (size_t i = 0; i < eh.buckets.size(); i++) { - elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]); - elements[i].buckets = net::hton(buckets_nr[i]); - } + auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); + auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); + for (size_t i = 0; i < eh.buckets.size(); i++) { + elements[i].offsets = net::hton(offsets_nr[i == 0 ? 
0 : i - 1]); + elements[i].buckets = net::hton(buckets_nr[i]); + } - return do_with(std::move(elements), [&out] (auto& elements) { - auto p = reinterpret_cast(elements.data()); - auto bytes = elements.size() * sizeof(element); - return out.write(p, bytes); - }); - }); + auto p = reinterpret_cast(elements.data()); + auto bytes = elements.size() * sizeof(element); + out.write(p, bytes).get(); } // This is small enough, and well-defined. Easier to just read it all @@ -718,7 +692,7 @@ future<> sstable::write_toc() { // new line character is appended to the end of each component name. auto value = _component_map[key] + "\n"; bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); - return write(*w, b); + return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); }); }).then([w] { return w->flush().then([w] { return w->close().then([w] {}); @@ -735,7 +709,7 @@ future<> write_crc(const sstring file_path, checksum& c) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - return write(*w, c).then([w] { + return seastar::async([w, &c] () { write(*w, c); }).then([w] { return w->close().then([w] {}); }); }); @@ -751,7 +725,7 @@ future<> write_digest(const sstring file_path, uint32_t full_checksum) { auto w = make_shared(std::move(out)); return do_with(to_sstring(full_checksum), [w] (bytes& digest) { - return write(*w, digest).then([w] { + return seastar::async([w, &digest] { write(*w, digest); }).then([w] { return w->close().then([w] {}); }); }); @@ -841,7 +815,7 @@ future<> sstable::write_simple(T& component) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - auto fut = write(*w, component); + auto fut = seastar::async([w, &component] () mutable { write(*w, component); }); return fut.then([w] { return w->flush().then([w] { return w->close().then([w] {}); // the underlying file is synced here. 
@@ -959,7 +933,7 @@ void sstable::write_column_name(file_writer& out, const composite& clustering_ke ck_bview.remove_suffix(1); } uint16_t sz = ck_bview.size() + c.size(); - write(out, sz, ck_bview, c).get(); + write(out, sz, ck_bview, c); } static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) { @@ -984,7 +958,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { uint32_t expiration = cell.expiry().time_since_epoch().count(); disk_string_view cell_value { cell.value() }; - write(out, mask, ttl, expiration, timestamp, cell_value).get(); + write(out, mask, ttl, expiration, timestamp, cell_value); } else if (cell.is_dead()) { // tombstone cell @@ -994,14 +968,14 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { _c_stats.tombstone_histogram.update(deletion_time); - write(out, mask, timestamp, deletion_time_size, deletion_time).get(); + write(out, mask, timestamp, deletion_time_size, deletion_time); } else { // regular cell column_mask mask = column_mask::none; disk_string_view cell_value { cell.value() }; - write(out, mask, timestamp, cell_value).get(); + write(out, mask, timestamp, cell_value); } } @@ -1019,7 +993,7 @@ void sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row update_cell_stats(_c_stats, timestamp); - write(out, mask, timestamp, value_length).get(); + write(out, mask, timestamp, value_length); } void sstable::write_range_tombstone(file_writer& out, const composite& clustering_prefix, std::vector suffix, const tombstone t) { @@ -1029,12 +1003,12 @@ void sstable::write_range_tombstone(file_writer& out, const composite& clusterin write_column_name(out, clustering_prefix, suffix, composite_marker::start_range); column_mask mask = column_mask::range_tombstone; - write(out, mask).get(); + write(out, mask); write_column_name(out, clustering_prefix, suffix, composite_marker::end_range); uint64_t timestamp = t.timestamp; uint32_t deletion_time = 
t.deletion_time.time_since_epoch().count(); - write(out, deletion_time, timestamp).get(); + write(out, deletion_time, timestamp); } void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation::view collection) { @@ -1098,7 +1072,7 @@ static void write_index_entry(file_writer& out, disk_string_view& key, // FIXME: support promoted indexes. uint32_t promoted_index_size = 0; - write(out, key, pos, promoted_index_size).get(); + write(out, key, pos, promoted_index_size); } static constexpr int BASE_SAMPLING_LEVEL = 128; @@ -1223,7 +1197,7 @@ void sstable::do_write_components(const memtable& mt) { write_index_entry(*index, p_key, w->offset()); // Write partition key into data file. - write(*w, p_key).get(); + write(*w, p_key); auto tombstone = partition_entry.second.partition_tombstone(); deletion_time d; @@ -1241,7 +1215,7 @@ void sstable::do_write_components(const memtable& mt) { d.local_deletion_time = std::numeric_limits::max(); d.marked_for_delete_at = std::numeric_limits::min(); } - write(*w, d).get(); + write(*w, d); auto& partition = partition_entry.second; auto& static_row = partition.static_row(); @@ -1257,7 +1231,7 @@ void sstable::do_write_components(const memtable& mt) { write_clustered_row(*w, *mt.schema(), clustered_row); } int16_t end_of_row = 0; - write(*w, end_of_row).get(); + write(*w, end_of_row); // compute size of the current row. _c_stats.row_size = w->offset() - _c_stats.start_offset;