sstables: de-futurize write path

The sstables write path has been partially de-futurized, but now creates a
ton of threads, and yet does not exploit this as everything is serialized.

Remove those extra threads and futures and use a single thread to write
everything.  If needed, we'll employ write-behind in output_stream to
increase parallelism.

Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
Avi Kivity
2015-08-02 19:05:27 +03:00
parent 2929510ef3
commit 3a5e3c8829
3 changed files with 68 additions and 91 deletions

View File

@@ -33,9 +33,9 @@ future<> sstable::read_filter() {
});
}
future<> sstable::write_filter() {
void sstable::write_filter() {
if (!has_component(sstable::component_type::Filter)) {
return make_ready_future<>();
return;
}
auto f = static_cast<utils::filter::murmur3_bloom_filter *>(_filter.get());
@@ -43,8 +43,8 @@ future<> sstable::write_filter() {
std::vector<utils::filter::bloom_filter::bitmap_block> v;
boost::to_block_range(f->bits(), std::back_inserter(v));
return do_with(sstables::filter(f->num_hashes(), std::move(v)), [this] (auto& filter) {
return this->write_simple<sstable::component_type::Filter>(filter);
});
auto filter = sstables::filter(f->num_hashes(), std::move(v));
write_simple<sstable::component_type::Filter>(filter);
}
}

View File

@@ -675,57 +675,48 @@ future<> sstable::read_toc() {
}
future<> sstable::write_toc() {
void sstable::write_toc() {
auto file_path = filename(sstable::component_type::TOC);
sstlog.debug("Writing TOC file {} ", file_path);
return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this] (file f) {
auto out = file_writer(std::move(f), 4096);
auto w = make_shared<file_writer>(std::move(out));
file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
auto out = file_writer(std::move(f), 4096);
auto w = file_writer(std::move(out));
return do_for_each(_components, [this, w] (auto key) {
for (auto&& key : _components) {
// new line character is appended to the end of each component name.
auto value = _component_map[key] + "\n";
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
return seastar::async([w, b = std::move(b)] () mutable { write(*w, b); });
}).then([w] {
return w->flush().then([w] {
return w->close().then([w] {});
});
});
});
auto value = _component_map[key] + "\n";
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
write(w, b);
}
w.flush().get();
w.close().get();
}
future<> write_crc(const sstring file_path, checksum& c) {
void write_crc(const sstring file_path, checksum& c) {
sstlog.debug("Writing CRC file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
return engine().open_file_dma(file_path, oflags).then([&c] (file f) {
auto out = file_writer(std::move(f), 4096);
auto w = make_shared<file_writer>(std::move(out));
return seastar::async([w, &c] () { write(*w, c); }).then([w] {
return w->close().then([w] {});
});
});
file f = engine().open_file_dma(file_path, oflags).get0();
auto out = file_writer(std::move(f), 4096);
auto w = file_writer(std::move(out));
write(w, c);
w.close().get();
}
// Digest file stores the full checksum of data file converted into a string.
future<> write_digest(const sstring file_path, uint32_t full_checksum) {
void write_digest(const sstring file_path, uint32_t full_checksum) {
sstlog.debug("Writing Digest file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
return engine().open_file_dma(file_path, oflags).then([full_checksum] (file f) {
auto out = file_writer(std::move(f), 4096);
auto w = make_shared<file_writer>(std::move(out));
auto f = engine().open_file_dma(file_path, oflags).get0();
auto out = file_writer(std::move(f), 4096);
auto w = file_writer(std::move(out));
return do_with(to_sstring<bytes>(full_checksum), [w] (bytes& digest) {
return seastar::async([w, &digest] { write(*w, digest); }).then([w] {
return w->close().then([w] {});
});
});
});
auto digest = to_sstring<bytes>(full_checksum);
write(w, digest);
w.close().get();
}
future<index_list> sstable::read_indexes(uint64_t position, uint64_t quantity) {
@@ -803,22 +794,19 @@ future<> sstable::read_simple(T& component) {
}
template <sstable::component_type Type, typename T>
future<> sstable::write_simple(T& component) {
void sstable::write_simple(T& component) {
auto file_path = filename(Type);
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
return engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).then([this, &component] (file f) {
auto out = file_writer(std::move(f), 4096);
auto w = make_shared<file_writer>(std::move(out));
auto fut = seastar::async([w, &component] () mutable { write(*w, component); });
return fut.then([w] {
return w->flush().then([w] {
return w->close().then([w] {}); // the underlying file is synced here.
});
});
});
file f = engine().open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
auto out = file_writer(std::move(f), 4096);
auto w = file_writer(std::move(out));
write(w, component);
w.flush().get();
w.close().get();
}
template future<> sstable::read_simple<sstable::component_type::Filter>(sstables::filter& f);
template future<> sstable::write_simple<sstable::component_type::Filter>(sstables::filter& f);
template void sstable::write_simple<sstable::component_type::Filter>(sstables::filter& f);
future<> sstable::read_compression() {
// FIXME: If there is no compression, we should expect a CRC file to be present.
@@ -829,20 +817,20 @@ future<> sstable::read_compression() {
return read_simple<component_type::CompressionInfo>(_compression);
}
future<> sstable::write_compression() {
void sstable::write_compression() {
if (!has_component(sstable::component_type::CompressionInfo)) {
return make_ready_future<>();
return;
}
return write_simple<component_type::CompressionInfo>(_compression);
write_simple<component_type::CompressionInfo>(_compression);
}
future<> sstable::read_statistics() {
return read_simple<component_type::Statistics>(_statistics);
}
future<> sstable::write_statistics() {
return write_simple<component_type::Statistics>(_statistics);
void sstable::write_statistics() {
write_simple<component_type::Statistics>(_statistics);
}
future<> sstable::open_data() {
@@ -891,14 +879,11 @@ future<> sstable::load() {
future<> sstable::store() {
// TODO: write other components as well.
return write_toc().then([this] {
return write_statistics();
}).then([this] {
return write_compression();
}).then([this] {
return write_filter();
}).then([this] {
return write_summary();
return seastar::async([this] {
write_statistics();
write_compression();
write_filter();
write_summary();
});
}
@@ -1328,8 +1313,8 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
w->close().get();
_data_file = file(); // w->close() closed _data_file
write_digest(filename(sstable::component_type::Digest), w->full_checksum()).get();
write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()).get();
write_digest(filename(sstable::component_type::Digest), w->full_checksum());
write_crc(filename(sstable::component_type::CRC), w->finalize_checksum());
} else {
prepare_compression(_compression, *schema);
auto w = make_shared<file_writer>(make_compressed_file_output_stream(_data_file, &_compression));
@@ -1338,7 +1323,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
w->close().get();
_data_file = file(); // w->close() closed _data_file
write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()).get();
write_digest(filename(sstable::component_type::Digest), _compression.full_checksum());
}
}
@@ -1349,24 +1334,16 @@ future<> sstable::write_components(const memtable& mt) {
future<> sstable::write_components(::mutation_reader mr,
uint64_t estimated_partitions, schema_ptr schema) {
return touch_directory(_dir).then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
return create_data().then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
auto w = [this] (::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema) {
this->prepare_write_components(std::move(mr), estimated_partitions, std::move(schema));
};
return seastar::async(std::move(w), std::move(mr), estimated_partitions, std::move(schema)).then([this] {
return write_summary();
}).then([this] {
return write_filter();
}).then([this] {
return write_statistics();
}).then([this] {
// NOTE: write_compression means maybe_write_compression.
return write_compression();
}).then([this] {
return write_toc();
});
});
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
touch_directory(_dir).get();
create_data().get();
prepare_write_components(std::move(mr), estimated_partitions, std::move(schema));
write_summary();
write_filter();
write_statistics();
// NOTE: write_compression means maybe_write_compression.
write_compression();
write_toc();
});
}

View File

@@ -282,27 +282,27 @@ private:
future<> read_simple(T& comp);
template <sstable::component_type Type, typename T>
future<> write_simple(T& comp);
void write_simple(T& comp);
future<> read_toc();
future<> write_toc();
void write_toc();
future<> read_compression();
future<> write_compression();
void write_compression();
future<> read_filter();
future<> write_filter();
void write_filter();
future<> read_summary() {
return read_simple<component_type::Summary>(_summary);
}
future<> write_summary() {
return write_simple<component_type::Summary>(_summary);
void write_summary() {
write_simple<component_type::Summary>(_summary);
}
future<> read_statistics();
future<> write_statistics();
void write_statistics();
future<> open_data();
future<> create_data();