sstables: always use a file_*_stream_options in our readers and writes
Instead of using the APIs that explicitly pass things like buffer_size, always use the options instance instead. This will make it easier to pass extra options in the future. Signed-off-by: Glauber Costa <glauber@scylladb.com> Message-Id: <5b04e60ab469c319a17a522694e5bedf806702fe.1453219530.git.glauber@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
c3ac5257b5
commit
63967db8bf
@@ -100,7 +100,10 @@ class file_random_access_reader : public random_access_reader {
|
||||
size_t _buffer_size;
|
||||
public:
|
||||
virtual input_stream<char> open_at(uint64_t pos) override {
|
||||
return make_file_input_stream(_file, pos, _buffer_size);
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = _buffer_size;
|
||||
|
||||
return make_file_input_stream(_file, pos, std::move(options));
|
||||
}
|
||||
explicit file_random_access_reader(file f, size_t buffer_size = 8192)
|
||||
: _file(std::move(f)), _buffer_size(buffer_size)
|
||||
@@ -748,7 +751,10 @@ void sstable::write_toc() {
|
||||
|
||||
// Writing TOC content to temporary file.
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
auto w = file_writer(std::move(f), 4096);
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
auto w = file_writer(std::move(f), std::move(options));
|
||||
|
||||
for (auto&& key : _components) {
|
||||
// new line character is appended to the end of each component name.
|
||||
@@ -787,7 +793,10 @@ void write_crc(const sstring file_path, checksum& c) {
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
file f = open_file_dma(file_path, oflags).get0();
|
||||
auto w = file_writer(std::move(f), 4096);
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
auto w = file_writer(std::move(f), std::move(options));
|
||||
write(w, c);
|
||||
w.close().get();
|
||||
}
|
||||
@@ -798,7 +807,10 @@ void write_digest(const sstring file_path, uint32_t full_checksum) {
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
auto f = open_file_dma(file_path, oflags).get0();
|
||||
auto w = file_writer(std::move(f), 4096);
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
auto w = file_writer(std::move(f), std::move(options));
|
||||
|
||||
auto digest = to_sstring<bytes>(full_checksum);
|
||||
write(w, digest);
|
||||
@@ -828,7 +840,9 @@ future<index_list> sstable::read_indexes(uint64_t summary_idx) {
|
||||
estimated_size = std::max<size_t>(estimated_size, 8192);
|
||||
|
||||
return do_with(index_consumer(quantity), [this, position, estimated_size] (index_consumer& ic) {
|
||||
auto stream = make_file_input_stream(this->_index_file, position, estimated_size);
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = estimated_size;
|
||||
auto stream = make_file_input_stream(this->_index_file, position, std::move(options));
|
||||
auto ctx = make_lw_shared<index_consume_entry_context>(ic, std::move(stream), this->index_size() - position);
|
||||
return ctx->consume_input(*ctx).then([ctx, &ic] {
|
||||
return make_ready_future<index_list>(std::move(ic.indexes));
|
||||
@@ -863,7 +877,10 @@ void sstable::write_simple(T& component) {
|
||||
auto file_path = filename(Type);
|
||||
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
auto w = file_writer(std::move(f), sstable_buffer_size);
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
auto w = file_writer(std::move(f), std::move(options));
|
||||
write(w, component);
|
||||
w.flush().get();
|
||||
w.close().get();
|
||||
@@ -1248,7 +1265,9 @@ static void seal_statistics(statistics& s, metadata_collector& collector,
|
||||
///
|
||||
void sstable::do_write_components(::mutation_reader mr,
|
||||
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, file_writer& out) {
|
||||
auto index = make_shared<file_writer>(_index_file, sstable_buffer_size);
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
auto index = make_shared<file_writer>(_index_file, std::move(options));
|
||||
|
||||
auto filter_fp_chance = schema->bloom_filter_fp_chance();
|
||||
_filter = utils::i_filter::get_filter(estimated_partitions, filter_fp_chance);
|
||||
@@ -1353,7 +1372,10 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
|
||||
bool checksum_file = has_component(sstable::component_type::CRC);
|
||||
|
||||
if (checksum_file) {
|
||||
auto w = make_shared<checksummed_file_writer>(_data_file, sstable_buffer_size, checksum_file);
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
|
||||
auto w = make_shared<checksummed_file_writer>(_data_file, std::move(options), checksum_file);
|
||||
this->do_write_components(std::move(mr), estimated_partitions, std::move(schema), max_sstable_size, *w);
|
||||
w->close().get();
|
||||
_data_file = file(); // w->close() closed _data_file
|
||||
@@ -1546,7 +1568,9 @@ input_stream<char> sstable::data_stream_at(uint64_t pos, uint64_t buf_size) {
|
||||
return make_compressed_file_input_stream(
|
||||
_data_file, &_compression, pos);
|
||||
} else {
|
||||
return make_file_input_stream(_data_file, pos, buf_size);
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = buf_size;
|
||||
return make_file_input_stream(_data_file, 0, std::move(options));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,8 +32,8 @@ class file_writer {
|
||||
output_stream<char> _out;
|
||||
size_t _offset = 0;
|
||||
public:
|
||||
file_writer(file f, size_t buffer_size = 8192)
|
||||
: _out(make_file_output_stream(std::move(f), buffer_size)) {}
|
||||
file_writer(file f, file_output_stream_options options)
|
||||
: _out(make_file_output_stream(std::move(f), std::move(options))) {}
|
||||
|
||||
file_writer(output_stream<char>&& out)
|
||||
: _out(std::move(out)) {}
|
||||
@@ -60,15 +60,15 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
output_stream<char> make_checksummed_file_output_stream(file f, size_t buffer_size, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file);
|
||||
output_stream<char> make_checksummed_file_output_stream(file f, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file, file_output_stream_options options);
|
||||
|
||||
class checksummed_file_writer : public file_writer {
|
||||
checksum _c;
|
||||
uint32_t _full_checksum;
|
||||
public:
|
||||
checksummed_file_writer(file f, size_t buffer_size = 8192, bool checksum_file = false)
|
||||
: file_writer(make_checksummed_file_output_stream(std::move(f), buffer_size, _c, _full_checksum, checksum_file))
|
||||
, _c({uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size))})
|
||||
checksummed_file_writer(file f, file_output_stream_options options, bool checksum_file = false)
|
||||
: file_writer(make_checksummed_file_output_stream(std::move(f), _c, _full_checksum, checksum_file, options))
|
||||
, _c({uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), size_t(options.buffer_size)))})
|
||||
, _full_checksum(init_checksum_adler32()) {}
|
||||
|
||||
// Since we are exposing a reference to _full_checksum, we delete the move
|
||||
@@ -91,8 +91,8 @@ class checksummed_file_data_sink_impl : public data_sink_impl {
|
||||
uint32_t& _full_checksum;
|
||||
bool _checksum_file;
|
||||
public:
|
||||
checksummed_file_data_sink_impl(file f, size_t buffer_size, struct checksum& c, uint32_t& full_file_checksum, bool checksum_file)
|
||||
: _out(make_file_output_stream(std::move(f), buffer_size))
|
||||
checksummed_file_data_sink_impl(file f, struct checksum& c, uint32_t& full_file_checksum, bool checksum_file, file_output_stream_options options)
|
||||
: _out(make_file_output_stream(std::move(f), std::move(options)))
|
||||
, _c(c)
|
||||
, _full_checksum(full_file_checksum)
|
||||
, _checksum_file(checksum_file)
|
||||
@@ -127,13 +127,14 @@ public:
|
||||
|
||||
class checksummed_file_data_sink : public data_sink {
|
||||
public:
|
||||
checksummed_file_data_sink(file f, size_t buffer_size, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file)
|
||||
: data_sink(std::make_unique<checksummed_file_data_sink_impl>(std::move(f), buffer_size, cinfo, full_file_checksum, checksum_file)) {}
|
||||
checksummed_file_data_sink(file f, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file, file_output_stream_options options)
|
||||
: data_sink(std::make_unique<checksummed_file_data_sink_impl>(std::move(f), cinfo, full_file_checksum, checksum_file, std::move(options))) {}
|
||||
};
|
||||
|
||||
inline
|
||||
output_stream<char> make_checksummed_file_output_stream(file f, size_t buffer_size, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file) {
|
||||
return output_stream<char>(checksummed_file_data_sink(std::move(f), buffer_size, cinfo, full_file_checksum, checksum_file), buffer_size, true);
|
||||
output_stream<char> make_checksummed_file_output_stream(file f, struct checksum& cinfo, uint32_t& full_file_checksum, bool checksum_file, file_output_stream_options options) {
|
||||
auto buffer_size = options.buffer_size;
|
||||
return output_stream<char>(checksummed_file_data_sink(std::move(f), cinfo, full_file_checksum, checksum_file, std::move(options)), true, buffer_size);
|
||||
}
|
||||
|
||||
// compressed_file_data_sink_impl works as a filter for a file output stream,
|
||||
|
||||
Reference in New Issue
Block a user