mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 00:50:35 +00:00
Densely populated pages have no promoted index (small partitions), so we can save space in such workloads by keeping the promoted index in a separate vector. For workloads which do have a promoted index, pages have only one partition. There aren't many such pages and they are long-lived, so the extra allocation of the vector is amortized. The promoted_index class is removed and replaced with the equivalent parsed_promoted_index_entry for simplicity. Because it's removed, make_cursor() is moved into the index_reader class. Reducing the size of index_entry is important for performance if pages are densely populated. It reduces LSA allocator pressure and speeds up compaction/eviction. This change, combined with the earlier change "Shave-off 16 bytes from index_entry by using raw_token", gives a significant throughput improvement in a perf_simple_query run where the index doesn't fit in memory: scylla perf-simple-query -c1 -m200M --partitions=1000000 Before: 9714.78 tps (170.9 allocs/op, 16.9 logallocs/op, 55.3 tasks/op, 494788 insns/op, 343920 cycles/op, 0 errors) 9603.13 tps (171.6 allocs/op, 17.0 logallocs/op, 55.6 tasks/op, 502358 insns/op, 348344 cycles/op, 0 errors) 9621.43 tps (171.9 allocs/op, 17.0 logallocs/op, 55.8 tasks/op, 500612 insns/op, 347508 cycles/op, 0 errors) 9597.75 tps (171.6 allocs/op, 17.0 logallocs/op, 55.6 tasks/op, 501428 insns/op, 348604 cycles/op, 0 errors) 9615.54 tps (171.6 allocs/op, 16.9 logallocs/op, 55.6 tasks/op, 501313 insns/op, 347935 cycles/op, 0 errors) 9577.03 tps (171.8 allocs/op, 17.0 logallocs/op, 55.7 tasks/op, 503283 insns/op, 349251 cycles/op, 0 errors) After: 15328.25 tps (150.0 allocs/op, 14.1 logallocs/op, 45.4 tasks/op, 286769 insns/op, 218134 cycles/op, 0 errors) 15279.01 tps (149.9 allocs/op, 14.1 logallocs/op, 45.3 tasks/op, 287696 insns/op, 218637 cycles/op, 0 errors) 15347.78 tps (149.7 allocs/op, 14.1 logallocs/op, 45.3 tasks/op, 285851 insns/op, 217795 cycles/op, 0 errors) 15403.68 tps (149.6 allocs/op, 14.1
logallocs/op, 45.2 tasks/op, 285111 insns/op, 216984 cycles/op, 0 errors) 15189.47 tps (150.0 allocs/op, 14.1 logallocs/op, 45.5 tasks/op, 289509 insns/op, 219602 cycles/op, 0 errors) 15295.04 tps (149.8 allocs/op, 14.1 logallocs/op, 45.3 tasks/op, 288021 insns/op, 218545 cycles/op, 0 errors) 15162.01 tps (149.8 allocs/op, 14.1 logallocs/op, 45.4 tasks/op, 291265 insns/op, 220451 cycles/op, 0 errors)
1317 lines
52 KiB
C++
1317 lines
52 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "version.hh"
|
|
#include "shared_sstable.hh"
|
|
#include "open_info.hh"
|
|
#include "sstables_registry.hh"
|
|
#include <seastar/core/file.hh>
|
|
#include <seastar/core/fstream.hh>
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/enum.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <unordered_set>
|
|
#include <unordered_map>
|
|
#include <variant>
|
|
#include "schema/schema_fwd.hh"
|
|
#include <seastar/core/stream.hh>
|
|
#include "encoding_stats.hh"
|
|
#include "filter.hh"
|
|
#include "utils/disk-error-handler.hh"
|
|
#include "sstables/progress_monitor.hh"
|
|
#include "db/commitlog/replay_position.hh"
|
|
#include "component_type.hh"
|
|
#include "column_translation.hh"
|
|
#include "stats.hh"
|
|
#include "utils/observable.hh"
|
|
#include "sstables/shareable_components.hh"
|
|
#include "sstables/storage.hh"
|
|
#include "sstables/generation_type.hh"
|
|
#include "sstables/types.hh"
|
|
#include "sstables/checksummed_data_source.hh"
|
|
#include "mutation/mutation_fragment_stream_validator.hh"
|
|
#include "readers/mutation_reader_fwd.hh"
|
|
#include "readers/mutation_reader.hh"
|
|
#include "tracing/trace_state.hh"
|
|
#include "utils/updateable_value.hh"
|
|
#include "dht/decorated_key.hh"
|
|
#include "service/session.hh"
|
|
#include "sstables/trie/bti_index.hh"
|
|
#include "sstables/file_size_stats.hh"
|
|
|
|
#include <seastar/util/optimized_optional.hh>
|
|
|
|
class sstable_assertions;
|
|
class cached_file;
|
|
|
|
namespace data_dictionary {
|
|
class storage_options;
|
|
}
|
|
|
|
class in_memory_config_type;
|
|
|
|
namespace db {
|
|
class large_data_handler;
|
|
class corrupt_data_handler;
|
|
}
|
|
|
|
namespace sstables {
|
|
|
|
template <typename ChecksumType, bool calculate_chunk_checksums>
|
|
requires ChecksumUtils<ChecksumType>
|
|
class checksummed_file_writer;
|
|
using crc32_digest_file_writer = checksummed_file_writer<crc32_utils, false>;
|
|
|
|
struct abstract_index_reader;
|
|
class sstable_directory;
|
|
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
|
|
|
|
namespace mc {
|
|
class writer;
|
|
}
|
|
|
|
namespace fs = std::filesystem;
|
|
|
|
extern logging::logger sstlog;
|
|
class sstable_writer;
|
|
class sstables_manager;
|
|
|
|
struct foreign_sstable_open_info;
|
|
|
|
template<typename T>
|
|
concept ConsumeRowsContext =
|
|
requires(T c, indexable_element el, size_t s) {
|
|
{ c.consume_input() } -> std::same_as<future<>>;
|
|
{ c.reset(el) } -> std::same_as<void>;
|
|
{ c.fast_forward_to(s, s) } -> std::same_as<future<>>;
|
|
{ c.position() } -> std::same_as<uint64_t>;
|
|
{ c.skip_to(s) } -> std::same_as<future<>>;
|
|
{ c.reader_position() } -> std::same_as<const sstables::reader_position_tracker&>;
|
|
{ c.eof() } -> std::same_as<bool>;
|
|
{ c.close() } -> std::same_as<future<>>;
|
|
};
|
|
|
|
template <typename DataConsumeRowsContext>
|
|
requires ConsumeRowsContext<DataConsumeRowsContext>
|
|
class data_consume_context;
|
|
|
|
class index_reader;
|
|
class partition_index_cache;
|
|
|
|
extern size_t summary_byte_cost(double summary_ratio);
|
|
|
|
struct sstable_writer_config {
|
|
size_t promoted_index_block_size;
|
|
size_t promoted_index_auto_scale_threshold;
|
|
uint64_t max_sstable_size = std::numeric_limits<uint64_t>::max();
|
|
bool backup = false;
|
|
bool leave_unsealed = false;
|
|
mutation_fragment_stream_validation_level validation_level;
|
|
std::optional<db::replay_position> replay_position;
|
|
std::optional<int> sstable_level;
|
|
write_monitor* monitor = &default_write_monitor();
|
|
run_id run_identifier = run_id::create_random_id();
|
|
size_t summary_byte_cost;
|
|
sstring origin;
|
|
bool correct_pi_block_width = true;
|
|
|
|
private:
|
|
explicit sstable_writer_config() {}
|
|
friend class sstables_manager;
|
|
};
|
|
|
|
// Per-table subdirectory names. normal_dir is empty because sstables in the
// normal state live directly in the table directory (make_path() appends
// nothing for that state).
constexpr const char* normal_dir{""};
constexpr const char* staging_dir{"staging"};
constexpr const char* upload_dir{"upload"};
constexpr const char* snapshots_dir{"snapshots"};
constexpr const char* quarantine_dir{"quarantine"};
constexpr const char* pending_delete_dir{"pending_delete"};
// Extension appended to temporary sstable directory names (presumably; confirm at call sites).
constexpr const char* tempdir_extension{".sstable"};

// Every non-empty subdirectory name above, i.e. all subdirectories a table
// directory may contain besides the sstables themselves.
// CTAD deduces std::array<const char*, 5>.
constexpr std::array table_subdirectories{
    staging_dir,
    upload_dir,
    snapshots_dir,
    quarantine_dir,
    pending_delete_dir,
};
|
|
|
|
// Maps an sstable_state to the table subdirectory holding sstables in that
// state. Returns "" for sstable_state::normal, meaning the table directory
// itself. This is the inverse of state_from_dir().
//
// Throws std::runtime_error for a value that matches none of the handled
// enumerators. Without the trailing throw, such a value would flow off the end
// of this non-void function, which is undefined behavior.
inline std::string_view state_to_dir(sstable_state state) {
    switch (state) {
    case sstable_state::normal:
        return normal_dir;
    case sstable_state::staging:
        return staging_dir;
    case sstable_state::quarantine:
        return quarantine_dir;
    case sstable_state::upload:
        return upload_dir;
    }
    throw std::runtime_error(seastar::format("Unknown sstable state {}", static_cast<int>(state)));
}
|
|
|
|
// Inverse of state_to_dir(): maps a table subdirectory name back to the
// sstable_state it represents. The empty name (normal_dir) denotes the table
// directory itself. Any other unknown name throws std::runtime_error.
inline sstable_state state_from_dir(std::string_view dir) {
    struct dir_mapping {
        std::string_view name;
        sstable_state state;
    };
    const dir_mapping known[] = {
        {normal_dir, sstable_state::normal},
        {staging_dir, sstable_state::staging},
        {quarantine_dir, sstable_state::quarantine},
        {upload_dir, sstable_state::upload},
    };
    for (const auto& m : known) {
        if (m.name == dir) {
            return m.state;
        }
    }
    throw std::runtime_error(seastar::format("Unknown sstable state dir {}", dir));
}
|
|
|
|
// FIXME -- temporary, move to fs storage after patching the rest
|
|
inline fs::path make_path(std::string_view table_dir, sstable_state state) {
|
|
fs::path ret(table_dir);
|
|
if (state != sstable_state::normal) {
|
|
ret /= state_to_dir(state);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Origin tag for sstables produced by repair (presumably compared with an
// sstable's origin string, cf. sstable_writer_config::origin; confirm at call sites).
constexpr const char* repair_origin = "repair";
|
|
|
|
// Default open flags for newly written sstable component files (e.g. the default
// of make_component_file_writer()). The file is opened write-only and created,
// with exclusive creation (O_EXCL-style: fail if the file already exists).
const open_flags sstable_write_open_flags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
|
|
|
class delayed_commit_changes {
|
|
std::unordered_set<sstring> _dirs;
|
|
friend class filesystem_storage;
|
|
public:
|
|
future<> commit();
|
|
};
|
|
|
|
class sstable_stream_sink_impl;
|
|
|
|
class sstable : public enable_lw_shared_from_this<sstable> {
|
|
friend ::sstable_assertions;
|
|
public:
|
|
using version_types = sstable_version_types;
|
|
using format_types = sstable_format_types;
|
|
using manager_list_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
|
using manager_set_link_type = bi::set_member_hook<bi::link_mode<bi::auto_unlink>>;
|
|
public:
|
|
sstable(schema_ptr schema,
|
|
const data_dictionary::storage_options& storage,
|
|
generation_type generation,
|
|
sstable_state state,
|
|
version_types v,
|
|
format_types f,
|
|
db::large_data_handler& large_data_handler,
|
|
db::corrupt_data_handler& corrupt_data_handler,
|
|
sstables_manager& manager,
|
|
db_clock::time_point now,
|
|
io_error_handler_gen error_handler_gen,
|
|
size_t buffer_size);
|
|
sstable& operator=(const sstable&) = delete;
|
|
sstable(const sstable&) = delete;
|
|
sstable(sstable&&) = delete;
|
|
|
|
// disk_read_range describes a byte range covering part of an sstable row
// that we need to read from disk. Usually this is the whole byte range of a
// single sstable row, but for very large rows we might want to read only a
// subset of the atoms, which we know contains the columns we are looking for.
struct disk_read_range {
    // TODO: this should become a vector of ranges
    uint64_t start = 0;
    uint64_t end = 0;

    disk_read_range() = default;
    disk_read_range(uint64_t from, uint64_t to)
        : start(from)
        , end(to) {}

    // A range converts to false exactly when it is empty (start == end).
    explicit operator bool() const {
        return !(start == end);
    }
};
|
|
|
|
static component_type component_from_sstring(version_types version, const sstring& s);
|
|
static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
|
format_types format, component_type component);
|
|
static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
|
format_types format, sstring component);
|
|
static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
|
format_types format, component_type component);
|
|
static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
|
format_types format, sstring component);
|
|
|
|
// load sstable using components shared by a shard
|
|
future<> load(foreign_sstable_open_info info) noexcept;
|
|
// Load metadata components from disk
|
|
future<> load_metadata(sstable_open_config cfg = {}) noexcept;
|
|
// load all components from disk
|
|
// this variant will be useful for testing purposes and also when loading
|
|
// a new sstable from scratch for sharing its components.
|
|
future<> load(const dht::sharder& sharder, sstable_open_config cfg = {}) noexcept;
|
|
future<> open_data(sstable_open_config cfg = {}) noexcept;
|
|
|
|
// Load set of shards that own the SSTable, while reading the minimum
|
|
// from disk to achieve that.
|
|
future<> load_owner_shards(const dht::sharder& sharder);
|
|
|
|
// Call as the last method before the object is destroyed.
|
|
// No other uses of the object can happen at this point.
|
|
future<> destroy();
|
|
|
|
// Move the sstable between states
|
|
//
|
|
// Known states are normal, staging, upload and quarantine.
|
|
// It's up to the storage driver how to implement this.
|
|
future<> change_state(sstable_state to, delayed_commit_changes* delay = nullptr);
|
|
|
|
sstable_state state() const {
|
|
return _state;
|
|
}
|
|
|
|
// Filesystem-specific call to grab an sstable from upload dir and
|
|
// put it into the desired destination assigning the given generation
|
|
future<> pick_up_from_upload(sstable_state to, generation_type new_generation);
|
|
|
|
generation_type generation() const {
|
|
return _generation;
|
|
}
|
|
|
|
// Returns a mutation_reader for given range of partitions.
|
|
//
|
|
// Precondition: if the slice is reversed, the schema must be reversed as well.
|
|
mutation_reader make_reader(
|
|
schema_ptr query_schema,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state = {},
|
|
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
|
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
|
|
read_monitor& monitor = default_read_monitor(),
|
|
integrity_check integrity = integrity_check::no,
|
|
const utils::hashed_key* single_partition_read_murmur_hash = nullptr
|
|
);
|
|
|
|
// A reader which doesn't use the index at all. It reads everything from the
|
|
// sstable and it doesn't support skipping.
|
|
mutation_reader make_full_scan_reader(
|
|
schema_ptr schema,
|
|
reader_permit permit,
|
|
tracing::trace_state_ptr trace_state = {},
|
|
read_monitor& monitor = default_read_monitor(),
|
|
integrity_check integrity = integrity_check::no);
|
|
|
|
// Returns mutation_source containing all writes contained in this sstable.
|
|
// The mutation_source shares ownership of this sstable.
|
|
mutation_source as_mutation_source();
|
|
|
|
future<> write_components(mutation_reader mr,
|
|
uint64_t estimated_partitions,
|
|
schema_ptr schema,
|
|
const sstable_writer_config&,
|
|
encoding_stats stats);
|
|
|
|
sstable_writer get_writer(const schema& s,
|
|
uint64_t estimated_partitions,
|
|
const sstable_writer_config&,
|
|
encoding_stats enc_stats,
|
|
shard_id shard = this_shard_id());
|
|
|
|
// Validates the content of the sstable.
|
|
// Reports all errors via the provided error handler.
|
|
// Returns the count of all validation errors found.
|
|
// Can be aborted via the abort-source parameter.
|
|
// If aborted, either via the abort-source or via unrecoverable errors
|
|
// (e.g. parse error), it will return with validation error count seen up to
|
|
// the abort. In the latter case it will call the error-handler before doing so.
|
|
future<uint64_t> validate(reader_permit permit, abort_source& abort,
|
|
std::function<void(sstring)> error_handler, sstables::read_monitor& monitor = default_read_monitor(), bool validate_index = false);
|
|
|
|
encoding_stats get_encoding_stats_for_compaction() const;
|
|
|
|
future<> seal_sstable(bool backup);
|
|
|
|
// Size at full sampling is calculated as if sampling were static, using the
// minimum index interval as a strict sampling interval:
//     ceil(key_count / min_index_interval) - 1
//
// The result is computed with exact integer arithmetic. A float cannot
// represent key counts above 2^24 exactly, which would skew the result. Also,
// converting the negative or non-finite float produced by key_count == 0 or
// min_index_interval == 0 to uint64_t is undefined behavior, so both of those
// degenerate inputs return 0.
static uint64_t get_size_at_full_sampling(const uint64_t key_count, const uint32_t min_index_interval) {
    if (key_count == 0 || min_index_interval == 0) {
        return 0;
    }
    // Ceiling division without risking overflow in key_count + interval - 1.
    const uint64_t sampled = key_count / min_index_interval + (key_count % min_index_interval != 0 ? 1 : 0);
    return sampled - 1;
}
|
|
|
|
uint64_t get_estimated_key_count() const {
|
|
return get_stats_metadata().estimated_partition_size.count();
|
|
}
|
|
|
|
future<uint64_t> estimated_keys_for_range(const dht::token_range& range);
|
|
|
|
// mark_for_deletion() specifies that a sstable isn't relevant to the
|
|
// current shard, and thus can be deleted by the deletion manager, if
|
|
// all shards sharing it agree. In case the sstable is unshared, it's
|
|
// guaranteed that all of its on-disk files will be deleted as soon as
|
|
// the in-memory object is destroyed.
|
|
void mark_for_deletion() {
|
|
_marked_for_deletion = mark_for_deletion::marked;
|
|
}
|
|
|
|
bool marked_for_deletion() const {
|
|
return _marked_for_deletion == mark_for_deletion::marked;
|
|
}
|
|
|
|
const std::set<generation_type>& compaction_ancestors() const {
|
|
return _compaction_ancestors;
|
|
}
|
|
|
|
void add_ancestor(generation_type generation) {
|
|
_compaction_ancestors.insert(generation);
|
|
}
|
|
|
|
// Returns true iff this sstable contains data which belongs to many shards.
|
|
bool is_shared() const;
|
|
|
|
// Returns uncompressed size of data component.
|
|
uint64_t data_size() const;
|
|
// Returns on-disk size of data component.
|
|
uint64_t ondisk_data_size() const;
|
|
|
|
uint64_t index_size() const {
|
|
return _index_file_size;
|
|
}
|
|
file& index_file() {
|
|
return _index_file;
|
|
}
|
|
file uncached_index_file();
|
|
file uncached_partitions_file();
|
|
file uncached_rows_file();
|
|
// Returns size of bloom filter data.
|
|
uint64_t filter_size() const;
|
|
|
|
db_clock::time_point data_file_write_time() const {
|
|
return _data_file_write_time;
|
|
}
|
|
|
|
uint64_t filter_memory_size() const {
|
|
return _components->filter->memory_size();
|
|
}
|
|
|
|
version_types get_version() const {
|
|
return _version;
|
|
}
|
|
|
|
// Returns the total bytes of all components.
|
|
uint64_t bytes_on_disk() const;
|
|
file_size_stats get_file_size_stats() const;
|
|
|
|
const partition_key& get_first_partition_key() const;
|
|
const partition_key& get_last_partition_key() const;
|
|
|
|
const dht::decorated_key& get_first_decorated_key() const;
|
|
const dht::decorated_key& get_last_decorated_key() const;
|
|
|
|
// SSTable comparator using the first key (decorated key).
|
|
std::strong_ordering compare_by_first_key(const sstable& other) const;
|
|
|
|
// SSTable comparator using the max timestamp.
|
|
// Return values are those of a trichotomic comparison.
|
|
int compare_by_max_timestamp(const sstable& other) const;
|
|
|
|
sstring component_basename(component_type f) const {
|
|
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
|
|
}
|
|
|
|
component_name get_filename(component_type f = component_type::Data) const {
|
|
return component_name(*this, f);
|
|
}
|
|
|
|
component_name toc_filename() const {
|
|
return component_name(*this, component_type::TOC);
|
|
}
|
|
|
|
component_name index_filename() const {
|
|
return component_name(*this, component_type::Index);
|
|
}
|
|
|
|
bool requires_view_building() const noexcept { return _state == sstable_state::staging; }
|
|
|
|
bool is_quarantined() const noexcept { return _state == sstable_state::quarantine; }
|
|
|
|
bool is_uploaded() const noexcept { return _state == sstable_state::upload; }
|
|
|
|
std::vector<std::pair<component_type, sstring>> all_components() const;
|
|
|
|
future<> snapshot(const sstring& name) const;
|
|
|
|
// Delete the sstable by unlinking all sstable files
|
|
// Ignores all errors.
|
|
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
|
|
// and make sure the directory is sync'ed on or after the last call.
|
|
future<> unlink(storage::sync_dir sync = storage::sync_dir::yes) noexcept;
|
|
|
|
// Handler this sstable was constructed with for reporting large data.
db::large_data_handler& get_large_data_handler() { return _large_data_handler; }

// Handler this sstable was constructed with for reporting corrupt data.
db::corrupt_data_handler& get_corrupt_data_handler() { return _corrupt_data_handler; }
|
|
|
|
void assert_large_data_handler_is_running();
|
|
|
|
/**
|
|
* Note. This is using the Origin definition of
|
|
* max_data_age, which is load time. This could maybe
|
|
* be improved upon.
|
|
*/
|
|
db_clock::time_point max_data_age() const {
|
|
return _now;
|
|
}
|
|
|
|
// Registers `on_closed_handler` with the _on_closed observable and returns the
// resulting observer. The handler is presumably invoked when this sstable is
// closed, and stays registered while the observer lives; confirm against
// utils::observable.
//
// The handler is moved, not copied. Copying a std::function may allocate and
// throw, which inside a noexcept function would call std::terminate.
utils::observer<sstable&> add_on_closed_handler(std::function<void (sstable&)> on_closed_handler) noexcept {
    return _on_closed.observe(std::move(on_closed_handler));
}

// Registers `on_delete_handler` with the _on_delete observable and returns the
// resulting observer. The handler is presumably invoked when this sstable is
// deleted; confirm at the notify site. Moved for the same noexcept reason as above.
utils::observer<sstable&> add_on_delete_handler(std::function<void (sstable&)> on_delete_handler) noexcept {
    return _on_delete.observe(std::move(on_delete_handler));
}
|
|
|
|
template<typename Func, typename... Args>
|
|
requires std::is_nothrow_move_constructible_v<Func>
|
|
auto sstable_write_io_check(Func&& func, Args&&... args) const noexcept {
|
|
return do_io_check(_write_error_handler, std::forward<Func>(func), std::forward<Args>(args)...);
|
|
}
|
|
|
|
// required since touch_directory has an optional parameter
|
|
auto sstable_touch_directory_io_check(std::filesystem::path name) const noexcept {
|
|
return do_io_check(_write_error_handler, [name = std::move(name)] () mutable {
|
|
return touch_directory(name.native());
|
|
});
|
|
}
|
|
future<> close_files();
|
|
|
|
/* Returns a lower-bound for the set of `position_in_partition`s appearing in the sstable across all partitions.
|
|
*
|
|
* Might be `before_all_keys` if, for example, the sstable comes from outside Scylla and lacks sufficient metadata,
|
|
* or the sstable's schema does not have clustering columns.
|
|
*
|
|
* But if the schema has clustering columns and the sstable is sufficiently ``modern'',
|
|
* the returned value should be equal to the smallest clustering key occurring in the sstable (across all partitions).
|
|
*
|
|
* The lower bound is inclusive: there might be a clustering row with position equal to min_position.
|
|
*/
|
|
const position_in_partition& min_position() const {
|
|
return _min_max_position_range.start();
|
|
}
|
|
|
|
/* Similar to min_position, but returns an upper-bound.
|
|
* However, the upper-bound is exclusive: all positions are smaller than max_position.
|
|
*
|
|
* If certain conditions are satisfied (the same as for `min_position`, see above),
|
|
* the returned value should be equal to after_key(ck), where ck is the greatest clustering key
|
|
* occurring in the sstable (across all partitions).
|
|
*/
|
|
const position_in_partition& max_position() const {
|
|
return _min_max_position_range.end();
|
|
}
|
|
|
|
const position_in_partition& first_partition_first_position() const noexcept {
|
|
return _first_partition_first_position;
|
|
}
|
|
|
|
const position_in_partition& last_partition_last_position() const noexcept {
|
|
return _last_partition_last_position;
|
|
}
|
|
|
|
const storage& get_storage() const {
|
|
return *_storage;
|
|
}
|
|
|
|
private:
|
|
friend struct component_name;
|
|
friend class sstable_stream_sink_impl;
|
|
friend class filesystem_storage;
|
|
friend class object_storage_base;
|
|
friend class tiered_storage;
|
|
|
|
const size_t sstable_buffer_size;
|
|
|
|
// Private shorthand: name of component `f` of this sstable, same as get_filename(f).
component_name filename(component_type f) const { return get_filename(f); }
|
|
|
|
std::unordered_set<component_type, enum_hash<component_type>> _recognized_components;
|
|
std::vector<sstring> _unrecognized_components;
|
|
|
|
foreign_ptr<lw_shared_ptr<shareable_components>> _components = make_foreign(make_lw_shared<shareable_components>());
|
|
column_translation _column_translation;
|
|
std::optional<open_flags> _open_mode;
|
|
// _compaction_ancestors track which sstable generations were used to generate this sstable.
|
|
// it is then used to generate the ancestors metadata in the statistics or scylla components.
|
|
std::set<generation_type> _compaction_ancestors;
|
|
file _index_file;
|
|
seastar::shared_ptr<cached_file> _cached_index_file;
|
|
file _data_file;
|
|
file _partitions_file;
|
|
seastar::shared_ptr<cached_file> _cached_partitions_file;
|
|
std::optional<trie::bti_partitions_db_footer> _partitions_db_footer;
|
|
file _rows_file;
|
|
seastar::shared_ptr<cached_file> _cached_rows_file;
|
|
uint64_t _data_file_size;
|
|
uint64_t _index_file_size = 0;
|
|
uint64_t _partitions_file_size = 0;
|
|
uint64_t _rows_file_size = 0;
|
|
// on-disk size of components but data and index.
|
|
uint64_t _metadata_size_on_disk = 0;
|
|
db_clock::time_point _data_file_write_time;
|
|
position_range _min_max_position_range = position_range::all_clustered_rows();
|
|
position_in_partition _first_partition_first_position = position_in_partition::before_all_clustered_rows();
|
|
position_in_partition _last_partition_last_position = position_in_partition::after_all_clustered_rows();
|
|
std::vector<unsigned> _shards;
|
|
std::optional<dht::decorated_key> _first;
|
|
std::optional<dht::decorated_key> _last;
|
|
run_id _run_identifier;
|
|
utils::observable<sstable&> _on_closed;
|
|
utils::observable<sstable&> _on_delete;
|
|
|
|
lw_shared_ptr<file_input_stream_history> _single_partition_history = make_lw_shared<file_input_stream_history>();
|
|
lw_shared_ptr<file_input_stream_history> _partition_range_history = make_lw_shared<file_input_stream_history>();
|
|
lw_shared_ptr<file_input_stream_history> _index_history = make_lw_shared<file_input_stream_history>();
|
|
|
|
schema_ptr _schema;
|
|
generation_type _generation{0};
|
|
sstable_state _state;
|
|
sstable_enabled_features _features = {};
|
|
|
|
std::unique_ptr<storage> _storage;
|
|
|
|
const version_types _version;
|
|
const format_types _format;
|
|
|
|
filter_tracker _filter_tracker;
|
|
std::unique_ptr<partition_index_cache> _index_cache;
|
|
|
|
// Deletion mark of this sstable: `none` by default, `marked` after
// mark_for_deletion(). The meaning of `implicit` is not visible here; TODO document.
// The enum and the member are declared together on purpose. The member function
// mark_for_deletion() hides the enum's name as a plain type-specifier.
enum class mark_for_deletion {
    implicit = -1,
    none = 0,
    marked = 1,
} _marked_for_deletion{mark_for_deletion::none};
|
|
bool _active = true;
|
|
|
|
db_clock::time_point _now;
|
|
|
|
io_error_handler _read_error_handler;
|
|
io_error_handler _write_error_handler;
|
|
|
|
db::large_data_handler& _large_data_handler;
|
|
db::corrupt_data_handler& _corrupt_data_handler;
|
|
sstables_manager& _manager;
|
|
|
|
sstables_stats _stats;
|
|
// link used by the _active list of sstables manager
|
|
manager_list_link_type _manager_list_link;
|
|
// link used by the _reclaimed set of sstables manager
|
|
manager_set_link_type _manager_set_link;
|
|
|
|
|
|
// The _large_data_stats map stores e.g. largest partitions, rows, cells sizes,
|
|
// and max number of rows in a partition.
|
|
//
|
|
// It can be disengaged normally when loading legacy sstables that do not have this
|
|
// information in their scylla metadata.
|
|
std::optional<scylla_metadata::large_data_stats> _large_data_stats;
|
|
sstring _origin;
|
|
std::optional<scylla_metadata::ext_timestamp_stats> _ext_timestamp_stats;
|
|
optimized_optional<sstable_id> _sstable_identifier;
|
|
|
|
// Total reclaimable memory from all the components of the SSTable.
|
|
// It is initialized to 0 to prevent the sstables manager from reclaiming memory
|
|
// from the components before the SSTable has been fully loaded.
|
|
mutable std::optional<size_t> _total_reclaimable_memory{0};
|
|
// Total memory reclaimed so far from this sstable
|
|
size_t _total_memory_reclaimed{0};
|
|
bool _unlinked{false};
|
|
bool _ignore_component_digest_mismatch{false};
|
|
|
|
// The mutate semaphore is used to serialize operations like rewrite_statistics
|
|
// with linking or moving the sstable between directories.
|
|
mutable named_semaphore _mutate_sem{1, named_semaphore_exception_factory{"sstable mutate"}};
|
|
std::optional<sstring> _cloned_to_sstable_filename;
|
|
// Used only for writing sstable.
|
|
scylla_metadata::components_digests _components_digests;
|
|
uint32_t _toc_digest{};
|
|
public:
|
|
bool has_component(component_type f) const;
|
|
// The sstables_manager this sstable was constructed with.
sstables_manager& manager() {
    return _manager;
}
const sstables_manager& manager() const {
    return _manager;
}
|
|
|
|
static future<std::pair<std::vector<sstring>, uint32_t>> read_and_parse_toc(file f);
|
|
private:
|
|
void unused(); // Called when reference count drops to zero
|
|
future<file> open_file(component_type, open_flags, file_open_options = {}) const noexcept;
|
|
|
|
template <component_type Type, typename T>
|
|
future<> read_simple(T& comp);
|
|
template <component_type Type, typename T>
|
|
future<std::optional<uint32_t>> read_simple_with_digest(T& comp);
|
|
template <component_type Type, typename T>
|
|
future<> read_simple_and_verify_digest(T& comp);
|
|
future<> do_read_simple(component_type type,
|
|
noncopyable_function<future<> (version_types, file&&, uint64_t sz)> read_component);
|
|
// this variant closes the file on parse completion
|
|
future<> do_read_simple(component_type type,
|
|
noncopyable_function<future<> (version_types, file)> read_component);
|
|
|
|
template <component_type Type, typename T>
|
|
void write_simple(const T& comp);
|
|
void do_write_simple(file_writer& writer,
|
|
noncopyable_function<void (version_types, file_writer&)> write_component);
|
|
void do_write_simple(component_type type,
|
|
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
|
|
unsigned buffer_size);
|
|
|
|
template <component_type Type, typename T>
|
|
uint32_t write_simple_with_digest(const T& comp);
|
|
uint32_t do_write_simple_with_digest(component_type type,
|
|
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
|
|
unsigned buffer_size);
|
|
|
|
void write_crc(const checksum& c);
|
|
void write_digest(uint32_t full_checksum);
|
|
|
|
future<file> new_sstable_component_file(const io_error_handler& error_handler, component_type f, open_flags flags, file_open_options options = {}) const noexcept;
|
|
|
|
future<> unlink_component(component_type type) noexcept;
|
|
|
|
future<file_writer> make_component_file_writer(component_type c, file_output_stream_options options,
|
|
open_flags oflags = sstable_write_open_flags) noexcept;
|
|
|
|
future<std::unique_ptr<crc32_digest_file_writer>> make_digests_component_file_writer(component_type c, file_output_stream_options options,
|
|
open_flags oflags = sstable_write_open_flags) noexcept;
|
|
|
|
void generate_toc();
|
|
void open_sstable(const sstring& origin);
|
|
|
|
future<> read_compression();
|
|
void write_compression();
|
|
|
|
future<> read_scylla_metadata() noexcept;
|
|
|
|
void write_scylla_metadata(shard_id shard,
|
|
run_identifier identifier,
|
|
std::optional<scylla_metadata::large_data_stats> ld_stats,
|
|
std::optional<scylla_metadata::ext_timestamp_stats> ts_stats);
|
|
|
|
future<> read_filter(sstable_open_config cfg = {});
|
|
|
|
void write_filter();
|
|
// Rebuild a bloom filter from the index with the given number of
|
|
// partitions, if the partition estimate provided during bloom
|
|
// filter initialisation was not good.
|
|
// This should be called only before an sstable is sealed.
|
|
void maybe_rebuild_filter_from_index(uint64_t num_partitions);
|
|
|
|
void build_delayed_filter(uint64_t num_partitions);
|
|
|
|
future<> update_info_for_opened_data(sstable_open_config cfg = {});
|
|
|
|
future<> read_toc(sstable_open_config cfg = {}) noexcept;
|
|
future<> read_summary() noexcept;
|
|
|
|
void write_summary() {
|
|
auto digest = write_simple_with_digest<component_type::Summary>(_components->summary);
|
|
_components_digests.map[component_type::Summary] = digest;
|
|
}
|
|
|
|
// To be called when we try to load an SSTable that lacks a Summary. Could
|
|
// happen if old tools are being used.
|
|
future<> generate_summary();
|
|
|
|
future<> read_partitions_db_footer();
|
|
|
|
future<> read_statistics();
|
|
void write_statistics();
|
|
// Validate metadata that's used to optimize reads when user specifies
|
|
// a clustering key range. If this specific metadata is incorrect, then
|
|
// it should be cleared. Otherwise, it could lead to bad decisions.
|
|
// Metadata is probably incorrect if generated by previous Scylla versions.
|
|
void validate_min_max_metadata();
|
|
// Validate metadata that's used to determine if sstable is fully expired
|
|
// sstable that doesn't contain scylla component may contain wrong metadata,
|
|
// and so max_local_deletion_time should be discarded for those.
|
|
void validate_max_local_deletion_time();
|
|
void validate_partitioner();
|
|
void validate_component_digest(component_type type, uint32_t computed_digest) const;
|
|
future<> validate_index_digest() const;
|
|
future<uint32_t> compute_component_file_digest(component_type type) const;
|
|
future<uint32_t> compute_component_file_digest(file f, size_t size) const;
|
|
|
|
// Loads first and last partition keys from appropriate components into `_first` and `_last`.
|
|
void set_first_and_last_keys();
|
|
|
|
// Create a position range based on the min/max_column_names metadata of this sstable.
|
|
// It does nothing if the schema defines no clustering key, and it's supposed
|
|
// to be called when loading an existing sstable or after writing a new one.
|
|
void set_min_max_position_range();
|
|
|
|
// Loads the first position of the first partition, and the last position of the last
|
|
// partition. Does nothing if the schema defines no clustering key.
|
|
future<> load_first_and_last_position_in_partition();
|
|
|
|
// Declared noexcept, so failures can only be reported through the returned future.
future<> create_data() noexcept;
|
|
|
|
// Note that only bloom filters are reclaimable by the following methods.
|
|
// Return the total reclaimable memory in this SSTable.
|
|
size_t total_reclaimable_memory_size() const;
|
|
// Reclaim memory from the components back to the system.
|
|
size_t reclaim_memory_from_components();
|
|
// Return the memory reclaimed so far from this sstable.
|
|
size_t total_memory_reclaimed() const;
|
|
// Reload components from which memory was previously reclaimed.
|
|
future<> reload_reclaimed_components();
|
|
// Disable reload of components for this sstable.
|
|
void disable_component_memory_reload();
|
|
|
|
// NOTE(review): presumably reports whether component `type` can be rewritten
// in place (see write_component) — confirm against the definition.
static bool is_component_rewrite_supported(component_type type);
|
|
// Must be called in a seastar thread.
|
|
void write_component(component_type type);
|
|
public:
|
|
// Finds the first position_in_partition in a given partition.
|
|
// If reversed is false, then the first position is actually the first row (can be the static one).
|
|
// If reversed is true, then the first position is the last row (can be static if partition has a single static row).
|
|
// The result is optional. NOTE(review): presumably nullopt when no position
// is found for `key` — confirm against the definition.
future<std::optional<position_in_partition>>
|
|
find_first_position_in_partition(reader_permit permit, const dht::decorated_key& key, bool reversed);
|
|
|
|
// Return an input_stream which reads exactly the specified byte range
|
|
// from the data file (after uncompression, if the file is compressed).
|
|
// Unlike data_read() below, this method does not read the entire byte
|
|
// range into memory all at once. Rather, this method allows reading the
|
|
// data incrementally as a stream. Knowing in advance the exact amount
|
|
// of bytes to be read using this stream, we can make better choices
|
|
// about the buffer size to read, and where exactly to stop reading
|
|
// (even when a large buffer size is used).
|
|
//
|
|
// When created with `raw_stream::yes`, the sstable data file will be
|
|
// streamed as-is, without decompressing (if compressed).
|
|
//
|
|
// When created with `raw_stream::compressed_chunks`, compressed sstable data
|
|
// will be streamed as raw compressed chunks with checksum verification but
|
|
// without decompression, and digests will be calculated.
|
|
//
|
|
// When created with `integrity_check::yes`, the integrity mechanisms
|
|
// of the underlying data streams will be enabled.
|
|
//
|
|
// The `error_handler` parameter allows customizing the error handling
|
|
// logic when a checksum or digest mismatch is detected on an
|
|
// integrity-checked stream with no compression. The parameter is ignored
|
|
// if integrity checking is disabled or the SSTable is compressed.
|
|
enum class raw_stream {
|
|
// Uncompressed contents (the default for data_stream()).
no,
|
|
// Data file streamed as-is, without decompression.
yes,
|
|
// Raw compressed chunks, checksum-verified, with digests calculated.
compressed_chunks
|
|
};
|
|
future<input_stream<char>> data_stream(uint64_t pos, size_t len,
|
|
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history,
|
|
raw_stream raw = raw_stream::no, integrity_check integrity = integrity_check::no,
|
|
integrity_error_handler error_handler = throwing_integrity_error_handler);
|
|
|
|
// Same as above, but with caller-supplied file_input_stream_options.
future<input_stream<char>> data_stream(uint64_t pos, size_t len,
|
|
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history,
|
|
file_input_stream_options options,
|
|
raw_stream raw = raw_stream::no, integrity_check integrity = integrity_check::no,
|
|
integrity_error_handler error_handler = throwing_integrity_error_handler);
|
|
|
|
// Read exactly the specific byte range from the data file (after
|
|
// uncompression, if the file is compressed). This can be used to read
|
|
// a specific row from the data file (its position and length can be
|
|
// determined using the index file).
|
|
// This function is intended for (and optimized for) random access, not
|
|
// for iteration through all the rows.
|
|
future<temporary_buffer<char>> data_read(uint64_t pos, size_t len, reader_permit permit);
|
|
|
|
private:
|
|
// Asynchronous; resolves to a reference to a summary entry.
// NOTE(review): `i` is presumably the entry's index in the Summary — confirm.
future<summary_entry&> read_summary_entry(size_t i);
|
|
|
|
// Filter check for a decorated key: only its partition-key part is converted
// to an sstable key and handed to the key-based overload.
bool filter_has_key(const schema& s, const dht::decorated_key& dk) {
    auto sstable_key = key::from_partition_key(s, dk._key);
    return filter_has_key(sstable_key);
}
|
|
|
|
// Both map a token range to an optional pair of positions.
// NOTE(review): presumably summary sample indexes / index page numbers,
// with nullopt when the range does not overlap this sstable — confirm.
std::optional<std::pair<uint64_t, uint64_t>> get_sample_indexes_for_range(const dht::token_range& range);
|
|
std::optional<std::pair<uint64_t, uint64_t>> get_index_pages_for_range(const dht::token_range& range);
|
|
|
|
std::vector<unsigned> compute_shards_for_this_sstable(const dht::sharder&) const;
|
|
template <typename Components>
|
|
static auto& get_mutable_serialization_header(Components& components) {
|
|
auto entry = components.statistics.contents.find(metadata_type::Serialization);
|
|
if (entry == components.statistics.contents.end()) {
|
|
throw std::runtime_error("Serialization header metadata not available");
|
|
}
|
|
auto& p = entry->second;
|
|
if (!p) {
|
|
throw std::runtime_error("Statistics is malformed");
|
|
}
|
|
serialization_header& s = *static_cast<serialization_header *>(p.get());
|
|
return s;
|
|
}
|
|
|
|
// Declared noexcept; `options` defaults to a value-initialized file_open_options.
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
|
|
// runs in async context (called from storage::open); takes ownership of `w`.
|
|
void write_toc(std::unique_ptr<crc32_digest_file_writer> w);
|
|
// Static helpers that read the digest / checksum from an already-open file.
static future<uint32_t> read_digest_from_file(file f);
|
|
static future<lw_shared_ptr<checksum>> read_checksum_from_file(file f);
|
|
public:
|
|
|
|
shareable_components& get_shared_components() const {
|
|
return *_components;
|
|
}
|
|
schema_ptr get_schema() const {
|
|
return _schema;
|
|
}
|
|
|
|
bool has_scylla_component() const {
|
|
return has_component(component_type::Scylla);
|
|
}
|
|
|
|
// Returns an optional boolean value set to true iff the
|
|
// sstable's `originating_host_id` in stats metadata equals
|
|
// this node's host_id.
|
|
//
|
|
// The returned value may be nullopt if:
|
|
// - The sstable format is older than version_types::me, or
|
|
// - The local host_id is not known yet (may happen early in the start-up process)
|
|
std::optional<bool> originated_on_this_node() const;
|
|
|
|
// NOTE(review): presumably checks the stats-metadata originating_host_id
// against this node; failure behaviour is not visible here — confirm.
void validate_originating_host_id() const;
|
|
|
|
bool has_correct_promoted_index_entries() const {
|
|
return _schema->is_compound() || !has_scylla_component() || has_feature(sstable_feature::NonCompoundPIEntries);
|
|
}
|
|
|
|
bool has_correct_non_compound_range_tombstones() const {
|
|
return _schema->is_compound() || !has_scylla_component() || has_feature(sstable_feature::NonCompoundRangeTombstones);
|
|
}
|
|
|
|
bool has_shadowable_tombstones() const {
|
|
return has_feature(sstable_feature::ShadowableTombstones);
|
|
}
|
|
|
|
sstable_enabled_features features() const {
|
|
return _features;
|
|
}
|
|
|
|
void set_features(sstable_enabled_features sef) {
|
|
_features = sef;
|
|
}
|
|
|
|
bool has_feature(sstable_feature f) const {
|
|
return features().is_enabled(f);
|
|
}
|
|
|
|
const scylla_metadata* get_scylla_metadata() const {
|
|
return _components->scylla_metadata ? &*_components->scylla_metadata : nullptr;
|
|
}
|
|
|
|
run_id run_identifier() const {
|
|
return _run_identifier;
|
|
}
|
|
|
|
bool has_correct_max_deletion_time() const {
|
|
return (_version >= sstable_version_types::mc) || has_scylla_component();
|
|
}
|
|
|
|
bool filter_has_key(const key& key) const {
|
|
return _components->filter->is_present(bytes_view(key));
|
|
}
|
|
|
|
/*!
 * \brief Check if the sstable contains the given key.
 *
 * The method searches for the key in the sstable itself rather than relying
 * on the filter alone, so a positive answer means the key was actually found.
 * Both the precomputed hash (`hk`) and the decorated key (`dk`) are supplied.
 */
|
|
future<bool> has_partition_key(const utils::hashed_key& hk, const dht::decorated_key& dk);
|
|
|
|
bool filter_has_key(utils::hashed_key key) const {
|
|
return _components->filter->is_present(key);
|
|
}
|
|
|
|
bool filter_has_key(const schema& s, partition_key_view key) const {
|
|
return filter_has_key(key::from_partition_key(s, key));
|
|
}
|
|
|
|
static utils::hashed_key make_hashed_key(const schema& s, const partition_key& key);
|
|
|
|
filter_tracker& get_filter_tracker() { return _filter_tracker; }
|
|
|
|
// Cumulative filter counters.
uint64_t filter_get_false_positive() const { return _filter_tracker.false_positive; }

uint64_t filter_get_true_positive() const { return _filter_tracker.true_positive; }

// "Recent" counters: the growth since the previous call, after which the
// last-seen marker is advanced to the current cumulative value.
uint64_t filter_get_recent_false_positive() {
    auto& tracker = _filter_tracker;
    auto delta = tracker.false_positive - tracker.last_false_positive;
    tracker.last_false_positive = tracker.false_positive;
    return delta;
}

uint64_t filter_get_recent_true_positive() {
    auto& tracker = _filter_tracker;
    auto delta = tracker.true_positive - tracker.last_true_positive;
    tracker.last_true_positive = tracker.true_positive;
    return delta;
}
|
|
|
|
// Read-only access to the loaded statistics component.
const statistics& get_statistics() const {
    const statistics& stats = _components->statistics;
    return stats;
}
|
|
// Returns the Stats entry of the statistics component.
// Throws std::runtime_error if the entry is absent or holds a null pointer.
const stats_metadata& get_stats_metadata() const {
    auto& contents = _components->statistics.contents;
    auto found = contents.find(metadata_type::Stats);
    if (found == contents.end()) {
        throw std::runtime_error("Stats metadata not available");
    }
    if (!found->second) {
        throw std::runtime_error("Statistics is malformed");
    }
    return *static_cast<stats_metadata*>(found->second.get());
}
|
|
// Returns the Compaction entry of the statistics component.
// Throws std::runtime_error if the entry is absent or holds a null pointer.
const compaction_metadata& get_compaction_metadata() const {
    auto& contents = _components->statistics.contents;
    auto found = contents.find(metadata_type::Compaction);
    if (found == contents.end()) {
        throw std::runtime_error("Compaction metadata not available");
    }
    if (!found->second) {
        throw std::runtime_error("Statistics is malformed");
    }
    return *static_cast<compaction_metadata*>(found->second.get());
}
|
|
// Read-only view of the serialization header; same lookup (and the same
// exceptions) as get_mutable_serialization_header().
const serialization_header& get_serialization_header() const {
    const serialization_header& header = get_mutable_serialization_header(*_components);
    return header;
}
|
|
// Column translation for an explicit schema, serialization header and
// feature set.
column_translation get_column_translation(
        const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
    return _column_translation.get_for_schema(s, h, f);
}

// Column translation for `s`. Formats from `mc` onward use this sstable's
// serialization header and features; older formats use the schema alone.
column_translation get_column_translation(const schema& s) {
    if (get_version() >= sstable_version_types::mc) [[likely]] {
        return _column_translation.get_for_schema(s, get_serialization_header(), features());
    }
    return _column_translation.get_for_schema(s);
}

// Returns the cached translation when it has a version; otherwise computes
// one for this sstable's own schema.
column_translation get_column_translation() {
    if (_column_translation.version()) {
        return _column_translation;
    }
    return get_column_translation(*_schema);
}
|
|
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
|
return _shards;
|
|
}
|
|
|
|
gc_clock::time_point get_max_local_deletion_time() const {
|
|
return gc_clock::time_point(gc_clock::duration(get_stats_metadata().max_local_deletion_time));
|
|
}
|
|
|
|
uint32_t get_sstable_level() const {
|
|
return get_stats_metadata().sstable_level;
|
|
}
|
|
|
|
// This will change sstable level only in memory.
|
|
void set_sstable_level(uint32_t);
|
|
|
|
void generate_new_run_identifier() {
|
|
_run_identifier = run_id::create_random_id();
|
|
}
|
|
|
|
double get_compression_ratio() const;
|
|
|
|
const sstables::compression& get_compression() const {
|
|
return _components->compression;
|
|
}
|
|
|
|
void mutate_sstable_level(uint32_t);
|
|
bool should_mutate_sstable_level(uint32_t) const;
|
|
|
|
const summary& get_summary() const {
|
|
return _components->summary;
|
|
}
|
|
|
|
// Returns a shared pointer to the loaded checksum component, or nullptr when
// _components->checksum is not set.
// The return type is intentionally not top-level const: a `const` by-value
// return stops callers from moving out of the returned temporary and buys
// nothing, because the pointee is already const.
lw_shared_ptr<const checksum> get_checksum() const {
    return _components->checksum ? _components->checksum->shared_from_this() : nullptr;
}
|
|
|
|
std::optional<uint32_t> get_digest() const {
|
|
return _components->digest;
|
|
}
|
|
|
|
// Used only for writing sstable.
|
|
scylla_metadata::components_digests& get_components_digests() {
|
|
return _components_digests;
|
|
}
|
|
|
|
std::optional<uint32_t> get_component_digest(component_type c) const;
|
|
|
|
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
|
|
// for cells and tombstones expired before the time point "GC before", which
|
|
// is the point before which expiring data can be purged.
|
|
double estimate_droppable_tombstone_ratio(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
|
|
|
|
// get sstable open info from a loaded sstable, which can be used to quickly open a sstable
|
|
// at another shard.
|
|
future<foreign_sstable_open_info> get_open_info() &;
|
|
entry_descriptor get_descriptor(component_type c) const;
|
|
|
|
sstables_stats& get_stats() {
|
|
return _stats;
|
|
}
|
|
|
|
bool has_correct_min_max_column_names() const noexcept {
|
|
return _version >= sstable_version_types::md;
|
|
}
|
|
|
|
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
|
|
bool may_contain_rows(const query::clustering_row_ranges& ranges) const;
|
|
|
|
// false => there are no partition tombstones, true => we don't know
|
|
bool may_have_partition_tombstones() const {
|
|
return !has_correct_min_max_column_names()
|
|
|| _min_max_position_range.is_all_clustered_rows(*_schema);
|
|
}
|
|
|
|
// Return the large_data_stats_entry identified by large_data_type
|
|
// iff _large_data_stats is available and the requested entry is in
|
|
// the map. Otherwise, return a disengaged optional.
|
|
std::optional<large_data_stats_entry> get_large_data_stat(large_data_type t) const noexcept;
|
|
|
|
// Return the extended timestamp statistics map.
|
|
// Some or all entries may be missing if not present in scylla_metadata.
|
|
scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept;
|
|
|
|
const sstring& get_origin() const noexcept {
|
|
return _origin;
|
|
}
|
|
|
|
// sstable_id is null iff not present in scylla_metadata
|
|
const optimized_optional<sstable_id>& sstable_identifier() const noexcept {
|
|
return _sstable_identifier;
|
|
}
|
|
|
|
// Drops all evictable in-memory caches of on-disk content.
|
|
future<> drop_caches();
|
|
|
|
// Returns a read-only file for each existing component of the sstable, keyed by component type.
|
|
future<std::unordered_map<component_type, file>> readable_file_for_all_components() const;
|
|
|
|
// Clones this sstable with a new generation, under the same location as the original one.
|
|
// If leave_unsealed is true (default: false), the destination sstable is left unsealed.
|
|
// The implementation is specific to the underlying storage.
|
|
future<entry_descriptor> clone(generation_type new_generation, bool leave_unsealed = false) const;
|
|
|
|
struct lesser_reclaimed_memory {
|
|
// comparator class to be used by the _reclaimed set in sstables manager
|
|
bool operator()(const sstable& sst1, const sstable& sst2) const {
|
|
return sst1.total_memory_reclaimed() < sst2.total_memory_reclaimed();
|
|
}
|
|
};
|
|
|
|
// Creates an index reader for this sstable, returned as an owning pointer.
// Defaults: empty trace state, caching enabled, single_partition_read = false.
std::unique_ptr<abstract_index_reader> make_index_reader(
|
|
reader_permit permit,
|
|
tracing::trace_state_ptr trace_state = {},
|
|
use_caching caching = use_caching::yes,
|
|
bool single_partition_read = false);
|
|
|
|
// Allow the test cases from sstable_test.cc to test private methods. We use
|
|
// a placeholder class, `test`, to avoid cluttering this class too much. That class
|
|
// will then re-export as public every method it needs.
|
|
friend class test;
|
|
|
|
friend class mc::writer;
|
|
friend class index_reader;
|
|
friend class sstables_manager;
|
|
template <typename DataConsumeRowsContext>
|
|
friend future<std::unique_ptr<DataConsumeRowsContext>>
|
|
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t, integrity_check);
|
|
template <typename DataConsumeRowsContext>
|
|
friend future<std::unique_ptr<DataConsumeRowsContext>>
|
|
data_consume_single_partition(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, integrity_check);
|
|
template <typename DataConsumeRowsContext>
|
|
friend future<std::unique_ptr<DataConsumeRowsContext>>
|
|
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, integrity_check);
|
|
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
|
|
// NOTE(review): the last access specifier before this point is the `public:`
// that precedes get_shared_components(), so the helpers and the
// `being_repaired` data member below are public. Confirm whether a `private:`
// was intended before narrowing access, since callers may depend on it.
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
|
|
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
|
|
|
|
future<std::optional<uint32_t>> read_digest();
|
|
future<std::optional<uint32_t>> read_digest(file f);
|
|
future<lw_shared_ptr<checksum>> read_checksum(file f);
|
|
future<lw_shared_ptr<checksum>> read_checksum();
|
|
|
|
friend in_memory_config_type;
|
|
|
|
// Repair session this sstable is marked with (see mark_as_being_repaired()).
service::session_id being_repaired;
|
|
public:
|
|
void mark_as_being_repaired(const service::session_id& id);
|
|
int64_t update_repaired_at(int64_t repaired_at);
|
|
future<> copy_components(const sstable& src);
|
|
bool should_update_repaired_at(int64_t repaired_at) const;
|
|
|
|
// Creates a new sstable by linking all sstable components except for the specified component,
|
|
// which is created by calling the provided sstable_creator function and then written to the disk.
|
|
// The modifier function is called on the new sstable before writing the component.
|
|
// Returns the newly created and sealed sstable.
|
|
future<shared_sstable> link_with_rewritten_component(std::function<shared_sstable(shared_sstable)> sstable_creator,
|
|
component_type component,
|
|
std::function<void(sstable&)> modifier,
|
|
bool update_sstable_id);
|
|
// Must be called in a seastar thread.
|
|
void write_component_with_metadata(component_type type, scylla_metadata metadata);
|
|
};
|
|
|
|
// Validate checksums
|
|
//
|
|
// Sstables have two kinds of checksums: per-chunk checksums and a
|
|
// full-checksum (digest) calculated over the entire content of Data.db.
|
|
//
|
|
// The full-checksum (digest) is stored in Digest.crc (component_type::Digest).
|
|
//
|
|
// When compression is used, the per-chunk checksum is stored directly inside
|
|
// Data.db, after each compressed chunk. These are validated on read, when
|
|
// decompressing the respective chunks.
|
|
// When no compression is used, the per-chunk checksum is stored separately
|
|
// in CRC.db (component_type::CRC). Chunk size is defined and stored in said
|
|
// component as well.
|
|
//
|
|
// In both compressed and uncompressed sstables, checksums are calculated
|
|
// on the data that is actually written to disk, so in case of compressed
|
|
// data, on the compressed data.
|
|
//
|
|
// This method validates both the full checksum and the per-chunk checksum
|
|
// for the entire Data.db.
|
|
//
|
|
// Returns `valid` if all checksums are valid.
|
|
// Returns `invalid` if at least one checksum is invalid.
|
|
// Returns `no_checksum` if the sstable is uncompressed and does not have
|
|
// a CRC component (CRC.db is missing from TOC.txt).
|
|
// Validation errors are logged individually.
|
|
// Outcome of checksum validation (see the description above).
enum class validate_checksums_status {
    invalid = 0,
    valid = 1,
    no_checksum = 2
};

