mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 17:40:34 +00:00
sstables: de-futurize write path
The sstables write path has been partially de-futurized, but it now creates a ton of threads and yet does not exploit them, because everything is serialized. Remove those extra threads and futures and use a single thread to write everything. If needed, we'll employ write-behind in output_stream to increase parallelism. Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
@@ -33,9 +33,9 @@ future<> sstable::read_filter() {
|
||||
});
|
||||
}
|
||||
|
||||
// Diff view of sstable::write_filter(). The future<>-returning form and the
// void form appear back to back (presumably old, then new; this page has lost
// the +/- markers). Both forms return early when the sstable has no Filter
// component. Otherwise they copy the bloom filter's bit blocks into a vector,
// build an sstables::filter from the hash count plus those blocks, and pass it
// to write_simple<component_type::Filter>.
future<> sstable::write_filter() {
|
||||
void sstable::write_filter() {
|
||||
if (!has_component(sstable::component_type::Filter)) {
|
||||
return make_ready_future<>();
|
||||
return;
|
||||
}
|
||||
|
||||
// _filter is treated as a murmur3_bloom_filter; the cast is unchecked.
auto f = static_cast<utils::filter::murmur3_bloom_filter *>(_filter.get());
|
||||
// Hunk boundary: lines of the function between the two hunks may not be shown here.
@@ -43,8 +43,8 @@ future<> sstable::write_filter() {
|
||||
std::vector<utils::filter::bloom_filter::bitmap_block> v;
|
||||
boost::to_block_range(f->bits(), std::back_inserter(v));
|
||||
|
||||
// Future-based form: the filter object is handed to the continuation through do_with.
return do_with(sstables::filter(f->num_hashes(), std::move(v)), [this] (auto& filter) {
|
||||
return this->write_simple<sstable::component_type::Filter>(filter);
|
||||
});
|
||||
// Synchronous form: the filter is a plain local and write_simple is called directly.
auto filter = sstables::filter(f->num_hashes(), std::move(v));
|
||||
write_simple<sstable::component_type::Filter>(filter);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -675,57 +675,48 @@ future<> sstable::read_toc() {
|
||||
|
||||
}
|
||||
|
||||
// Diff view of sstable::write_toc(). The future<>-returning form and the void
// form are interleaved (presumably old and new; the +/- markers are lost).
// Both open the TOC file write-only with create|truncate, wrap it in a
// file_writer constructed with 4096 (presumably the buffer size), write one
// line per entry of _components (the name from _component_map followed by
// "\n"), then flush and close the writer.
future<> sstable::write_toc() {
|
||||
void sstable::write_toc() {
|
||||
auto file_path = filename(sstable::component_type::TOC);
|
||||
|
||||
sstlog.debug("Writing TOC file {} ", file_path);
|
||||
|
||||
// Future-based form: the writer lives in a shared pointer so every continuation can hold it.
return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) {
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = make_shared<file_writer>(std::move(out));
|
||||
// Synchronous form: the file is taken from the open future with get0() and the
// writer is a local object.
// NOTE(review): `out` is only moved into `w` here; the intermediate looks redundant.
file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = file_writer(std::move(out));
|
||||
|
||||
// Future-based iteration: one seastar::async call per component.
return do_for_each(_components, [this, w] (auto key) {
|
||||
// Synchronous iteration: a plain loop over _components.
for (auto&& key : _components) {
|
||||
// new line character is appended to the end of each component name.
|
||||
auto value = _component_map[key] + "\n";
|
||||
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
|
||||
return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); });
|
||||
}).then([w] {
|
||||
return w->flush().then([w] {
|
||||
return w->close().then([w] {});
|
||||
});
|
||||
});
|
||||
});
|
||||
auto value = _component_map[key] + "\n";
|
||||
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
|
||||
write(w, b);
|
||||
}
|
||||
// Synchronous form: flush, then close, each resolved with get() before continuing.
w.flush().get();
|
||||
w.close().get();
|
||||
}
|
||||
|
||||
// Diff view of write_crc(). The future<>-returning form and the void form are
// interleaved (presumably old and new; the +/- markers are lost). Both open
// file_path write-only with create|exclusive, wrap the file in a file_writer
// constructed with 4096, write the checksum object `c` through write(), and
// close the writer.
// NOTE(review): neither form calls flush() before close(), unlike write_toc and
// write_simple. Confirm that close() alone is sufficient here.
future<> write_crc(const sstring file_path, checksum& c) {
|
||||
void write_crc(const sstring file_path, checksum& c) {
|
||||
sstlog.debug("Writing CRC file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
// Future-based form: `c` is captured by reference, the writer by shared pointer.
return engine().open_file_dma(file_path, oflags).then([&c] (file f) {
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = make_shared<file_writer>(std::move(out));
|
||||
|
||||
return seastar::async([w, &c] () { write(*w, c); }).then([w] {
|
||||
return w->close().then([w] {});
|
||||
});
|
||||
});
|
||||
// Synchronous form: open, write and close in sequence on local objects.
file f = engine().open_file_dma(file_path, oflags).get0();
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = file_writer(std::move(out));
|
||||
write(w, c);
|
||||
w.close().get();
|
||||
}
|
||||
|
||||
// Digest file stores the full checksum of data file converted into a string.
//
// Diff view of write_digest(). The future<>-returning form and the void form
// are interleaved (presumably old and new; the +/- markers are lost). Both open
// file_path write-only with create|exclusive, convert full_checksum with
// to_sstring<bytes>, write the result through write(), and close the writer.
|
||||
future<> write_digest(const sstring file_path, uint32_t full_checksum) {
|
||||
void write_digest(const sstring file_path, uint32_t full_checksum) {
|
||||
sstlog.debug("Writing Digest file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
// Future-based form: the writer is held in a shared pointer.
return engine().open_file_dma(file_path, oflags).then([full_checksum] (file f) {
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = make_shared<file_writer>(std::move(out));
|
||||
// Synchronous form: the file is taken from the open future with get0().
auto f = engine().open_file_dma(file_path, oflags).get0();
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = file_writer(std::move(out));
|
||||
|
||||
// Future-based form: the digest bytes are passed to the continuation through do_with.
return do_with(to_sstring<bytes>(full_checksum), [w] (bytes& digest) {
|
||||
return seastar::async([w, &digest] { write(*w, digest); }).then([w] {
|
||||
return w->close().then([w] {});
|
||||
});
|
||||
});
|
||||
});
|
||||
// Synchronous form: the digest is a local; write, then close.
auto digest = to_sstring<bytes>(full_checksum);
|
||||
write(w, digest);
|
||||
w.close().get();
|
||||
}
|
||||
|
||||
future<index_list> sstable::read_indexes(uint64_t position, uint64_t quantity) {
|
||||
@@ -803,22 +794,19 @@ future<> sstable::read_simple(T& component) {
|
||||
}
|
||||
|
||||
// Diff view of sstable::write_simple<Type, T>(). The future<>-returning form
// and the void form are interleaved (presumably old and new; the +/- markers
// are lost). Both open the file named by filename(Type) write-only with
// create|truncate, wrap it in a file_writer constructed with 4096, serialize
// `component` through write(), then flush and close the writer.
template <sstable::component_type Type, typename T>
|
||||
future<> sstable::write_simple(T& component) {
|
||||
void sstable::write_simple(T& component) {
|
||||
auto file_path = filename(Type);
|
||||
// The format string is built at run time from the component's name in _component_map.
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
|
||||
// Future-based form: `component` is captured by reference in the continuations.
return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this, &component] (file f) {
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = make_shared<file_writer>(std::move(out));
|
||||
auto fut = seastar::async([w, &component] () mutable { write(*w, component); });
|
||||
return fut.then([w] {
|
||||
return w->flush().then([w] {
|
||||
return w->close().then([w] {}); // the underlying file is synced here.
|
||||
});
|
||||
});
|
||||
});
|
||||
// Synchronous form: open, write, flush and close in sequence on local objects.
file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
auto out = file_writer(std::move(f), 4096);
|
||||
auto w = file_writer(std::move(out));
|
||||
write(w, component);
|
||||
w.flush().get();
|
||||
w.close().get();
|
||||
}
|
||||
|
||||
// Explicit instantiations of read_simple / write_simple for the Filter
// component with sstables::filter. The write_simple line appears twice, once
// returning future<> and once returning void (presumably the old and new side
// of the diff).
template future<> sstable::read_simple<sstable::component_type::Filter>(sstables::filter& f);
|
||||
template future<> sstable::write_simple<sstable::component_type::Filter>(sstables::filter& f);
|
||||
template void sstable::write_simple<sstable::component_type::Filter>(sstables::filter& f);
|
||||
|
||||
future<> sstable::read_compression() {
|
||||
// FIXME: If there is no compression, we should expect a CRC file to be present.
|
||||
@@ -829,20 +817,20 @@ future<> sstable::read_compression() {
|
||||
return read_simple<component_type::CompressionInfo>(_compression);
|
||||
}
|
||||
|
||||
// Diff view of sstable::write_compression(). The future<>-returning form and
// the void form are interleaved (presumably old and new). Both do nothing when
// the sstable has no CompressionInfo component; otherwise they write
// _compression through write_simple<component_type::CompressionInfo>.
future<> sstable::write_compression() {
|
||||
void sstable::write_compression() {
|
||||
if (!has_component(sstable::component_type::CompressionInfo)) {
|
||||
return make_ready_future<>();
|
||||
return;
|
||||
}
|
||||
|
||||
return write_simple<component_type::CompressionInfo>(_compression);
|
||||
write_simple<component_type::CompressionInfo>(_compression);
|
||||
}
|
||||
|
||||
// Reads the Statistics component into _statistics via read_simple and returns
// the resulting future. Only one form is shown on this page.
future<> sstable::read_statistics() {
|
||||
return read_simple<component_type::Statistics>(_statistics);
|
||||
}
|
||||
|
||||
// Diff view of sstable::write_statistics(). The future<>-returning form is
// followed by the void form (presumably old, then new). Both write _statistics
// through write_simple<component_type::Statistics>.
future<> sstable::write_statistics() {
|
||||
return write_simple<component_type::Statistics>(_statistics);
|
||||
void sstable::write_statistics() {
|
||||
write_simple<component_type::Statistics>(_statistics);
|
||||
}
|
||||
|
||||
future<> sstable::open_data() {
|
||||
@@ -891,14 +879,11 @@ future<> sstable::load() {
|
||||
|
||||
// Diff view of sstable::store(). Two bodies are interleaved (presumably old,
// then new; the +/- markers are lost):
//  - a .then() chain calling write_toc(), write_statistics(),
//    write_compression(), write_filter() and write_summary() in that order;
//  - a single seastar::async block calling write_statistics(),
//    write_compression(), write_filter() and write_summary() in that order.
// NOTE(review): no write_toc() call is visible in the seastar::async form, even
// though the .then() chain starts with it. Confirm against the real file
// whether the TOC write was dropped from store() or is missing from this view.
future<> sstable::store() {
|
||||
// TODO: write other components as well.
|
||||
return write_toc().then([this] {
|
||||
return write_statistics();
|
||||
}).then([this] {
|
||||
return write_compression();
|
||||
}).then([this] {
|
||||
return write_filter();
|
||||
}).then([this] {
|
||||
return write_summary();
|
||||
return seastar::async([this] {
|
||||
write_statistics();
|
||||
write_compression();
|
||||
write_filter();
|
||||
write_summary();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1328,8 +1313,8 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
|
||||
w->close().get();
|
||||
_data_file = file(); // w->close() closed _data_file
|
||||
|
||||
write_digest(filename(sstable::component_type::Digest), w->full_checksum()).get();
|
||||
write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()).get();
|
||||
write_digest(filename(sstable::component_type::Digest), w->full_checksum());
|
||||
write_crc(filename(sstable::component_type::CRC), w->finalize_checksum());
|
||||
} else {
|
||||
prepare_compression(_compression, *schema);
|
||||
auto w = make_shared<file_writer>(make_compressed_file_output_stream(_data_file, &_compression));
|
||||
@@ -1338,7 +1323,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
|
||||
w->close().get();
|
||||
_data_file = file(); // w->close() closed _data_file
|
||||
|
||||
write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()).get();
|
||||
write_digest(filename(sstable::component_type::Digest), _compression.full_checksum());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1349,24 +1334,16 @@ future<> sstable::write_components(const memtable& mt) {
|
||||
|
||||
// Diff view of sstable::write_components(mutation_reader, estimated_partitions,
// schema). Two bodies are interleaved (presumably old, then new; the +/-
// markers are lost):
//  - a continuation chain: touch_directory(_dir), then create_data(), then
//    prepare_write_components() inside seastar::async, then write_summary(),
//    write_filter(), write_statistics(), write_compression(), write_toc();
//  - a single seastar::async block that performs the same steps in the same
//    order, resolving the two future-returning calls with get().
future<> sstable::write_components(::mutation_reader mr,
|
||||
uint64_t estimated_partitions, schema_ptr schema) {
|
||||
return touch_directory(_dir).then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
|
||||
return create_data().then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
|
||||
// Wrapper so prepare_write_components can be handed to seastar::async with its arguments.
auto w = [this] (::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema) {
|
||||
this->prepare_write_components(std::move(mr), estimated_partitions, std::move(schema));
|
||||
};
|
||||
return seastar::async(std::move(w), std::move(mr), estimated_partitions, std::move(schema)).then([this] {
|
||||
return write_summary();
|
||||
}).then([this] {
|
||||
return write_filter();
|
||||
}).then([this] {
|
||||
return write_statistics();
|
||||
}).then([this] {
|
||||
// NOTE: write_compression means maybe_write_compression.
|
||||
return write_compression();
|
||||
}).then([this] {
|
||||
return write_toc();
|
||||
});
|
||||
});
|
||||
// NOTE(review): this lambda is not declared `mutable`, so `mr` and `schema` are
// const inside it and std::move(...) below cannot move from them. Presumably
// they are copied instead; confirm this is intended.
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
|
||||
touch_directory(_dir).get();
|
||||
create_data().get();
|
||||
prepare_write_components(std::move(mr), estimated_partitions, std::move(schema));
|
||||
write_summary();
|
||||
write_filter();
|
||||
write_statistics();
|
||||
// NOTE: write_compression means maybe_write_compression.
|
||||
write_compression();
|
||||
write_toc();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -282,27 +282,27 @@ private:
|
||||
future<> read_simple(T& comp);
|
||||
|
||||
template <sstable::component_type Type, typename T>
|
||||
future<> write_simple(T& comp);
|
||||
void write_simple(T& comp);
|
||||
|
||||
future<> read_toc();
|
||||
future<> write_toc();
|
||||
void write_toc();
|
||||
|
||||
future<> read_compression();
|
||||
future<> write_compression();
|
||||
void write_compression();
|
||||
|
||||
future<> read_filter();
|
||||
|
||||
future<> write_filter();
|
||||
void write_filter();
|
||||
|
||||
// Reads the Summary component into _summary via read_simple and returns the
// resulting future.
future<> read_summary() {
|
||||
return read_simple<component_type::Summary>(_summary);
|
||||
}
|
||||
// Diff view of the inline write_summary(). The future<>-returning form is
// followed by the void form (presumably old, then new). Both write _summary
// through write_simple<component_type::Summary>.
future<> write_summary() {
|
||||
return write_simple<component_type::Summary>(_summary);
|
||||
void write_summary() {
|
||||
write_simple<component_type::Summary>(_summary);
|
||||
}
|
||||
|
||||
future<> read_statistics();
|
||||
future<> write_statistics();
|
||||
void write_statistics();
|
||||
|
||||
future<> open_data();
|
||||
future<> create_data();
|
||||
|
||||
Reference in New Issue
Block a user