From 13999a4d09e9491cbbdb0fbb15f5cba6bd705ea7 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 6 Dec 2018 17:27:38 +0100 Subject: [PATCH] sstables: De-futurize file_writer --- sstables/sstables.cc | 62 ++++++++++++++++++++++---------------------- sstables/writer.hh | 25 ++++++++++-------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index c8bb5ffdf0..7e043e4de6 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -246,7 +246,7 @@ write(sstable_version_types v, file_writer& out, T i) { auto *nr = reinterpret_cast *>(&i); i = net::hton(*nr); auto p = reinterpret_cast(&i); - out.write(p, sizeof(T)).get(); + out.write(p, sizeof(T)); } template @@ -295,7 +295,7 @@ inline void write(sstable_version_types v, file_writer& out, double d) { auto *nr = reinterpret_cast *>(&d); auto tmp = net::hton(*nr); auto p = reinterpret_cast(&tmp); - out.write(p, sizeof(unsigned long)).get(); + out.write(p, sizeof(unsigned long)); } template @@ -308,11 +308,11 @@ future<> parse(sstable_version_types, random_access_reader& in, T& len, bytes& s } inline void write(sstable_version_types v, file_writer& out, const bytes& s) { - out.write(s).get(); + out.write(s); } inline void write(sstable_version_types v, file_writer& out, bytes_view s) { - out.write(reinterpret_cast(s.data()), s.size()).get(); + out.write(reinterpret_cast(s.data()), s.size()); } inline void write(sstable_version_types v, file_writer& out, bytes_ostream s) { @@ -363,7 +363,7 @@ future<> parse(sstable_version_types, random_access_reader& in, utils::UUID& uui } inline void write(sstable_version_types v, file_writer& out, const utils::UUID& uuid) { - out.write(uuid.serialize()).get(); + out.write(uuid.serialize()); } template @@ -514,7 +514,7 @@ write(sstable_version_types v, file_writer& out, const utils::chunked_vector(tmp.data()); auto bytes = now * sizeof(Members); - out.write(p, bytes).get(); + out.write(p, bytes); idx += now; } } @@ -757,7 +757,7 @@ inline void write(sstable_version_types v, file_writer& out, const summary_entry // endianness. We can treat it as little endian to preserve portability. write(v, out, entry.key); auto p = seastar::cpu_to_le(entry.position); - out.write(reinterpret_cast(&p), sizeof(p)).get(); + out.write(reinterpret_cast(&p), sizeof(p)); } inline void write(sstable_version_types v, file_writer& out, const summary& s) { @@ -769,7 +769,7 @@ inline void write(sstable_version_types v, file_writer& out, const summary& s) { s.header.size_at_full_sampling); for (auto&& e : s.positions) { auto p = seastar::cpu_to_le(e); - out.write(reinterpret_cast(&p), sizeof(p)).get(); + out.write(reinterpret_cast(&p), sizeof(p)); } write(v, out, s.entries); write(v, out, s.first_key, s.last_key); @@ -900,7 +900,7 @@ inline void write(sstable_version_types v, file_writer& out, const utils::estima auto p = reinterpret_cast(elements.data()); auto bytes = elements.size() * sizeof(element); - out.write(p, bytes).get(); + out.write(p, bytes); } struct streaming_histogram_element { @@ -1010,7 +1010,7 @@ void write(sstable_version_types v, file_writer& out, const compression& c) { } auto p = reinterpret_cast(tmp.data()); auto bytes = now * sizeof(uint64_t); - out.write(p, bytes).get(); + out.write(p, bytes); idx += now; } } @@ -1125,8 +1125,8 @@ void sstable::write_toc(const io_priority_class& pc) { bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); write(_version, w, b); } - w.flush().get(); - w.close().get(); + w.flush(); + w.close(); // Flushing parent directory to guarantee that temporary TOC file reached // the disk. @@ -1172,7 +1172,7 @@ void sstable::write_crc(const checksum& c) { options.buffer_size = 4096; auto w = file_writer(std::move(f), std::move(options)); write(get_version(), w, c); - w.close().get(); + w.close(); } // Digest file stores the full checksum of data file converted into a string. @@ -1189,7 +1189,7 @@ void sstable::write_digest(uint32_t full_checksum) { auto digest = to_sstring(full_checksum); write(get_version(), w, digest); - w.close().get(); + w.close(); } thread_local std::array, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache; @@ -1236,8 +1236,8 @@ void sstable::write_simple(const T& component, const io_priority_class& pc) { options.io_priority_class = pc; auto w = file_writer(std::move(f), std::move(options)); write(_version, w, component); - w.flush().get(); - w.close().get(); + w.flush(); + w.close(); } template future<> sstable::read_simple(sstables::filter& f, const io_priority_class& pc); @@ -1374,8 +1374,8 @@ void sstable::rewrite_statistics(const io_priority_class& pc) { options.io_priority_class = pc; auto w = file_writer(std::move(f), std::move(options)); write(_version, w, _components->statistics); - w.flush().get(); - w.close().get(); + w.flush(); + w.close(); // rename() guarantees atomicity when renaming a file into place. sstable_write_io_check(rename_file, file_path, filename(component_type::Statistics)).get(); } @@ -2458,7 +2458,7 @@ void components_writer::consume_end_of_stream() { seal_summary(_sst._components->summary, std::move(_first_key), std::move(_last_key), _index_sampling_state); _index_needs_close = false; - _index.close().get(); + _index.close(); if (_sst.has_component(component_type::CompressionInfo)) { _sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length()); @@ -2472,7 +2472,7 @@ void components_writer::consume_end_of_stream() { components_writer::~components_writer() { if (_index_needs_close) { try { - _index.close().get(); + _index.close(); } catch (...) { sstlog.error("components_writer failed to close file: {}", std::current_exception()); } @@ -2588,7 +2588,7 @@ void sstable_writer_k_l::prepare_file_writer() void sstable_writer_k_l::finish_file_writer() { auto writer = std::move(_writer); - writer->close().get(); + writer->close(); if (!_compression_enabled) { auto chksum_wr = static_cast(writer.get()); @@ -2602,7 +2602,7 @@ void sstable_writer_k_l::finish_file_writer() sstable_writer_k_l::~sstable_writer_k_l() { if (_writer) { try { - _writer->close().get(); + _writer->close(); } catch (...) { sstlog.error("sstable_writer failed to close file: {}", std::current_exception()); } @@ -2939,7 +2939,7 @@ private: void flush_tmp_bufs() { for (auto&& buf : _tmp_bufs.buffers()) { - _data_writer->write(buf.get(), buf.size()).get(); + _data_writer->write(buf.get(), buf.size()); } _tmp_bufs.clear(); } @@ -2987,7 +2987,7 @@ sstable_writer_m::~sstable_writer_m() { auto close_writer = [](auto& writer) { if (writer) { try { - writer->close().get(); + writer->close(); } catch (...) { sstlog.error("sstable_writer_m failed to close file: {}", std::current_exception()); } @@ -3058,7 +3058,7 @@ void sstable_writer_m::init_file_writers() { void sstable_writer_m::close_data_writer() { auto writer = std::move(_data_writer); - writer->close().get(); + writer->close(); if (!_compression_enabled) { auto chksum_wr = static_cast(writer.get()); @@ -3376,8 +3376,8 @@ uint64_t calculate_write_size(Func&& func) { { auto counting_writer = file_writer(make_sizing_output_stream(written_size)); func(counting_writer); - counting_writer.flush().get(); - counting_writer.close().get(); + counting_writer.flush(); + counting_writer.close(); } return written_size; } @@ -3420,7 +3420,7 @@ void sstable_writer_m::write_static_row(const row& static_row) { write(_sst.get_version(), *_data_writer, row_extended_flags::is_static); write_cells(_tmp_writer, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion); - _tmp_writer.flush().get(); + _tmp_writer.flush(); uint64_t row_body_size = _tmp_bufs.size() + unsigned_vint::serialized_size(0); write_vint(*_data_writer, row_body_size); @@ -3474,7 +3474,7 @@ void sstable_writer_m::write_clustered(const clustering_row& clustered_row, uint write_clustering_prefix(*_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()}); write_row_body(_tmp_writer, clustered_row, has_complex_deletion); - _tmp_writer.flush().get(); + _tmp_writer.flush(); uint64_t row_body_size = _tmp_bufs.size() + unsigned_vint::serialized_size(prev_row_size); write_vint(*_data_writer, row_body_size); @@ -3616,7 +3616,7 @@ void sstable_writer_m::consume_end_of_stream() { _sst.get_metadata_collector().add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length()); } - _index_writer->close().get(); + _index_writer->close(); _index_writer.reset(); _sst.set_first_and_last_keys(); seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(), @@ -3637,7 +3637,7 @@ void sstable_writer_m::consume_end_of_stream() { if (!_cfg.leave_unsealed) { _sst.seal_sstable(_cfg.backup).get(); } - _tmp_writer.close().get(); + _tmp_writer.close(); _cfg.monitor->on_flush_completed(); } diff --git a/sstables/writer.hh b/sstables/writer.hh index 7b9cac2fa0..3a9a67d1b3 100644 --- a/sstables/writer.hh +++ b/sstables/writer.hh @@ -43,20 +43,23 @@ public: virtual ~file_writer() = default; file_writer(file_writer&&) = default; - - future<> write(const char* buf, size_t n) { + // Must be called in a seastar thread. + void write(const char* buf, size_t n) { _offset.offset += n; - return _out.write(buf, n); + _out.write(buf, n).get(); } - future<> write(const bytes& s) { + // Must be called in a seastar thread. + void write(const bytes& s) { _offset.offset += s.size(); - return _out.write(s); + _out.write(s).get(); } - future<> flush() { - return _out.flush(); + // Must be called in a seastar thread. + void flush() { + _out.flush().get(); } - future<> close() { - return _out.close(); + // Must be called in a seastar thread. + void close() { + _out.close().get(); } uint64_t offset() const { return _offset.offset; @@ -110,8 +113,8 @@ serialized_size(sstable_version_types v, const T& object) { uint64_t size = 0; auto writer = file_writer(make_sizing_output_stream(size)); write(v, writer, object); - writer.flush().get(); - writer.close().get(); + writer.flush(); + writer.close(); return size; }