mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 16:33:35 +00:00
sstable: file_writer: add optional filename member
To be used for reporting errors when failing to closing the output stream. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -834,16 +834,16 @@ void writer::init_file_writers() {
|
||||
|
||||
if (!_compression_enabled) {
|
||||
auto out = make_file_data_sink(std::move(_sst._data_file), options).get0();
|
||||
_data_writer = std::make_unique<crc32_checksummed_file_writer>(std::move(out), options.buffer_size);
|
||||
_data_writer = std::make_unique<crc32_checksummed_file_writer>(std::move(out), options.buffer_size, _sst.filename(component_type::Data));
|
||||
} else {
|
||||
auto out = make_file_output_stream(std::move(_sst._data_file), options).get0();
|
||||
_data_writer = std::make_unique<file_writer>(
|
||||
make_compressed_file_m_format_output_stream(
|
||||
std::move(out),
|
||||
&_sst._components->compression,
|
||||
_schema.get_compressor_params()));
|
||||
_schema.get_compressor_params()), _sst.filename(component_type::Data));
|
||||
}
|
||||
auto w = file_writer::make(std::move(_sst._index_file), options);
|
||||
auto w = file_writer::make(std::move(_sst._index_file), std::move(options), _sst.filename(component_type::Index));
|
||||
_index_writer = std::make_unique<file_writer>(w.get0());
|
||||
}
|
||||
|
||||
|
||||
@@ -939,11 +939,27 @@ void sstable::generate_toc(compressor_ptr c, double filter_fp_chance) {
|
||||
_recognized_components.insert(component_type::Scylla);
|
||||
}
|
||||
|
||||
void file_writer::close() {
|
||||
try {
|
||||
_out.close().get();
|
||||
} catch (...) {
|
||||
auto e = std::current_exception();
|
||||
sstlog.error("Error while closing {}: {}", get_filename(), e);
|
||||
std::rethrow_exception(e);
|
||||
}
|
||||
}
|
||||
|
||||
const char* file_writer::get_filename() const noexcept {
|
||||
return _filename ? _filename->c_str() : "<anonymous output_stream>";
|
||||
}
|
||||
|
||||
future<file_writer> sstable::make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
|
||||
// Note: file_writer::make closes the file if file_writer creation fails
|
||||
// so we don't need to use with_file_close_on_failure here.
|
||||
return new_sstable_component_file(_write_error_handler, c, oflags).then([options = std::move(options)] (file f) {
|
||||
return file_writer::make(std::move(f), std::move(options));
|
||||
return futurize_invoke([this, c] { return filename(c); }).then([this, c, options = std::move(options), oflags] (sstring filename) mutable {
|
||||
return new_sstable_component_file(_write_error_handler, c, oflags).then([options = std::move(options), filename = std::move(filename)] (file f) mutable {
|
||||
return file_writer::make(std::move(f), std::move(options), std::move(filename));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1981,7 +1997,7 @@ file_writer components_writer::index_file_writer(sstable& sst, const io_priority
|
||||
options.buffer_size = sst.sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
options.write_behind = 10;
|
||||
return file_writer::make(std::move(sst._index_file), std::move(options)).get0();
|
||||
return file_writer::make(std::move(sst._index_file), std::move(options), sst.filename(component_type::Index)).get0();
|
||||
}
|
||||
|
||||
// Returns the cost for writing a byte to summary such that the ratio of summary
|
||||
@@ -2265,11 +2281,11 @@ void sstable_writer_k_l::prepare_file_writer()
|
||||
|
||||
if (!_compression_enabled) {
|
||||
auto out = make_file_data_sink(std::move(_sst._data_file), options).get0();
|
||||
_writer = std::make_unique<adler32_checksummed_file_writer>(std::move(out), options.buffer_size);
|
||||
_writer = std::make_unique<adler32_checksummed_file_writer>(std::move(out), options.buffer_size, _sst.get_filename());
|
||||
} else {
|
||||
auto out = make_file_output_stream(std::move(_sst._data_file), std::move(options)).get0();
|
||||
_writer = std::make_unique<file_writer>(make_compressed_file_k_l_format_output_stream(
|
||||
std::move(out), &_sst._components->compression, _schema.get_compressor_params()));
|
||||
std::move(out), &_sst._components->compression, _schema.get_compressor_params()), _sst.get_filename());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3325,7 +3341,7 @@ delete_atomically(std::vector<shared_sstable> ssts) {
|
||||
// Write all toc names into the log file.
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
auto w = file_writer::make(std::move(f), options).get0();
|
||||
auto w = file_writer::make(std::move(f), options, tmp_pending_delete_log).get0();
|
||||
|
||||
for (const auto& sst : ssts) {
|
||||
auto toc = sst->component_basename(component_type::TOC);
|
||||
@@ -3523,11 +3539,11 @@ sstable::sstable(schema_ptr schema,
|
||||
tracker.add(*this);
|
||||
}
|
||||
|
||||
future<file_writer> file_writer::make(file f, file_output_stream_options options) noexcept {
|
||||
future<file_writer> file_writer::make(file f, file_output_stream_options options, sstring filename) noexcept {
|
||||
// note: make_file_output_stream closes the file if the stream creation fails
|
||||
return make_file_output_stream(std::move(f), std::move(options))
|
||||
.then([](output_stream<char>&& out) {
|
||||
return file_writer(std::move(out));
|
||||
.then([filename = std::move(filename)] (output_stream<char>&& out) {
|
||||
return file_writer(std::move(out), std::move(filename));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -42,15 +42,20 @@ class metadata_collector;
|
||||
class file_writer {
|
||||
output_stream<char> _out;
|
||||
writer_offset_tracker _offset;
|
||||
std::optional<sstring> _filename;
|
||||
public:
|
||||
// Closes the file if file_writer creation fails
|
||||
static future<file_writer> make(file f, file_output_stream_options options) noexcept;
|
||||
static future<file_writer> make(file f, file_output_stream_options options, sstring filename) noexcept;
|
||||
|
||||
file_writer(output_stream<char>&& out)
|
||||
: _out(std::move(out)) {}
|
||||
file_writer(output_stream<char>&& out, sstring filename) noexcept
|
||||
: _out(std::move(out))
|
||||
, _filename(std::move(filename))
|
||||
{}
|
||||
|
||||
file_writer(output_stream<char>&& out) noexcept
|
||||
: _out(std::move(out))
|
||||
{}
|
||||
|
||||
virtual ~file_writer() = default;
|
||||
file_writer(file_writer&&) = default;
|
||||
// Must be called in a seastar thread.
|
||||
void write(const char* buf, size_t n) {
|
||||
_offset.offset += n;
|
||||
@@ -66,9 +71,8 @@ public:
|
||||
_out.flush().get();
|
||||
}
|
||||
// Must be called in a seastar thread.
|
||||
void close() {
|
||||
_out.close().get();
|
||||
}
|
||||
void close();
|
||||
|
||||
uint64_t offset() const {
|
||||
return _offset.offset;
|
||||
}
|
||||
@@ -76,6 +80,8 @@ public:
|
||||
const writer_offset_tracker& offset_tracker() const {
|
||||
return _offset;
|
||||
}
|
||||
|
||||
const char* get_filename() const noexcept;
|
||||
};
|
||||
|
||||
|
||||
@@ -185,8 +191,8 @@ class checksummed_file_writer : public file_writer {
|
||||
checksum _c;
|
||||
uint32_t _full_checksum;
|
||||
public:
|
||||
checksummed_file_writer(data_sink out, size_t buffer_size)
|
||||
: file_writer(make_checksummed_file_output_stream<ChecksumType>(std::move(out), _c, _full_checksum, buffer_size))
|
||||
checksummed_file_writer(data_sink out, size_t buffer_size, sstring filename)
|
||||
: file_writer(make_checksummed_file_output_stream<ChecksumType>(std::move(out), _c, _full_checksum, buffer_size), std::move(filename))
|
||||
, _c({uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size))})
|
||||
, _full_checksum(ChecksumType::init_checksum()) {}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user