// Result returned by validate_checksums().
// Both members have default initializers, so a default-constructed result is
// well-defined (invalid, no digest) instead of holding indeterminate values.
// The type is still an aggregate, so `{status, has_digest}` initialization
// keeps working.
struct validate_checksums_result {
    validate_checksums_status status = validate_checksums_status::invalid;
    // NOTE(review): presumably whether a full-file digest (Digest component)
    // was available — inferred from the name; confirm.
    bool has_digest = false;
};
|
|
// Asynchronously validates the checksums of `sst` as described in the comment
// preceding validate_checksums_status.
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit);
|
|
|
|
// Running state used while sampling index entries into the summary.
struct index_sampling_state {
    static constexpr size_t default_summary_byte_cost = 2000;

    uint64_t next_data_offset_to_write_summary{0};
    uint64_t partition_count{0};
    // Keeps the summary-to-data size ratio at 1 : summary_byte_cost.
    size_t summary_byte_cost{default_summary_byte_cost};
};
|
|
|
|
// NOTE(review): presumably registers the sstables module metrics — confirm against the definition.
future<> init_metrics();
|
|
|
|
class file_io_extension {
|
|
public:
|
|
virtual ~file_io_extension() {}
|
|
virtual future<file> wrap_file(const sstable&, component_type, file, open_flags flags) = 0;
|
|
|
|
// same intent as wrap_file, but a data_sink, i.e. write-only, simplified
|
|
// output device. Default impl will call wrap_file and generate a wrapper object.
|
|
virtual future<data_sink> wrap_sink(const sstable&, component_type, data_sink);
|
|
|
|
virtual future<data_source>
|
|
wrap_source(const sstable&, component_type, data_source);
|
|
// optionally return a map of attributes for a given sstable,
|
|
// suitable for "describe".
|
|
// This would preferably be interesting info on what/why the extension did
|
|
// to this table.
|
|
using attr_value_type = std::variant<sstring, std::map<sstring, sstring>>;
|
|
using attr_value_map = std::map<sstring, attr_value_type>;
|
|
virtual attr_value_map get_attributes(const sstable&) const {
|
|
return {};
|
|
}
|
|
};
|
|
|
|
// Safely removes the table directory.
|
|
// Swallows all errors and just reports them to the log.
|
|
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir);
|
|
|
|
// Makes sure the TOC file is temporary, by moving the existing TOC file or otherwise
|
|
// checking that the temporary TOC already exists.
|
|
// Resolves into the temporary-TOC file name, or an empty string if neither the TOC nor the temporary
|
|
// TOC is there. `sync` defaults to storage::sync_dir::yes.
|
|
future<sstring> make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes);
|
|
|
|
// This snapshot allows the sstable files to be read even if they were removed from the directory.
|
|
struct sstable_files_snapshot {
|
|
// The sstable whose component files are captured.
shared_sstable sst;
|
|
// File handles keyed by component type.
std::unordered_map<component_type, file> files;
|
|
};
|
|
|
|
// A sstable_stream_source gives back
|
|
// component input streams suitable for streaming to other nodes,
|
|
// in appropriate order. Data will be decrypted and sanitized as required.
|
|
class sstable_stream_source {
|
|
protected:
|
|
shared_sstable _sst;
|
|
component_type _type;
|
|
public:
|
|
sstable_stream_source(shared_sstable, component_type);
|
|
virtual ~sstable_stream_source() = default;
|
|
|
|
// Input stream for data appropriate for stream transfer for this component
|
|
virtual future<input_stream<char>> input(const file_input_stream_options&) const = 0;
|
|
|
|
// source sstable
|
|
const sstable& source() const {
|
|
return *_sst;
|
|
}
|
|
// component
|
|
component_type type() const {
|
|
return _type;
|
|
}
|
|
std::string component_basename() const;
|
|
};
|
|
|
|
// Translates the result of gathering readable snapshot files into ordered items for streaming.
|
|
// Resolves to a vector of owned sstable_stream_source objects.
future<std::vector<std::unique_ptr<sstable_stream_source>>> create_stream_sources(const sstables::sstable_files_snapshot&, reader_permit);
|
|
|
|
// Receiving side of component streaming: accepts the data of a single
// component file (see create_stream_sink).
class sstable_stream_sink {
|
|
public:
|
|
virtual ~sstable_stream_sink() = default;
|
|
// Stream to the component file.
|
|
virtual future<output_stream<char>> output(const file_open_options&, const file_output_stream_options&) = 0;
|
|
// Closes this component. If this is the last component in a set (see "last_component" in the creating method below)
|
|
// the table on disk will be sealed.
|
|
// Returns the sealed sstable if last, or nullptr otherwise.
|
|
virtual future<shared_sstable> close() = 0;
|
|
// NOTE(review): presumably discards the partially received component — confirm.
virtual future<> abort() = 0;
|
|
};
|
|
|
|
// Options for create_stream_sink(); both flags default to false.
struct sstable_stream_sink_cfg {
    // Set when this sink receives the final component of the set.
    bool last_component{false};
    // Set to keep the resulting sstable unsealed.
    bool leave_unsealed{false};
};
|
|
|
|
// Creates a sink object which can receive a component file sourced from the source object data above.
|
|
|
|
// `cfg` carries the last_component / leave_unsealed flags (see sstable_stream_sink_cfg).
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, sstable_stream_sink_cfg cfg);
|
|
|
|
} // namespace sstables
|
|
|
|
// Formats a sstable_state as the directory name returned by state_to_dir().
// NOTE(review): format specs accepted by the inherited parse() (width, fill,
// alignment) are not applied; delegating to formatter<string_view>::format()
// would honour them once the return type of state_to_dir() is confirmed.
template <> struct fmt::formatter<sstables::sstable_state> : fmt::formatter<string_view> {
    auto format(sstables::sstable_state st, fmt::format_context& ctx) const {
        auto out = ctx.out();
        return fmt::format_to(out, "{}", state_to_dir(st));
    }
};
|