sstables: De-futurize file_writer

This commit is contained in:
Tomasz Grabiec
2018-12-06 17:27:38 +01:00
parent a1fb441df8
commit 13999a4d09
2 changed files with 45 additions and 42 deletions

View File

@@ -246,7 +246,7 @@ write(sstable_version_types v, file_writer& out, T i) {
auto *nr = reinterpret_cast<const net::packed<T> *>(&i);
i = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&i);
out.write(p, sizeof(T)).get();
out.write(p, sizeof(T));
}
template <typename T>
@@ -295,7 +295,7 @@ inline void write(sstable_version_types v, file_writer& out, double d) {
auto *nr = reinterpret_cast<const net::packed<unsigned long> *>(&d);
auto tmp = net::hton(*nr);
auto p = reinterpret_cast<const char*>(&tmp);
out.write(p, sizeof(unsigned long)).get();
out.write(p, sizeof(unsigned long));
}
template <typename T>
@@ -308,11 +308,11 @@ future<> parse(sstable_version_types, random_access_reader& in, T& len, bytes& s
}
inline void write(sstable_version_types v, file_writer& out, const bytes& s) {
out.write(s).get();
out.write(s);
}
inline void write(sstable_version_types v, file_writer& out, bytes_view s) {
out.write(reinterpret_cast<const char*>(s.data()), s.size()).get();
out.write(reinterpret_cast<const char*>(s.data()), s.size());
}
inline void write(sstable_version_types v, file_writer& out, bytes_ostream s) {
@@ -363,7 +363,7 @@ future<> parse(sstable_version_types, random_access_reader& in, utils::UUID& uui
}
inline void write(sstable_version_types v, file_writer& out, const utils::UUID& uuid) {
out.write(uuid.serialize()).get();
out.write(uuid.serialize());
}
template <class T>
@@ -514,7 +514,7 @@ write(sstable_version_types v, file_writer& out, const utils::chunked_vector<Mem
}
auto p = reinterpret_cast<const char*>(tmp.data());
auto bytes = now * sizeof(Members);
out.write(p, bytes).get();
out.write(p, bytes);
idx += now;
}
}
@@ -757,7 +757,7 @@ inline void write(sstable_version_types v, file_writer& out, const summary_entry
// endianness. We can treat it as little endian to preserve portability.
write(v, out, entry.key);
auto p = seastar::cpu_to_le<uint64_t>(entry.position);
out.write(reinterpret_cast<const char*>(&p), sizeof(p)).get();
out.write(reinterpret_cast<const char*>(&p), sizeof(p));
}
inline void write(sstable_version_types v, file_writer& out, const summary& s) {
@@ -769,7 +769,7 @@ inline void write(sstable_version_types v, file_writer& out, const summary& s) {
s.header.size_at_full_sampling);
for (auto&& e : s.positions) {
auto p = seastar::cpu_to_le(e);
out.write(reinterpret_cast<const char*>(&p), sizeof(p)).get();
out.write(reinterpret_cast<const char*>(&p), sizeof(p));
}
write(v, out, s.entries);
write(v, out, s.first_key, s.last_key);
@@ -900,7 +900,7 @@ inline void write(sstable_version_types v, file_writer& out, const utils::estima
auto p = reinterpret_cast<const char*>(elements.data());
auto bytes = elements.size() * sizeof(element);
out.write(p, bytes).get();
out.write(p, bytes);
}
struct streaming_histogram_element {
@@ -1010,7 +1010,7 @@ void write(sstable_version_types v, file_writer& out, const compression& c) {
}
auto p = reinterpret_cast<const char*>(tmp.data());
auto bytes = now * sizeof(uint64_t);
out.write(p, bytes).get();
out.write(p, bytes);
idx += now;
}
}
@@ -1125,8 +1125,8 @@ void sstable::write_toc(const io_priority_class& pc) {
bytes b = bytes(reinterpret_cast<const bytes::value_type *>(value.c_str()), value.size());
write(_version, w, b);
}
w.flush().get();
w.close().get();
w.flush();
w.close();
// Flushing parent directory to guarantee that temporary TOC file reached
// the disk.
@@ -1172,7 +1172,7 @@ void sstable::write_crc(const checksum& c) {
options.buffer_size = 4096;
auto w = file_writer(std::move(f), std::move(options));
write(get_version(), w, c);
w.close().get();
w.close();
}
// Digest file stores the full checksum of data file converted into a string.
@@ -1189,7 +1189,7 @@ void sstable::write_digest(uint32_t full_checksum) {
auto digest = to_sstring<bytes>(full_checksum);
write(get_version(), w, digest);
w.close().get();
w.close();
}
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
@@ -1236,8 +1236,8 @@ void sstable::write_simple(const T& component, const io_priority_class& pc) {
options.io_priority_class = pc;
auto w = file_writer(std::move(f), std::move(options));
write(_version, w, component);
w.flush().get();
w.close().get();
w.flush();
w.close();
}
template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f, const io_priority_class& pc);
@@ -1374,8 +1374,8 @@ void sstable::rewrite_statistics(const io_priority_class& pc) {
options.io_priority_class = pc;
auto w = file_writer(std::move(f), std::move(options));
write(_version, w, _components->statistics);
w.flush().get();
w.close().get();
w.flush();
w.close();
// rename() guarantees atomicity when renaming a file into place.
sstable_write_io_check(rename_file, file_path, filename(component_type::Statistics)).get();
}
@@ -2458,7 +2458,7 @@ void components_writer::consume_end_of_stream() {
seal_summary(_sst._components->summary, std::move(_first_key), std::move(_last_key), _index_sampling_state);
_index_needs_close = false;
_index.close().get();
_index.close();
if (_sst.has_component(component_type::CompressionInfo)) {
_sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
@@ -2472,7 +2472,7 @@ void components_writer::consume_end_of_stream() {
components_writer::~components_writer() {
if (_index_needs_close) {
try {
_index.close().get();
_index.close();
} catch (...) {
sstlog.error("components_writer failed to close file: {}", std::current_exception());
}
@@ -2588,7 +2588,7 @@ void sstable_writer_k_l::prepare_file_writer()
void sstable_writer_k_l::finish_file_writer()
{
auto writer = std::move(_writer);
writer->close().get();
writer->close();
if (!_compression_enabled) {
auto chksum_wr = static_cast<adler32_checksummed_file_writer*>(writer.get());
@@ -2602,7 +2602,7 @@ void sstable_writer_k_l::finish_file_writer()
sstable_writer_k_l::~sstable_writer_k_l() {
if (_writer) {
try {
_writer->close().get();
_writer->close();
} catch (...) {
sstlog.error("sstable_writer failed to close file: {}", std::current_exception());
}
@@ -2939,7 +2939,7 @@ private:
void flush_tmp_bufs() {
for (auto&& buf : _tmp_bufs.buffers()) {
_data_writer->write(buf.get(), buf.size()).get();
_data_writer->write(buf.get(), buf.size());
}
_tmp_bufs.clear();
}
@@ -2987,7 +2987,7 @@ sstable_writer_m::~sstable_writer_m() {
auto close_writer = [](auto& writer) {
if (writer) {
try {
writer->close().get();
writer->close();
} catch (...) {
sstlog.error("sstable_writer_m failed to close file: {}", std::current_exception());
}
@@ -3058,7 +3058,7 @@ void sstable_writer_m::init_file_writers() {
void sstable_writer_m::close_data_writer() {
auto writer = std::move(_data_writer);
writer->close().get();
writer->close();
if (!_compression_enabled) {
auto chksum_wr = static_cast<crc32_checksummed_file_writer*>(writer.get());
@@ -3376,8 +3376,8 @@ uint64_t calculate_write_size(Func&& func) {
{
auto counting_writer = file_writer(make_sizing_output_stream(written_size));
func(counting_writer);
counting_writer.flush().get();
counting_writer.close().get();
counting_writer.flush();
counting_writer.close();
}
return written_size;
}
@@ -3420,7 +3420,7 @@ void sstable_writer_m::write_static_row(const row& static_row) {
write(_sst.get_version(), *_data_writer, row_extended_flags::is_static);
write_cells(_tmp_writer, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion);
_tmp_writer.flush().get();
_tmp_writer.flush();
uint64_t row_body_size = _tmp_bufs.size() + unsigned_vint::serialized_size(0);
write_vint(*_data_writer, row_body_size);
@@ -3474,7 +3474,7 @@ void sstable_writer_m::write_clustered(const clustering_row& clustered_row, uint
write_clustering_prefix(*_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
write_row_body(_tmp_writer, clustered_row, has_complex_deletion);
_tmp_writer.flush().get();
_tmp_writer.flush();
uint64_t row_body_size = _tmp_bufs.size() + unsigned_vint::serialized_size(prev_row_size);
write_vint(*_data_writer, row_body_size);
@@ -3616,7 +3616,7 @@ void sstable_writer_m::consume_end_of_stream() {
_sst.get_metadata_collector().add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
_index_writer->close().get();
_index_writer->close();
_index_writer.reset();
_sst.set_first_and_last_keys();
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(),
@@ -3637,7 +3637,7 @@ void sstable_writer_m::consume_end_of_stream() {
if (!_cfg.leave_unsealed) {
_sst.seal_sstable(_cfg.backup).get();
}
_tmp_writer.close().get();
_tmp_writer.close();
_cfg.monitor->on_flush_completed();
}

View File

@@ -43,20 +43,23 @@ public:
virtual ~file_writer() = default;
file_writer(file_writer&&) = default;
future<> write(const char* buf, size_t n) {
// Must be called in a seastar thread.
void write(const char* buf, size_t n) {
_offset.offset += n;
return _out.write(buf, n);
_out.write(buf, n).get();
}
future<> write(const bytes& s) {
// Must be called in a seastar thread.
void write(const bytes& s) {
_offset.offset += s.size();
return _out.write(s);
_out.write(s).get();
}
future<> flush() {
return _out.flush();
// Must be called in a seastar thread.
void flush() {
_out.flush().get();
}
future<> close() {
return _out.close();
// Must be called in a seastar thread.
void close() {
_out.close().get();
}
uint64_t offset() const {
return _offset.offset;
@@ -110,8 +113,8 @@ serialized_size(sstable_version_types v, const T& object) {
uint64_t size = 0;
auto writer = file_writer(make_sizing_output_stream(size));
write(v, writer, object);
writer.flush().get();
writer.close().get();
writer.flush();
writer.close();
return size;
}