From 2dbd2b408a11fc4ef82f27576988cd912eeae244 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Jun 2015 02:04:34 +0300 Subject: [PATCH 1/2] sstables: change describe_type's return type to auto We always return a future, but with the threaded writer, we can get rid of that. So while reads will still return a future, the writer will be able to return void. Signed-off-by: Glauber Costa --- sstables/compress.hh | 2 +- sstables/key.hh | 2 +- sstables/streaming_histogram.hh | 2 +- sstables/types.hh | 16 ++++++++-------- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sstables/compress.hh b/sstables/compress.hh index 30f347f5d1..bcd656cc30 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -94,7 +94,7 @@ struct compression { disk_array offsets; template - future<> describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); } + auto describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); } private: // Variables determined from the above deserialized values, held for convenience: diff --git a/sstables/key.hh b/sstables/key.hh index 2ab0715508..dad23f4334 100644 --- a/sstables/key.hh +++ b/sstables/key.hh @@ -114,7 +114,7 @@ class composite { public: composite (bytes&& b) : _bytes(std::move(b)) {} template - future<> describe_type(Describer f) const { return f(const_cast(_bytes)); } + auto describe_type(Describer f) const { return f(const_cast(_bytes)); } static composite from_bytes(bytes b) { return composite(std::move(b)); } template diff --git a/sstables/streaming_histogram.hh b/sstables/streaming_histogram.hh index 08bcf26738..be56a04ff2 100644 --- a/sstables/streaming_histogram.hh +++ b/sstables/streaming_histogram.hh @@ -131,7 +131,7 @@ struct streaming_histogram { * Function used to describe the type. */ template - future<> describe_type(Describer f) { return f(max_bin_size, bin); } + auto describe_type(Describer f) { return f(max_bin_size, bin); } // FIXME: convert Java code below. #if 0 diff --git a/sstables/types.hh b/sstables/types.hh index 6300e9ba72..25171d557c 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -23,7 +23,7 @@ struct option { disk_string value; template - future<> describe_type(Describer f) { return f(key, value); } + auto describe_type(Describer f) { return f(key, value); } }; struct filter { @@ -31,7 +31,7 @@ struct filter { disk_array buckets; template - future<> describe_type(Describer f) { return f(hashes, buckets); } + auto describe_type(Describer f) { return f(hashes, buckets); } // Create an always positive filter if nothing else is specified. filter() : hashes(0), buckets({}) {} @@ -119,7 +119,7 @@ struct replay_position { } template - future<> describe_type(Describer f) { return f(segment, position); } + auto describe_type(Describer f) { return f(segment, position); } }; struct metadata { @@ -134,7 +134,7 @@ struct validation_metadata : public metadata { } template - future<> describe_type(Describer f) { return f(partitioner, filter_chance); } + auto describe_type(Describer f) { return f(partitioner, filter_chance); } }; struct compaction_metadata : public metadata { @@ -142,7 +142,7 @@ struct compaction_metadata : public metadata { disk_array cardinality; template - future<> describe_type(Describer f) { return f(ancestors, cardinality); } + auto describe_type(Describer f) { return f(ancestors, cardinality); } }; struct la_stats_metadata : public metadata { @@ -161,7 +161,7 @@ struct la_stats_metadata : public metadata { bool has_legacy_counter_shards; template - future<> describe_type(Describer f) { + auto describe_type(Describer f) { return f( estimated_row_size, estimated_column_count, @@ -200,7 +200,7 @@ struct checksum { std::vector checksums; template - future<> describe_type(Describer f) { return f(chunk_size, checksums); } + auto describe_type(Describer f) { return f(chunk_size, checksums); } }; } @@ -228,7 +228,7 @@ struct deletion_time { int64_t marked_for_delete_at; template - future<> describe_type(Describer f) { return f(local_deletion_time, marked_for_delete_at); } + auto describe_type(Describer f) { return f(local_deletion_time, marked_for_delete_at); } bool live() const { return (local_deletion_time == std::numeric_limits::max()) && From a076ea563bb12ba8bc59a803ccb7c6f4d7331b47 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Jun 2015 12:39:45 +0300 Subject: [PATCH 2/2] rework write functions Up until now, we were still generating one future per element that we write. Now that we have new infrastructure, we can avoid that, and generate only the ones we really need to. This has the added advantage of lifting the need to do lambda captures and allowing for a more straightfoward forwarding of rest... parameters Signed-off-by: Glauber Costa --- sstables/sstables.cc | 280 ++++++++++++++++++++----------------------- 1 file changed, 127 insertions(+), 153 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 8b38a0b33c..28ac6a5e4b 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -158,15 +158,12 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { auto *nr = reinterpret_cast *>(&i); i = net::hton(*nr); auto p = reinterpret_cast(&i); - return out.write(p, sizeof(T)).then([&out] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(T)).get(); } template @@ -176,17 +173,17 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { - return write(out, static_cast::type>(i)); + write(out, static_cast::type>(i)); } future<> parse(random_access_reader& in, bool& i) { return parse(in, reinterpret_cast(i)); } -future<> write(file_writer& out, bool i) { - return write(out, static_cast(i)); +inline void write(file_writer& out, bool i) { + write(out, static_cast(i)); } template @@ -211,14 +208,11 @@ future<> parse(random_access_reader& in, double& d) { }); } -future<> write(file_writer& out, double d) { +inline void write(file_writer& out, double d) { auto *nr = reinterpret_cast *>(&d); auto tmp = net::hton(*nr); auto p = reinterpret_cast(&tmp); - return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(unsigned long)).get(); } template @@ -230,15 +224,12 @@ future<> parse(random_access_reader& in, T& len, bytes& s) { }); } -future<> write(file_writer& out, bytes& s) { - return out.write(s).then([&out, &s] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); +inline void write(file_writer& out, bytes& s) { + out.write(s).get(); } -future<> write(file_writer& out, bytes_view s) { - return out.write(reinterpret_cast(s.data()), s.size()); +inline void write(file_writer& out, bytes_view s) { + out.write(reinterpret_cast(s.data()), s.size()).get(); } // All composite parsers must come after this @@ -250,10 +241,9 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) { } template -future<> write(file_writer& out, First& first, Rest&&... rest) { - return write(out, first).then([&out, &rest...] { - return write(out, rest...); - }); +inline void write(file_writer& out, First& first, Rest&&... rest) { + write(out, first); + write(out, std::forward(rest)...); } // Intended to be used for a type that describes itself through describe_type(). @@ -266,10 +256,10 @@ parse(random_access_reader& in, T& t) { } template -typename std::enable_if_t::value && !std::is_enum::value, future<>> +inline typename std::enable_if_t::value && !std::is_enum::value, void> write(file_writer& out, T& t) { - return t.describe_type([&out] (auto&&... what) -> future<> { - return write(out, what...); + t.describe_type([&out] (auto&&... what) -> void { + write(out, std::forward(what)...); }); } @@ -288,19 +278,18 @@ future<> parse(random_access_reader& in, disk_string& s) { } template -future<> write(file_writer& out, disk_string& s) { +inline void write(file_writer& out, disk_string& s) { Size len = 0; check_truncate_and_assign(len, s.value.size()); - return write(out, len).then([&out, &s] { - return write(out, s.value); - }); + write(out, len); + write(out, s.value); } template -future<> write(file_writer& out, disk_string_view& s) { +inline void write(file_writer& out, disk_string_view& s) { Size len; check_truncate_and_assign(len, s.value.size()); - return write(out, len, s.value); + write(out, len, s.value); } // We cannot simply read the whole array at once, because we don't know its @@ -350,42 +339,34 @@ future<> parse(random_access_reader& in, disk_array& arr) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& arr) { - - auto count = make_lw_shared(0); - auto eoarr = [count, &arr] { return *count == arr.size(); }; - - return do_until(eoarr, [count, &out, &arr] { - return write(out, arr[(*count)++]); - }); + for (auto& a : arr) { + write(out, a); + } } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& arr) { - return do_with(std::vector(), [&out, &arr] (auto& tmp) { - tmp.resize(arr.size()); - // copy arr into tmp converting each entry into big-endian representation. - auto *nr = reinterpret_cast *>(arr.data()); - for (size_t i = 0; i < arr.size(); i++) { - tmp[i] = net::hton(nr[i]); - } - auto p = reinterpret_cast(tmp.data()); - auto bytes = tmp.size() * sizeof(Members); - return out.write(p, bytes).then([&out] (...) -> future<> { - return make_ready_future<>(); - }); - }); + std::vector tmp; + tmp.resize(arr.size()); + // copy arr into tmp converting each entry into big-endian representation. + auto *nr = reinterpret_cast *>(arr.data()); + for (size_t i = 0; i < arr.size(); i++) { + tmp[i] = net::hton(nr[i]); + } + auto p = reinterpret_cast(tmp.data()); + auto bytes = tmp.size() * sizeof(Members); + out.write(p, bytes).get(); } template -future<> write(file_writer& out, disk_array& arr) { +inline void write(file_writer& out, disk_array& arr) { Size len = 0; check_truncate_and_assign(len, arr.elements.size()); - return write(out, len).then([&out, &arr] { - return write(out, arr.elements); - }); + write(out, len); + write(out, arr.elements); } template @@ -417,19 +398,18 @@ future<> parse(random_access_reader& in, disk_hash& h) { } template -future<> write(file_writer& out, std::unordered_map& map) { - return do_for_each(map.begin(), map.end(), [&out, &map] (auto& val) { - return write(out, val.first, val.second); - }); +inline void write(file_writer& out, std::unordered_map& map) { + for (auto& val: map) { + write(out, val.first, val.second); + }; } template -future<> write(file_writer& out, disk_hash& h) { +inline void write(file_writer& out, disk_hash& h) { Size len = 0; check_truncate_and_assign(len, h.map.size()); - return write(out, len).then([&out, &h] { - return write(out, h.map); - }); + write(out, len); + write(out, h.map); } future<> parse(random_access_reader& in, summary& s) { @@ -491,33 +471,28 @@ future<> parse(random_access_reader& in, summary& s) { }); } -future<> write(file_writer& out, summary_entry& entry) -{ +inline void write(file_writer& out, summary_entry& entry) { // FIXME: summary entry is supposedly written in memory order, but that // would prevent portability of summary file between machines of different // endianness. We can treat it as little endian to preserve portability. - return write(out, entry.key).then([&out, &entry] { - auto p = reinterpret_cast(&entry.position); - return out.write(p, sizeof(uint64_t)); - }); + write(out, entry.key); + auto p = reinterpret_cast(&entry.position); + out.write(p, sizeof(uint64_t)).get(); } -future<> write(file_writer& out, summary& s) { +inline void write(file_writer& out, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; // NOTE: positions and entries must be stored in NATIVE BYTE ORDER, not BIG-ENDIAN. - return write(out, s.header.min_index_interval, - s.header.size, - s.header.memory_size, - s.header.sampling_level, - s.header.size_at_full_sampling).then([&out, &s] { - auto p = reinterpret_cast(s.positions.data()); - return out.write(p, sizeof(pos_type) * s.positions.size()); - }).then([&out, &s] { - return write(out, s.entries); - }).then([&out, &s] { - return write(out, s.first_key, s.last_key); - }); + write(out, s.header.min_index_interval, + s.header.size, + s.header.memory_size, + s.header.sampling_level, + s.header.size_at_full_sampling); + auto p = reinterpret_cast(s.positions.data()); + out.write(p, sizeof(pos_type) * s.positions.size()).get(); + write(out, s.entries); + write(out, s.first_key, s.last_key); } future sstable::read_summary_entry(size_t i) { @@ -544,8 +519,8 @@ future<> parse(random_access_reader& in, std::unique_ptr& p) { } template -future<> write(file_writer& out, std::unique_ptr& p) { - return write(out, *static_cast(p.get())); +inline void write(file_writer& out, std::unique_ptr& p) { + write(out, *static_cast(p.get())); } future<> parse(random_access_reader& in, statistics& s) { @@ -568,35 +543,37 @@ future<> parse(random_access_reader& in, statistics& s) { }); } -future<> write(file_writer& out, statistics& s) { - return write(out, s.hash).then([&out, &s] { - struct kv { - metadata_type key; - uint32_t value; - }; - // sort map by file offset value and store the result into a vector. - // this is indeed needed because output stream cannot afford random writes. - auto v = make_shared>(); - v->reserve(s.hash.map.size()); - for (auto val : s.hash.map) { - kv tmp = { val.first, val.second }; - v->push_back(tmp); - } - std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); - return do_for_each(v->begin(), v->end(), [&out, &s, v] (auto val) mutable { - switch (val.key) { - case metadata_type::Validation: - return write(out, s.contents[val.key]); - case metadata_type::Compaction: - return write(out, s.contents[val.key]); - case metadata_type::Stats: - return write(out, s.contents[val.key]); - default: - sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); - return make_ready_future<>(); - } - }); - }); +inline void write(file_writer& out, statistics& s) { + write(out, s.hash); + struct kv { + metadata_type key; + uint32_t value; + }; + // sort map by file offset value and store the result into a vector. + // this is indeed needed because output stream cannot afford random writes. + auto v = make_shared>(); + v->reserve(s.hash.map.size()); + for (auto val : s.hash.map) { + kv tmp = { val.first, val.second }; + v->push_back(tmp); + } + std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); + for (auto& val: *v) { + switch (val.key) { + case metadata_type::Validation: + write(out, s.contents[val.key]); + break; + case metadata_type::Compaction: + write(out, s.contents[val.key]); + break; + case metadata_type::Stats: + write(out, s.contents[val.key]); + break; + default: + sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); + return; // FIXME: should throw + } + } } future<> parse(random_access_reader& in, estimated_histogram& eh) { @@ -625,31 +602,28 @@ future<> parse(random_access_reader& in, estimated_histogram& eh) { }); } -future<> write(file_writer& out, estimated_histogram& eh) { +inline void write(file_writer& out, estimated_histogram& eh) { uint32_t len = 0; check_truncate_and_assign(len, eh.buckets.size()); - return write(out, len).then([&out, &eh] { - struct element { - uint64_t offsets; - uint64_t buckets; - }; - std::vector elements; - elements.resize(eh.buckets.size()); + write(out, len); + struct element { + uint64_t offsets; + uint64_t buckets; + }; + std::vector elements; + elements.resize(eh.buckets.size()); - auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); - auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); - for (size_t i = 0; i < eh.buckets.size(); i++) { - elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]); - elements[i].buckets = net::hton(buckets_nr[i]); - } + auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); + auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); + for (size_t i = 0; i < eh.buckets.size(); i++) { + elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]); + elements[i].buckets = net::hton(buckets_nr[i]); + } - return do_with(std::move(elements), [&out] (auto& elements) { - auto p = reinterpret_cast(elements.data()); - auto bytes = elements.size() * sizeof(element); - return out.write(p, bytes); - }); - }); + auto p = reinterpret_cast(elements.data()); + auto bytes = elements.size() * sizeof(element); + out.write(p, bytes).get(); } // This is small enough, and well-defined. Easier to just read it all @@ -718,7 +692,7 @@ future<> sstable::write_toc() { // new line character is appended to the end of each component name. auto value = _component_map[key] + "\n"; bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); - return write(*w, b); + return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); }); }).then([w] { return w->flush().then([w] { return w->close().then([w] {}); @@ -735,7 +709,7 @@ future<> write_crc(const sstring file_path, checksum& c) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - return write(*w, c).then([w] { + return seastar::async([w, &c] () { write(*w, c); }).then([w] { return w->close().then([w] {}); }); }); @@ -751,7 +725,7 @@ future<> write_digest(const sstring file_path, uint32_t full_checksum) { auto w = make_shared(std::move(out)); return do_with(to_sstring(full_checksum), [w] (bytes& digest) { - return write(*w, digest).then([w] { + return seastar::async([w, &digest] { write(*w, digest); }).then([w] { return w->close().then([w] {}); }); }); @@ -841,7 +815,7 @@ future<> sstable::write_simple(T& component) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - auto fut = write(*w, component); + auto fut = seastar::async([w, &component] () mutable { write(*w, component); }); return fut.then([w] { return w->flush().then([w] { return w->close().then([w] {}); // the underlying file is synced here. @@ -959,7 +933,7 @@ void sstable::write_column_name(file_writer& out, const composite& clustering_ke ck_bview.remove_suffix(1); } uint16_t sz = ck_bview.size() + c.size(); - write(out, sz, ck_bview, c).get(); + write(out, sz, ck_bview, c); } static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) { @@ -984,7 +958,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { uint32_t expiration = cell.expiry().time_since_epoch().count(); disk_string_view cell_value { cell.value() }; - write(out, mask, ttl, expiration, timestamp, cell_value).get(); + write(out, mask, ttl, expiration, timestamp, cell_value); } else if (cell.is_dead()) { // tombstone cell @@ -994,14 +968,14 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { _c_stats.tombstone_histogram.update(deletion_time); - write(out, mask, timestamp, deletion_time_size, deletion_time).get(); + write(out, mask, timestamp, deletion_time_size, deletion_time); } else { // regular cell column_mask mask = column_mask::none; disk_string_view cell_value { cell.value() }; - write(out, mask, timestamp, cell_value).get(); + write(out, mask, timestamp, cell_value); } } @@ -1019,7 +993,7 @@ void sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row update_cell_stats(_c_stats, timestamp); - write(out, mask, timestamp, value_length).get(); + write(out, mask, timestamp, value_length); } void sstable::write_range_tombstone(file_writer& out, const composite& clustering_prefix, std::vector suffix, const tombstone t) { @@ -1029,12 +1003,12 @@ void sstable::write_range_tombstone(file_writer& out, const composite& clusterin write_column_name(out, clustering_prefix, suffix, composite_marker::start_range); column_mask mask = column_mask::range_tombstone; - write(out, mask).get(); + write(out, mask); write_column_name(out, clustering_prefix, suffix, composite_marker::end_range); uint64_t timestamp = t.timestamp; uint32_t deletion_time = t.deletion_time.time_since_epoch().count(); - write(out, deletion_time, timestamp).get(); + write(out, deletion_time, timestamp); } void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation::view collection) { @@ -1098,7 +1072,7 @@ static void write_index_entry(file_writer& out, disk_string_view& key, // FIXME: support promoted indexes. uint32_t promoted_index_size = 0; - write(out, key, pos, promoted_index_size).get(); + write(out, key, pos, promoted_index_size); } static constexpr int BASE_SAMPLING_LEVEL = 128; @@ -1223,7 +1197,7 @@ void sstable::do_write_components(const memtable& mt) { write_index_entry(*index, p_key, w->offset()); // Write partition key into data file. - write(*w, p_key).get(); + write(*w, p_key); auto tombstone = partition_entry.second.partition_tombstone(); deletion_time d; @@ -1241,7 +1215,7 @@ void sstable::do_write_components(const memtable& mt) { d.local_deletion_time = std::numeric_limits::max(); d.marked_for_delete_at = std::numeric_limits::min(); } - write(*w, d).get(); + write(*w, d); auto& partition = partition_entry.second; auto& static_row = partition.static_row(); @@ -1257,7 +1231,7 @@ void sstable::do_write_components(const memtable& mt) { write_clustered_row(*w, *mt.schema(), clustered_row); } int16_t end_of_row = 0; - write(*w, end_of_row).get(); + write(*w, end_of_row); // compute size of the current row. _c_stats.row_size = w->offset() - _c_stats.start_offset;