diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 8b38a0b33c..28ac6a5e4b 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -158,15 +158,12 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { auto *nr = reinterpret_cast *>(&i); i = net::hton(*nr); auto p = reinterpret_cast(&i); - return out.write(p, sizeof(T)).then([&out] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(T)).get(); } template @@ -176,17 +173,17 @@ parse(random_access_reader& in, T& i) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, T i) { - return write(out, static_cast::type>(i)); + write(out, static_cast::type>(i)); } future<> parse(random_access_reader& in, bool& i) { return parse(in, reinterpret_cast(i)); } -future<> write(file_writer& out, bool i) { - return write(out, static_cast(i)); +inline void write(file_writer& out, bool i) { + write(out, static_cast(i)); } template @@ -211,14 +208,11 @@ future<> parse(random_access_reader& in, double& d) { }); } -future<> write(file_writer& out, double d) { +inline void write(file_writer& out, double d) { auto *nr = reinterpret_cast *>(&d); auto tmp = net::hton(*nr); auto p = reinterpret_cast(&tmp); - return out.write(p, sizeof(unsigned long)).then([&out] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); + out.write(p, sizeof(unsigned long)).get(); } template @@ -230,15 +224,12 @@ future<> parse(random_access_reader& in, T& len, bytes& s) { }); } -future<> write(file_writer& out, bytes& s) { - return out.write(s).then([&out, &s] (...) -> future<> { - // TODO: handle result - return make_ready_future<>(); - }); +inline void write(file_writer& out, bytes& s) { + out.write(s).get(); } -future<> write(file_writer& out, bytes_view s) { - return out.write(reinterpret_cast(s.data()), s.size()); +inline void write(file_writer& out, bytes_view s) { + out.write(reinterpret_cast(s.data()), s.size()).get(); } // All composite parsers must come after this @@ -250,10 +241,9 @@ future<> parse(random_access_reader& in, First& first, Rest&&... rest) { } template -future<> write(file_writer& out, First& first, Rest&&... rest) { - return write(out, first).then([&out, &rest...] { - return write(out, rest...); - }); +inline void write(file_writer& out, First& first, Rest&&... rest) { + write(out, first); + write(out, std::forward(rest)...); } // Intended to be used for a type that describes itself through describe_type(). @@ -266,10 +256,10 @@ parse(random_access_reader& in, T& t) { } template -typename std::enable_if_t::value && !std::is_enum::value, future<>> +inline typename std::enable_if_t::value && !std::is_enum::value, void> write(file_writer& out, T& t) { - return t.describe_type([&out] (auto&&... what) -> future<> { - return write(out, what...); + t.describe_type([&out] (auto&&... what) -> void { + write(out, std::forward(what)...); }); } @@ -288,19 +278,18 @@ future<> parse(random_access_reader& in, disk_string& s) { } template -future<> write(file_writer& out, disk_string& s) { +inline void write(file_writer& out, disk_string& s) { Size len = 0; check_truncate_and_assign(len, s.value.size()); - return write(out, len).then([&out, &s] { - return write(out, s.value); - }); + write(out, len); + write(out, s.value); } template -future<> write(file_writer& out, disk_string_view& s) { +inline void write(file_writer& out, disk_string_view& s) { Size len; check_truncate_and_assign(len, s.value.size()); - return write(out, len, s.value); + write(out, len, s.value); } // We cannot simply read the whole array at once, because we don't know its @@ -350,42 +339,34 @@ future<> parse(random_access_reader& in, disk_array& arr) { } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& arr) { - - auto count = make_lw_shared(0); - auto eoarr = [count, &arr] { return *count == arr.size(); }; - - return do_until(eoarr, [count, &out, &arr] { - return write(out, arr[(*count)++]); - }); + for (auto& a : arr) { + write(out, a); + } } template -typename std::enable_if_t::value, future<>> +inline typename std::enable_if_t::value, void> write(file_writer& out, std::vector& arr) { - return do_with(std::vector(), [&out, &arr] (auto& tmp) { - tmp.resize(arr.size()); - // copy arr into tmp converting each entry into big-endian representation. - auto *nr = reinterpret_cast *>(arr.data()); - for (size_t i = 0; i < arr.size(); i++) { - tmp[i] = net::hton(nr[i]); - } - auto p = reinterpret_cast(tmp.data()); - auto bytes = tmp.size() * sizeof(Members); - return out.write(p, bytes).then([&out] (...) -> future<> { - return make_ready_future<>(); - }); - }); + std::vector tmp; + tmp.resize(arr.size()); + // copy arr into tmp converting each entry into big-endian representation. + auto *nr = reinterpret_cast *>(arr.data()); + for (size_t i = 0; i < arr.size(); i++) { + tmp[i] = net::hton(nr[i]); + } + auto p = reinterpret_cast(tmp.data()); + auto bytes = tmp.size() * sizeof(Members); + out.write(p, bytes).get(); } template -future<> write(file_writer& out, disk_array& arr) { +inline void write(file_writer& out, disk_array& arr) { Size len = 0; check_truncate_and_assign(len, arr.elements.size()); - return write(out, len).then([&out, &arr] { - return write(out, arr.elements); - }); + write(out, len); + write(out, arr.elements); } template @@ -417,19 +398,18 @@ future<> parse(random_access_reader& in, disk_hash& h) { } template -future<> write(file_writer& out, std::unordered_map& map) { - return do_for_each(map.begin(), map.end(), [&out, &map] (auto& val) { - return write(out, val.first, val.second); - }); +inline void write(file_writer& out, std::unordered_map& map) { + for (auto& val: map) { + write(out, val.first, val.second); + }; } template -future<> write(file_writer& out, disk_hash& h) { +inline void write(file_writer& out, disk_hash& h) { Size len = 0; check_truncate_and_assign(len, h.map.size()); - return write(out, len).then([&out, &h] { - return write(out, h.map); - }); + write(out, len); + write(out, h.map); } future<> parse(random_access_reader& in, summary& s) { @@ -491,33 +471,28 @@ future<> parse(random_access_reader& in, summary& s) { }); } -future<> write(file_writer& out, summary_entry& entry) -{ +inline void write(file_writer& out, summary_entry& entry) { // FIXME: summary entry is supposedly written in memory order, but that // would prevent portability of summary file between machines of different // endianness. We can treat it as little endian to preserve portability. - return write(out, entry.key).then([&out, &entry] { - auto p = reinterpret_cast(&entry.position); - return out.write(p, sizeof(uint64_t)); - }); + write(out, entry.key); + auto p = reinterpret_cast(&entry.position); + out.write(p, sizeof(uint64_t)).get(); } -future<> write(file_writer& out, summary& s) { +inline void write(file_writer& out, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; // NOTE: positions and entries must be stored in NATIVE BYTE ORDER, not BIG-ENDIAN. - return write(out, s.header.min_index_interval, - s.header.size, - s.header.memory_size, - s.header.sampling_level, - s.header.size_at_full_sampling).then([&out, &s] { - auto p = reinterpret_cast(s.positions.data()); - return out.write(p, sizeof(pos_type) * s.positions.size()); - }).then([&out, &s] { - return write(out, s.entries); - }).then([&out, &s] { - return write(out, s.first_key, s.last_key); - }); + write(out, s.header.min_index_interval, + s.header.size, + s.header.memory_size, + s.header.sampling_level, + s.header.size_at_full_sampling); + auto p = reinterpret_cast(s.positions.data()); + out.write(p, sizeof(pos_type) * s.positions.size()).get(); + write(out, s.entries); + write(out, s.first_key, s.last_key); } future sstable::read_summary_entry(size_t i) { @@ -544,8 +519,8 @@ future<> parse(random_access_reader& in, std::unique_ptr& p) { } template -future<> write(file_writer& out, std::unique_ptr& p) { - return write(out, *static_cast(p.get())); +inline void write(file_writer& out, std::unique_ptr& p) { + write(out, *static_cast(p.get())); } future<> parse(random_access_reader& in, statistics& s) { @@ -568,35 +543,37 @@ future<> parse(random_access_reader& in, statistics& s) { }); } -future<> write(file_writer& out, statistics& s) { - return write(out, s.hash).then([&out, &s] { - struct kv { - metadata_type key; - uint32_t value; - }; - // sort map by file offset value and store the result into a vector. - // this is indeed needed because output stream cannot afford random writes. - auto v = make_shared>(); - v->reserve(s.hash.map.size()); - for (auto val : s.hash.map) { - kv tmp = { val.first, val.second }; - v->push_back(tmp); - } - std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); - return do_for_each(v->begin(), v->end(), [&out, &s, v] (auto val) mutable { - switch (val.key) { - case metadata_type::Validation: - return write(out, s.contents[val.key]); - case metadata_type::Compaction: - return write(out, s.contents[val.key]); - case metadata_type::Stats: - return write(out, s.contents[val.key]); - default: - sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); - return make_ready_future<>(); - } - }); - }); +inline void write(file_writer& out, statistics& s) { + write(out, s.hash); + struct kv { + metadata_type key; + uint32_t value; + }; + // sort map by file offset value and store the result into a vector. + // this is indeed needed because output stream cannot afford random writes. + auto v = make_shared>(); + v->reserve(s.hash.map.size()); + for (auto val : s.hash.map) { + kv tmp = { val.first, val.second }; + v->push_back(tmp); + } + std::sort(v->begin(), v->end(), [] (kv i, kv j) { return i.value < j.value; }); + for (auto& val: *v) { + switch (val.key) { + case metadata_type::Validation: + write(out, s.contents[val.key]); + break; + case metadata_type::Compaction: + write(out, s.contents[val.key]); + break; + case metadata_type::Stats: + write(out, s.contents[val.key]); + break; + default: + sstlog.warn("Invalid metadata type at Statistics file: {} ", int(val.key)); + return; // FIXME: should throw + } + } } future<> parse(random_access_reader& in, estimated_histogram& eh) { @@ -625,31 +602,28 @@ future<> parse(random_access_reader& in, estimated_histogram& eh) { }); } -future<> write(file_writer& out, estimated_histogram& eh) { +inline void write(file_writer& out, estimated_histogram& eh) { uint32_t len = 0; check_truncate_and_assign(len, eh.buckets.size()); - return write(out, len).then([&out, &eh] { - struct element { - uint64_t offsets; - uint64_t buckets; - }; - std::vector elements; - elements.resize(eh.buckets.size()); + write(out, len); + struct element { + uint64_t offsets; + uint64_t buckets; + }; + std::vector elements; + elements.resize(eh.buckets.size()); - auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); - auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); - for (size_t i = 0; i < eh.buckets.size(); i++) { - elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]); - elements[i].buckets = net::hton(buckets_nr[i]); - } + auto *offsets_nr = reinterpret_cast *>(eh.bucket_offsets.data()); + auto *buckets_nr = reinterpret_cast *>(eh.buckets.data()); + for (size_t i = 0; i < eh.buckets.size(); i++) { + elements[i].offsets = net::hton(offsets_nr[i == 0 ? 0 : i - 1]); + elements[i].buckets = net::hton(buckets_nr[i]); + } - return do_with(std::move(elements), [&out] (auto& elements) { - auto p = reinterpret_cast(elements.data()); - auto bytes = elements.size() * sizeof(element); - return out.write(p, bytes); - }); - }); + auto p = reinterpret_cast(elements.data()); + auto bytes = elements.size() * sizeof(element); + out.write(p, bytes).get(); } // This is small enough, and well-defined. Easier to just read it all @@ -718,7 +692,7 @@ future<> sstable::write_toc() { // new line character is appended to the end of each component name. auto value = _component_map[key] + "\n"; bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); - return write(*w, b); + return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); }); }).then([w] { return w->flush().then([w] { return w->close().then([w] {}); @@ -735,7 +709,7 @@ future<> write_crc(const sstring file_path, checksum& c) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - return write(*w, c).then([w] { + return seastar::async([w, &c] () { write(*w, c); }).then([w] { return w->close().then([w] {}); }); }); @@ -751,7 +725,7 @@ future<> write_digest(const sstring file_path, uint32_t full_checksum) { auto w = make_shared(std::move(out)); return do_with(to_sstring(full_checksum), [w] (bytes& digest) { - return write(*w, digest).then([w] { + return seastar::async([w, &digest] { write(*w, digest); }).then([w] { return w->close().then([w] {}); }); }); @@ -841,7 +815,7 @@ future<> sstable::write_simple(T& component) { auto out = file_writer(make_lw_shared(std::move(f)), 4096); auto w = make_shared(std::move(out)); - auto fut = write(*w, component); + auto fut = seastar::async([w, &component] () mutable { write(*w, component); }); return fut.then([w] { return w->flush().then([w] { return w->close().then([w] {}); // the underlying file is synced here. @@ -959,7 +933,7 @@ void sstable::write_column_name(file_writer& out, const composite& clustering_ke ck_bview.remove_suffix(1); } uint16_t sz = ck_bview.size() + c.size(); - write(out, sz, ck_bview, c).get(); + write(out, sz, ck_bview, c); } static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) { @@ -984,7 +958,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { uint32_t expiration = cell.expiry().time_since_epoch().count(); disk_string_view cell_value { cell.value() }; - write(out, mask, ttl, expiration, timestamp, cell_value).get(); + write(out, mask, ttl, expiration, timestamp, cell_value); } else if (cell.is_dead()) { // tombstone cell @@ -994,14 +968,14 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell) { _c_stats.tombstone_histogram.update(deletion_time); - write(out, mask, timestamp, deletion_time_size, deletion_time).get(); + write(out, mask, timestamp, deletion_time_size, deletion_time); } else { // regular cell column_mask mask = column_mask::none; disk_string_view cell_value { cell.value() }; - write(out, mask, timestamp, cell_value).get(); + write(out, mask, timestamp, cell_value); } } @@ -1019,7 +993,7 @@ void sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row update_cell_stats(_c_stats, timestamp); - write(out, mask, timestamp, value_length).get(); + write(out, mask, timestamp, value_length); } void sstable::write_range_tombstone(file_writer& out, const composite& clustering_prefix, std::vector suffix, const tombstone t) { @@ -1029,12 +1003,12 @@ void sstable::write_range_tombstone(file_writer& out, const composite& clusterin write_column_name(out, clustering_prefix, suffix, composite_marker::start_range); column_mask mask = column_mask::range_tombstone; - write(out, mask).get(); + write(out, mask); write_column_name(out, clustering_prefix, suffix, composite_marker::end_range); uint64_t timestamp = t.timestamp; uint32_t deletion_time = t.deletion_time.time_since_epoch().count(); - write(out, deletion_time, timestamp).get(); + write(out, deletion_time, timestamp); } void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation::view collection) { @@ -1098,7 +1072,7 @@ static void write_index_entry(file_writer& out, disk_string_view& key, // FIXME: support promoted indexes. uint32_t promoted_index_size = 0; - write(out, key, pos, promoted_index_size).get(); + write(out, key, pos, promoted_index_size); } static constexpr int BASE_SAMPLING_LEVEL = 128; @@ -1223,7 +1197,7 @@ void sstable::do_write_components(const memtable& mt) { write_index_entry(*index, p_key, w->offset()); // Write partition key into data file. - write(*w, p_key).get(); + write(*w, p_key); auto tombstone = partition_entry.second.partition_tombstone(); deletion_time d; @@ -1241,7 +1215,7 @@ void sstable::do_write_components(const memtable& mt) { d.local_deletion_time = std::numeric_limits::max(); d.marked_for_delete_at = std::numeric_limits::min(); } - write(*w, d).get(); + write(*w, d); auto& partition = partition_entry.second; auto& static_row = partition.static_row(); @@ -1257,7 +1231,7 @@ void sstable::do_write_components(const memtable& mt) { write_clustered_row(*w, *mt.schema(), clustered_row); } int16_t end_of_row = 0; - write(*w, end_of_row).get(); + write(*w, end_of_row); // compute size of the current row. _c_stats.row_size = w->offset() - _c_stats.start_offset;