mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 00:13:31 +00:00
sstables: Process extensions on file open
Allowing them to wrap/replace an opened file, and add to/read from scylla metadata.
This commit is contained in:
@@ -63,6 +63,7 @@
|
||||
#include "checked-file-impl.hh"
|
||||
#include "integrity_checked_file_impl.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/extensions.hh"
|
||||
|
||||
thread_local disk_error_signal_type sstable_read_error;
|
||||
thread_local disk_error_signal_type sstable_write_error;
|
||||
@@ -92,7 +93,7 @@ read_monitor_generator& default_read_monitor_generator() {
|
||||
|
||||
static future<file> open_sstable_component_file(const io_error_handler& error_handler, sstring name, open_flags flags,
|
||||
file_open_options options) {
|
||||
if (get_config().enable_sstable_data_integrity_check()) {
|
||||
if (flags != open_flags::ro && get_config().enable_sstable_data_integrity_check()) {
|
||||
return open_integrity_checked_file_dma(name, flags, options).then([&error_handler] (auto f) {
|
||||
return make_checked_file(error_handler, std::move(f));
|
||||
});
|
||||
@@ -1310,12 +1311,38 @@ future<> sstable::read_summary(const io_priority_class& pc) {
|
||||
});
|
||||
}
|
||||
|
||||
future<file> sstable::open_file(component_type type, open_flags flags, file_open_options opts) {
|
||||
auto f = new_sstable_component_file(_read_error_handler, filename(type), flags, opts);
|
||||
if ((type != component_type::Data && type != component_type::Index)
|
||||
|| get_config().extensions().sstable_file_io_extensions().empty()) {
|
||||
return f;
|
||||
}
|
||||
return f.then([this, type, flags](file f) {
|
||||
return do_with(std::move(f), [this, type, flags](file& f) {
|
||||
auto ext_range = get_config().extensions().sstable_file_io_extensions();
|
||||
return do_for_each(ext_range.begin(), ext_range.end(), [this, &f, type, flags](auto& ext) {
|
||||
// note: we're potentially wrapping more than once. extension mechanism
|
||||
// is responsible for order being sane.
|
||||
return ext->wrap_file(*this, type, f, flags).then([&f](file of) {
|
||||
if (of) {
|
||||
f = std::move(of);
|
||||
}
|
||||
});
|
||||
}).then([&f] {
|
||||
return f;
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> sstable::open_data() {
|
||||
return when_all(open_checked_file_dma(_read_error_handler, filename(component_type::Index), open_flags::ro),
|
||||
open_checked_file_dma(_read_error_handler, filename(component_type::Data), open_flags::ro))
|
||||
return when_all(open_file(component_type::Index, open_flags::ro),
|
||||
open_file(component_type::Data, open_flags::ro))
|
||||
.then([this] (auto files) {
|
||||
|
||||
_index_file = std::get<file>(std::get<0>(files).get());
|
||||
_data_file = std::get<file>(std::get<1>(files).get());
|
||||
_data_file = std::get<file>(std::get<1>(files).get());
|
||||
|
||||
return this->update_info_for_opened_data();
|
||||
}).then([this] {
|
||||
if (_shards.empty()) {
|
||||
@@ -1371,14 +1398,15 @@ future<> sstable::create_data() {
|
||||
file_open_options opt;
|
||||
opt.extent_allocation_size_hint = 32 << 20;
|
||||
opt.sloppy_size = true;
|
||||
return when_all(new_sstable_component_file(_write_error_handler, filename(component_type::Index), oflags, opt),
|
||||
new_sstable_component_file(_write_error_handler, filename(component_type::Data), oflags, opt)).then([this] (auto files) {
|
||||
return when_all(open_file(component_type::Index, oflags, opt),
|
||||
open_file(component_type::Data, oflags, opt)).then([this] (auto files) {
|
||||
// FIXME: If both files could not be created, the first get below will
|
||||
// throw an exception, and second get() will not be attempted, and
|
||||
// we'll get a warning about the second future being destructed
|
||||
// without its exception being examined.
|
||||
|
||||
_index_file = std::get<file>(std::get<0>(files).get());
|
||||
_data_file = std::get<file>(std::get<1>(files).get());
|
||||
_data_file = std::get<file>(std::get<1>(files).get());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2277,12 +2305,17 @@ sstable::write_scylla_metadata(const io_priority_class& pc, shard_id shard, ssta
|
||||
auto&& first_key = get_first_decorated_key();
|
||||
auto&& last_key = get_last_decorated_key();
|
||||
auto sm = create_sharding_metadata(_schema, first_key, last_key, shard);
|
||||
|
||||
// sstable write may fail to generate empty metadata if mutation source has only data from other shard.
|
||||
// see https://github.com/scylladb/scylla/issues/2932 for details on how it can happen.
|
||||
if (sm.token_ranges.elements.empty()) {
|
||||
throw std::runtime_error(sprint("Failed to generate sharding metadata for %s", get_filename()));
|
||||
}
|
||||
_components->scylla_metadata.emplace();
|
||||
|
||||
if (!_components->scylla_metadata) {
|
||||
_components->scylla_metadata.emplace();
|
||||
}
|
||||
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::Sharding>(std::move(sm));
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::Features>(std::move(features));
|
||||
|
||||
|
||||
@@ -543,6 +543,7 @@ private:
|
||||
const bool has_component(component_type f) const;
|
||||
|
||||
const sstring filename(component_type f) const;
|
||||
future<file> open_file(component_type, open_flags, file_open_options = {});
|
||||
|
||||
template <sstable::component_type Type, typename T>
|
||||
future<> read_simple(T& comp, const io_priority_class& pc);
|
||||
@@ -649,6 +650,16 @@ public:
|
||||
|
||||
future<> read_toc();
|
||||
|
||||
shareable_components& get_shared_components() {
|
||||
return *_components;
|
||||
}
|
||||
const shareable_components& get_shared_components() const {
|
||||
return *_components;
|
||||
}
|
||||
schema_ptr get_schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
bool has_scylla_component() const {
|
||||
return has_component(component_type::Scylla);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user