From d277ec2ab9320ce2fd1bc5eee9076d44f0394ce4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 22 Dec 2019 10:03:56 +0200 Subject: [PATCH] sstable: file_writer: add optional filename member To be used for reporting errors when failing to closing the output stream. Signed-off-by: Benny Halevy --- sstables/mc/writer.cc | 6 +++--- sstables/sstables.cc | 34 +++++++++++++++++++++++++--------- sstables/writer.hh | 26 ++++++++++++++++---------- 3 files changed, 44 insertions(+), 22 deletions(-) diff --git a/sstables/mc/writer.cc b/sstables/mc/writer.cc index cf0d07cb17..1b70589041 100644 --- a/sstables/mc/writer.cc +++ b/sstables/mc/writer.cc @@ -834,16 +834,16 @@ void writer::init_file_writers() { if (!_compression_enabled) { auto out = make_file_data_sink(std::move(_sst._data_file), options).get0(); - _data_writer = std::make_unique(std::move(out), options.buffer_size); + _data_writer = std::make_unique(std::move(out), options.buffer_size, _sst.filename(component_type::Data)); } else { auto out = make_file_output_stream(std::move(_sst._data_file), options).get0(); _data_writer = std::make_unique( make_compressed_file_m_format_output_stream( std::move(out), &_sst._components->compression, - _schema.get_compressor_params())); + _schema.get_compressor_params()), _sst.filename(component_type::Data)); } - auto w = file_writer::make(std::move(_sst._index_file), options); + auto w = file_writer::make(std::move(_sst._index_file), std::move(options), _sst.filename(component_type::Index)); _index_writer = std::make_unique(w.get0()); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 08faf205f7..f17280e7de 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -939,11 +939,27 @@ void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) { _recognized_components.insert(component_type::Scylla); } +void file_writer::close() { + try { + _out.close().get(); + } catch (...) { + auto e = std::current_exception(); + sstlog.error("Error while closing {}: {}", get_filename(), e); + std::rethrow_exception(e); + } +} + +const char* file_writer::get_filename() const noexcept { + return _filename ? _filename->c_str() : ""; +} + future sstable::make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept { // Note: file_writer::make closes the file if file_writer creation fails // so we don't need to use with_file_close_on_failure here. - return new_sstable_component_file(_write_error_handler, c, oflags).then([options = std::move(options)] (file f) { - return file_writer::make(std::move(f), std::move(options)); + return futurize_invoke([this, c] { return filename(c); }).then([this, c, options = std::move(options), oflags] (sstring filename) mutable { + return new_sstable_component_file(_write_error_handler, c, oflags).then([options = std::move(options), filename = std::move(filename)] (file f) mutable { + return file_writer::make(std::move(f), std::move(options), std::move(filename)); + }); }); } @@ -1981,7 +1997,7 @@ file_writer components_writer::index_file_writer(sstable& sst, const io_priority options.buffer_size = sst.sstable_buffer_size; options.io_priority_class = pc; options.write_behind = 10; - return file_writer::make(std::move(sst._index_file), std::move(options)).get0(); + return file_writer::make(std::move(sst._index_file), std::move(options), sst.filename(component_type::Index)).get0(); } // Returns the cost for writing a byte to summary such that the ratio of summary @@ -2265,11 +2281,11 @@ void sstable_writer_k_l::prepare_file_writer() if (!_compression_enabled) { auto out = make_file_data_sink(std::move(_sst._data_file), options).get0(); - _writer = std::make_unique(std::move(out), options.buffer_size); + _writer = std::make_unique(std::move(out), options.buffer_size, _sst.get_filename()); } else { auto out = make_file_output_stream(std::move(_sst._data_file), std::move(options)).get0(); _writer = std::make_unique(make_compressed_file_k_l_format_output_stream( - std::move(out), &_sst._components->compression, _schema.get_compressor_params())); + std::move(out), &_sst._components->compression, _schema.get_compressor_params()), _sst.get_filename()); } } @@ -3325,7 +3341,7 @@ delete_atomically(std::vector ssts) { // Write all toc names into the log file. file_output_stream_options options; options.buffer_size = 4096; - auto w = file_writer::make(std::move(f), options).get0(); + auto w = file_writer::make(std::move(f), options, tmp_pending_delete_log).get0(); for (const auto& sst : ssts) { auto toc = sst->component_basename(component_type::TOC); @@ -3523,11 +3539,11 @@ sstable::sstable(schema_ptr schema, tracker.add(*this); } -future file_writer::make(file f, file_output_stream_options options) noexcept { +future file_writer::make(file f, file_output_stream_options options, sstring filename) noexcept { // note: make_file_output_stream closes the file if the stream creation fails return make_file_output_stream(std::move(f), std::move(options)) - .then([](output_stream&& out) { - return file_writer(std::move(out)); + .then([filename = std::move(filename)] (output_stream&& out) { + return file_writer(std::move(out), std::move(filename)); }); } diff --git a/sstables/writer.hh b/sstables/writer.hh index 67ad527a30..975e11d453 100644 --- a/sstables/writer.hh +++ b/sstables/writer.hh @@ -42,15 +42,20 @@ class metadata_collector; class file_writer { output_stream _out; writer_offset_tracker _offset; + std::optional _filename; public: // Closes the file if file_writer creation fails - static future make(file f, file_output_stream_options options) noexcept; + static future make(file f, file_output_stream_options options, sstring filename) noexcept; - file_writer(output_stream&& out) - : _out(std::move(out)) {} + file_writer(output_stream&& out, sstring filename) noexcept + : _out(std::move(out)) + , _filename(std::move(filename)) + {} + + file_writer(output_stream&& out) noexcept + : _out(std::move(out)) + {} - virtual ~file_writer() = default; - file_writer(file_writer&&) = default; // Must be called in a seastar thread. void write(const char* buf, size_t n) { _offset.offset += n; @@ -66,9 +71,8 @@ public: _out.flush().get(); } // Must be called in a seastar thread. - void close() { - _out.close().get(); - } + void close(); + uint64_t offset() const { return _offset.offset; } @@ -76,6 +80,8 @@ public: const writer_offset_tracker& offset_tracker() const { return _offset; } + + const char* get_filename() const noexcept; }; @@ -185,8 +191,8 @@ class checksummed_file_writer : public file_writer { checksum _c; uint32_t _full_checksum; public: - checksummed_file_writer(data_sink out, size_t buffer_size) - : file_writer(make_checksummed_file_output_stream(std::move(out), _c, _full_checksum, buffer_size)) + checksummed_file_writer(data_sink out, size_t buffer_size, sstring filename) + : file_writer(make_checksummed_file_output_stream(std::move(out), _c, _full_checksum, buffer_size), std::move(filename)) , _c({uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size))}) , _full_checksum(ChecksumType::init_checksum()) {}