diff --git a/sstables/filter.cc b/sstables/filter.cc index e750ffca20..b079d098ff 100644 --- a/sstables/filter.cc +++ b/sstables/filter.cc @@ -33,9 +33,9 @@ future<> sstable::read_filter() { }); } -future<> sstable::write_filter() { +void sstable::write_filter() { if (!has_component(sstable::component_type::Filter)) { - return make_ready_future<>(); + return; } auto f = static_cast(_filter.get()); @@ -43,8 +43,8 @@ future<> sstable::write_filter() { std::vector v; boost::to_block_range(f->bits(), std::back_inserter(v)); - return do_with(sstables::filter(f->num_hashes(), std::move(v)), [this] (auto& filter) { - return this->write_simple(filter); - }); + auto filter = sstables::filter(f->num_hashes(), std::move(v)); + write_simple(filter); } + } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index f8d939860a..2d01cbcda5 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -675,57 +675,48 @@ future<> sstable::read_toc() { } -future<> sstable::write_toc() { +void sstable::write_toc() { auto file_path = filename(sstable::component_type::TOC); sstlog.debug("Writing TOC file {} ", file_path); - return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) { - auto out = file_writer(std::move(f), 4096); - auto w = make_shared(std::move(out)); + file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0(); + auto out = file_writer(std::move(f), 4096); + auto w = file_writer(std::move(out)); - return do_for_each(_components, [this, w] (auto key) { + for (auto&& key : _components) { // new line character is appended to the end of each component name. - auto value = _component_map[key] + "\n"; - bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); - return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); }); - }).then([w] { - return w->flush().then([w] { - return w->close().then([w] {}); - }); - }); - }); + auto value = _component_map[key] + "\n"; + bytes b = bytes(reinterpret_cast(value.c_str()), value.size()); + write(w, b); + } + w.flush().get(); + w.close().get(); } -future<> write_crc(const sstring file_path, checksum& c) { +void write_crc(const sstring file_path, checksum& c) { sstlog.debug("Writing CRC file {} ", file_path); auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive; - return engine().open_file_dma(file_path, oflags).then([&c] (file f) { - auto out = file_writer(std::move(f), 4096); - auto w = make_shared(std::move(out)); - - return seastar::async([w, &c] () { write(*w, c); }).then([w] { - return w->close().then([w] {}); - }); - }); + file f = engine().open_file_dma(file_path, oflags).get0(); + auto out = file_writer(std::move(f), 4096); + auto w = file_writer(std::move(out)); + write(w, c); + w.close().get(); } // Digest file stores the full checksum of data file converted into a string. -future<> write_digest(const sstring file_path, uint32_t full_checksum) { +void write_digest(const sstring file_path, uint32_t full_checksum) { sstlog.debug("Writing Digest file {} ", file_path); auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive; - return engine().open_file_dma(file_path, oflags).then([full_checksum] (file f) { - auto out = file_writer(std::move(f), 4096); - auto w = make_shared(std::move(out)); + auto f = engine().open_file_dma(file_path, oflags).get0(); + auto out = file_writer(std::move(f), 4096); + auto w = file_writer(std::move(out)); - return do_with(to_sstring(full_checksum), [w] (bytes& digest) { - return seastar::async([w, &digest] { write(*w, digest); }).then([w] { - return w->close().then([w] {}); - }); - }); - }); + auto digest = to_sstring(full_checksum); + write(w, digest); + w.close().get(); } future sstable::read_indexes(uint64_t position, uint64_t quantity) { @@ -803,22 +794,19 @@ future<> sstable::read_simple(T& component) { } template -future<> sstable::write_simple(T& component) { +void sstable::write_simple(T& component) { auto file_path = filename(Type); sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path); - return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this, &component] (file f) { - auto out = file_writer(std::move(f), 4096); - auto w = make_shared(std::move(out)); - auto fut = seastar::async([w, &component] () mutable { write(*w, component); }); - return fut.then([w] { - return w->flush().then([w] { - return w->close().then([w] {}); // the underlying file is synced here. - }); - }); - }); + file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0(); + auto out = file_writer(std::move(f), 4096); + auto w = file_writer(std::move(out)); + write(w, component); + w.flush().get(); + w.close().get(); } + template future<> sstable::read_simple(sstables::filter& f); -template future<> sstable::write_simple(sstables::filter& f); +template void sstable::write_simple(sstables::filter& f); future<> sstable::read_compression() { // FIXME: If there is no compression, we should expect a CRC file to be present. @@ -829,20 +817,20 @@ future<> sstable::read_compression() { return read_simple(_compression); } -future<> sstable::write_compression() { +void sstable::write_compression() { if (!has_component(sstable::component_type::CompressionInfo)) { - return make_ready_future<>(); + return; } - return write_simple(_compression); + write_simple(_compression); } future<> sstable::read_statistics() { return read_simple(_statistics); } -future<> sstable::write_statistics() { - return write_simple(_statistics); +void sstable::write_statistics() { + write_simple(_statistics); } future<> sstable::open_data() { @@ -891,14 +879,11 @@ future<> sstable::load() { future<> sstable::store() { // TODO: write other components as well. - return write_toc().then([this] { - return write_statistics(); - }).then([this] { - return write_compression(); - }).then([this] { - return write_filter(); - }).then([this] { - return write_summary(); + return seastar::async([this] { + write_statistics(); + write_compression(); + write_filter(); + write_summary(); }); } @@ -1328,8 +1313,8 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_ w->close().get(); _data_file = file(); // w->close() closed _data_file - write_digest(filename(sstable::component_type::Digest), w->full_checksum()).get(); - write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()).get(); + write_digest(filename(sstable::component_type::Digest), w->full_checksum()); + write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()); } else { prepare_compression(_compression, *schema); auto w = make_shared(make_compressed_file_output_stream(_data_file, &_compression)); @@ -1338,7 +1323,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_ w->close().get(); _data_file = file(); // w->close() closed _data_file - write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()).get(); + write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()); } } @@ -1349,24 +1334,16 @@ future<> sstable::write_components(const memtable& mt) { future<> sstable::write_components(::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema) { - return touch_directory(_dir).then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] { - return create_data().then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] { - auto w = [this] (::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema) { - this->prepare_write_components(std::move(mr), estimated_partitions, std::move(schema)); - }; - return seastar::async(std::move(w), std::move(mr), estimated_partitions, std::move(schema)).then([this] { - return write_summary(); - }).then([this] { - return write_filter(); - }).then([this] { - return write_statistics(); - }).then([this] { - // NOTE: write_compression means maybe_write_compression. - return write_compression(); - }).then([this] { - return write_toc(); - }); - }); + return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] { + touch_directory(_dir).get(); + create_data().get(); + prepare_write_components(std::move(mr), estimated_partitions, std::move(schema)); + write_summary(); + write_filter(); + write_statistics(); + // NOTE: write_compression means maybe_write_compression. + write_compression(); + write_toc(); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 2d23e53b7b..ca4e3d7835 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -282,27 +282,27 @@ private: future<> read_simple(T& comp); template - future<> write_simple(T& comp); + void write_simple(T& comp); future<> read_toc(); - future<> write_toc(); + void write_toc(); future<> read_compression(); - future<> write_compression(); + void write_compression(); future<> read_filter(); - future<> write_filter(); + void write_filter(); future<> read_summary() { return read_simple(_summary); } - future<> write_summary() { - return write_simple(_summary); + void write_summary() { + write_simple(_summary); } future<> read_statistics(); - future<> write_statistics(); + void write_statistics(); future<> open_data(); future<> create_data();