Merge 'sstables: allow bypassing min max position metadata loading' from Botond Dénes

Said mechanism broke tools and tests to some extent: the read it executes on sstable load time means that if the sstable is broken enough to fail this read, it will fail to load, preventing diagnostic tools to load it and examine it and preventing tests from producing broken sstables for testing purposes.

Closes #12359

* github.com:scylladb/scylladb:
  sstables: allow bypassing first/last position metadata loading
  sstables: sstable::{load,open_data}(): fix indentation
  sstables: coroutinize sstable::open_data()
  sstables: sstable::open_data(): use clear_gently() to clear token ranges
  sstables: coroutinize sstable::load()
This commit is contained in:
Avi Kivity
2022-12-20 14:22:35 +02:00
3 changed files with 56 additions and 58 deletions

View File

@@ -22,6 +22,7 @@
#include <seastar/core/aligned_buffer.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/reactor.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/util/file.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/short_streams.hh>
@@ -59,6 +60,7 @@
#include "utils/bloom_filter.hh"
#include "utils/memory_data_sink.hh"
#include "utils/cached_file.hh"
#include "utils/stall_free.hh"
#include "checked-file-impl.hh"
#include "integrity_checked_file_impl.hh"
#include "db/extensions.hh"
@@ -1368,42 +1370,31 @@ future<> sstable::open_or_create_data(open_flags oflags, file_open_options optio
).discard_result();
}
future<> sstable::open_data() noexcept {
return open_or_create_data(open_flags::ro).then([this] {
return this->update_info_for_opened_data();
}).then([this] {
if (_shards.empty()) {
_shards = compute_shards_for_this_sstable();
}
auto* sm = _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>();
if (!sm) {
return make_ready_future<>();
}
auto c = &sm->token_ranges.elements;
future<> sstable::open_data(sstable_open_config cfg) noexcept {
co_await open_or_create_data(open_flags::ro);
co_await update_info_for_opened_data(cfg);
if (_shards.empty()) {
_shards = compute_shards_for_this_sstable();
}
auto* sm = _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>();
if (sm) {
// Sharding information uses a lot of memory and once we're doing with this computation we will no longer use it.
return do_until([c] { return c->empty(); }, [c] {
c->pop_back();
return make_ready_future<>();
}).then([this, c] () mutable {
*c = {};
return make_ready_future<>();
});
}).then([this] {
auto* ld_stats = _components->scylla_metadata->data.get<scylla_metadata_type::LargeDataStats, scylla_metadata::large_data_stats>();
if (ld_stats) {
_large_data_stats.emplace(*ld_stats);
}
auto* origin = _components->scylla_metadata->data.get<scylla_metadata_type::SSTableOrigin, scylla_metadata::sstable_origin>();
if (origin) {
_origin = sstring(to_sstring_view(bytes_view(origin->value)));
}
}).then([this] {
_open_mode.emplace(open_flags::ro);
_stats.on_open_for_reading();
});
co_await utils::clear_gently(sm->token_ranges.elements);
sm->token_ranges.elements = {};
}
auto* ld_stats = _components->scylla_metadata->data.get<scylla_metadata_type::LargeDataStats, scylla_metadata::large_data_stats>();
if (ld_stats) {
_large_data_stats.emplace(*ld_stats);
}
auto* origin = _components->scylla_metadata->data.get<scylla_metadata_type::SSTableOrigin, scylla_metadata::sstable_origin>();
if (origin) {
_origin = sstring(to_sstring_view(bytes_view(origin->value)));
}
_open_mode.emplace(open_flags::ro);
_stats.on_open_for_reading();
}
future<> sstable::update_info_for_opened_data() {
future<> sstable::update_info_for_opened_data(sstable_open_config cfg) {
struct stat st = co_await _data_file.stat();
if (this->has_component(component_type::CompressionInfo)) {
@@ -1453,7 +1444,9 @@ future<> sstable::update_info_for_opened_data() {
}));
_bytes_on_disk += bytes;
}
co_await load_first_and_last_position_in_partition();
if (cfg.load_first_and_last_position_metadata) {
co_await load_first_and_last_position_in_partition();
}
}
future<> sstable::create_data() noexcept {
@@ -1504,26 +1497,22 @@ void sstable::write_filter(const io_priority_class& pc) {
// This interface is only used during tests, snapshot loading and early initialization.
// No need to set tunable priorities for it.
future<> sstable::load(const io_priority_class& pc) noexcept {
return read_toc().then([this, &pc] {
// read scylla-meta after toc. Might need it to parse
// rest (hint extensions)
return read_scylla_metadata(pc).then([this, &pc] {
// Read statistics ahead of others - if summary is missing
// we'll attempt to re-generate it and we need statistics for that
return read_statistics(pc).then([this, &pc] {
return seastar::when_all_succeed(
read_compression(pc),
read_filter(pc),
read_summary(pc)).then_unpack([this] {
validate_min_max_metadata();
validate_max_local_deletion_time();
validate_partitioner();
return open_data();
});
});
});
});
future<> sstable::load(const io_priority_class& pc, sstable_open_config cfg) noexcept {
co_await read_toc();
// read scylla-meta after toc. Might need it to parse
// rest (hint extensions)
co_await read_scylla_metadata(pc);
// Read statistics ahead of others - if summary is missing
// we'll attempt to re-generate it and we need statistics for that
co_await read_statistics(pc);
co_await coroutine::all(
[&] { return read_compression(pc); },
[&] { return read_filter(pc); },
[&] { return read_summary(pc); });
validate_min_max_metadata();
validate_max_local_deletion_time();
validate_partitioner();
co_await open_data(cfg);
}
future<> sstable::load(sstables::foreign_sstable_open_info info) noexcept {

View File

@@ -130,6 +130,15 @@ constexpr auto table_subdirectories = std::to_array({
constexpr const char* repair_origin = "repair";
struct sstable_open_config {
// Load the first and last position in partition, populating the
// `_first_partition_first_position` and `_last_partition_last_position`
// fields respectively. Problematic sstables might fail to load. Set to
// false if you want to disable this, to be able to read such sstables.
// Should only be disabled for diagnostics purposes.
bool load_first_and_last_position_metadata = true;
};
class sstable : public enable_lw_shared_from_this<sstable> {
friend ::sstable_assertions;
public:
@@ -186,9 +195,9 @@ public:
// load all components from disk
// this variant will be useful for testing purposes and also when loading
// a new sstable from scratch for sharing its components.
future<> load(const io_priority_class& pc = default_priority_class()) noexcept;
future<> open_data() noexcept;
future<> update_info_for_opened_data();
future<> load(const io_priority_class& pc = default_priority_class(), sstable_open_config cfg = {}) noexcept;
future<> open_data(sstable_open_config cfg = {}) noexcept;
future<> update_info_for_opened_data(sstable_open_config cfg = {});
class delayed_commit_changes {
std::unordered_set<sstring> _dirs;

View File

@@ -151,7 +151,7 @@ const std::vector<sstables::shared_sstable> load_sstables(schema_ptr schema, sst
auto ed = sstables::entry_descriptor::make_descriptor(dir_path.c_str(), sst_filename.c_str(), schema->ks_name(), schema->cf_name());
auto sst = sst_man.make_sstable(schema, dir_path.c_str(), ed.generation, ed.version, ed.format);
co_await sst->load();
co_await sst->load(default_priority_class(), sstables::sstable_open_config{.load_first_and_last_position_metadata = false});
sstables[i] = std::move(sst);
}).get();