Add `ignore_component_digest_mismatch` option to `sstable_open_config` that logs a warning instead of throwing `malformed_sstable_exception` on component digest mismatch. This is useful for recovering sstables with corrupted non-vital components or working around bugs in digest calculation. Expose the option in scylla-sstable via the `--ignore-component-digest-mismatch` flag for the upgrade operation.
4392 lines
183 KiB
C++
4392 lines
183 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "utils/log.hh"
|
|
#include <concepts>
|
|
#include <vector>
|
|
#include <limits>
|
|
#include <algorithm>
|
|
#include <fmt/ranges.h>
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/fstream.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/shared_ptr_incomplete.hh>
|
|
#include <seastar/core/do_with.hh>
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/byteorder.hh>
|
|
#include <seastar/core/aligned_buffer.hh>
|
|
#include <seastar/core/metrics.hh>
|
|
#include <seastar/core/reactor.hh>
|
|
#include <seastar/coroutine/all.hh>
|
|
#include <seastar/util/file.hh>
|
|
#include <seastar/util/closeable.hh>
|
|
#include <seastar/util/short_streams.hh>
|
|
#include <seastar/util/memory-data-source.hh>
|
|
#include <seastar/util/memory-data-sink.hh>
|
|
#include <iterator>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/coroutine/as_future.hh>
|
|
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/to_string.hh"
|
|
#include "data_dictionary/storage_options.hh"
|
|
#include "dht/sharder.hh"
|
|
#include "writer.hh"
|
|
#include "m_format_read_helpers.hh"
|
|
#include "open_info.hh"
|
|
#include "sstables.hh"
|
|
#include "sstable_writer.hh"
|
|
#include "sstable_version.hh"
|
|
#include "metadata_collector.hh"
|
|
#include "progress_monitor.hh"
|
|
#include "compress.hh"
|
|
#include "checksummed_data_source.hh"
|
|
#include "index_reader.hh"
|
|
#include "downsampling.hh"
|
|
#include <boost/algorithm/string.hpp>
|
|
#include <boost/regex.hpp>
|
|
#include <seastar/core/align.hh>
|
|
#include "mutation/range_tombstone_list.hh"
|
|
#include "binary_search.hh"
|
|
#include "utils/bloom_filter.hh"
|
|
#include "utils/cached_file.hh"
|
|
#include "utils/stall_free.hh"
|
|
#include "utils/checked-file-impl.hh"
|
|
#include "db/extensions.hh"
|
|
#include "sstables/partition_index_cache.hh"
|
|
#include "db/large_data_handler.hh"
|
|
#include "db/config.hh"
|
|
#include "sstables/random_access_reader.hh"
|
|
#include "sstables/sstables_manager.hh"
|
|
#include "sstables/partition_index_cache.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "sstables_manager.hh"
|
|
#include "tracing/traced_file.hh"
|
|
#include "kl/reader.hh"
|
|
#include "mx/reader.hh"
|
|
#include "utils/bit_cast.hh"
|
|
#include "utils/cached_file.hh"
|
|
#include "tombstone_gc.hh"
|
|
#include "reader_concurrency_semaphore.hh"
|
|
#include "readers/mutation_source.hh"
|
|
#include "readers/reversing.hh"
|
|
#include "readers/forwardable.hh"
|
|
#include "sstables/trie/bti_index.hh"
|
|
#include "partition_slice_builder.hh"
|
|
|
|
#include "release.hh"
|
|
#include "utils/build_id.hh"
|
|
#include "utils/labels.hh"
|
|
#include "utils/io-wrappers.hh"
|
|
|
|
#include <boost/lexical_cast.hpp>
|
|
|
|
// Per-thread signals for disk errors hit while reading / writing sstable files.
// NOTE(review): presumably connected to disk error handlers elsewhere; confirm.
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
|
|
|
|
namespace sstables {
|
|
|
|
// The below flag governs the mode of index file page caching used by the index
// reader.
//
// If set to true, the reader will read and/or populate a common global cache,
// which shares its capacity with the row cache. If false, the reader will use
// BYPASS CACHE semantics for index caching.
//
// This flag is intended to be a temporary hack. The goal is to eventually
// solve index caching problems via a smart cache replacement policy.
//
// There is one instance per thread (thread_local). It is updateable at
// runtime and defaults to true.
thread_local utils::updateable_value<bool> global_cache_index_pages(true);
|
|
|
|
// Logger used throughout this file, registered under the name "sstable".
logging::logger sstlog("sstable");
|
|
|
|
// Reports a failed parse assertion as an internal error; never returns.
//
// The error is a malformed_sstable_exception carrying `message` (or a generic
// "parse_assert() failed" text when `message` is empty). The component name is
// attached when `filename` is provided.
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename) {
    if (message.empty()) {
        message = "parse_assert() failed";
    }
    std::exception_ptr error = filename
            ? std::make_exception_ptr(malformed_sstable_exception(message, *filename))
            : std::make_exception_ptr(malformed_sstable_exception(message));
    on_internal_error(sstlog, std::move(error));
}
|
|
|
|
[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos) {
|
|
on_internal_error(sstlog, fmt::format("BTI parse error for node at pos {}", pos));
|
|
}
|
|
|
|
// Returns the dynamic type name of `*x`. Returns "nullptr" instead when
// evaluating typeid(*x) throws std::bad_typeid (a null `x` of polymorphic type).
template <typename T>
const char* nullsafe_typename(T* x) noexcept {
    const char* type_name = "nullptr";
    try {
        type_name = typeid(*x).name();
    } catch (const std::bad_typeid&) {
        // Keep the "nullptr" placeholder.
    }
    return type_name;
}
|
|
|
|
// dynamic_cast, but calls on_internal_error on failure.
|
|
template <typename Derived, typename Base>
|
|
Derived* downcast_ptr(Base* x) {
|
|
if (auto casted = dynamic_cast<Derived*>(x)) {
|
|
return casted;
|
|
} else {
|
|
on_internal_error(sstlog, fmt::format("Bad downcast: expected {}, but got {}", typeid(Derived*).name(), nullsafe_typename(x)));
|
|
}
|
|
}
|
|
|
|
// Because this is a noop and won't hold any state, it is better to use a global than a
|
|
// thread_local. It will be faster, specially on non-x86.
|
|
struct noop_write_monitor final : public write_monitor {
|
|
virtual void on_write_started(const writer_offset_tracker&) override { };
|
|
virtual void on_data_write_completed() override { }
|
|
};
|
|
static noop_write_monitor default_noop_write_monitor;
|
|
write_monitor& default_write_monitor() {
|
|
return default_noop_write_monitor;
|
|
}
|
|
|
|
static noop_read_monitor default_noop_read_monitor;
|
|
read_monitor& default_read_monitor() {
|
|
return default_noop_read_monitor;
|
|
}
|
|
|
|
static no_read_monitoring noop_read_monitor_generator;
|
|
read_monitor_generator& default_read_monitor_generator() {
|
|
return noop_read_monitor_generator;
|
|
}
|
|
|
|
std::unique_ptr<crc32_digest_file_writer> make_calculate_digest_writer() noexcept {
|
|
return std::make_unique<crc32_digest_file_writer>(make_null_data_sink(default_sstable_buffer_size), default_sstable_buffer_size);
|
|
}
|
|
|
|
// Must be called in a seastar thread.
|
|
template <typename T>
|
|
uint32_t serialized_checksum(sstable_version_types v, const T& object) {
|
|
auto writer = make_calculate_digest_writer();
|
|
write(v, *writer, object);
|
|
writer->close();
|
|
return writer->full_checksum();
|
|
}
|
|
|
|
// Opens component `type` of this sstable through the storage backend and wraps
// the file with make_checked_file(), so I/O errors are routed to `error_handler`.
// This function never throws: every failure is returned as an exceptional future.
// On failure, the error is logged and passed to `error_handler`. If the handler
// throws, its exception replaces the original one.
// NOTE(review): `error_handler` is captured by reference in continuations, so
// the caller must keep it alive until the returned future resolves.
future<file> sstable::new_sstable_component_file(const io_error_handler& error_handler, component_type type, open_flags flags, file_open_options options) const noexcept {
    try {
        auto f = _storage->open_component(*this, type, flags, options, _manager.get_config().enable_data_integrity_check);

        // with_file_close_on_failure presumably closes the opened file if the
        // wrapping step fails — confirm against its definition.
        f = with_file_close_on_failure(std::move(f), [&error_handler] (file f) {
            return make_checked_file(error_handler, std::move(f));
        });

        return f.handle_exception([this, type, &error_handler] (auto ep) {
            sstlog.error("Could not create SSTable component {}. Found exception: {}", filename(type), ep);
            try {
                // The handler may translate the error by throwing; propagate that instead.
                error_handler(ep);
            } catch (...) {
                ep = std::current_exception();
            }
            return make_exception_future<file>(ep);
        });
    } catch (...) {
        // Anything thrown synchronously above becomes an exceptional future (noexcept contract).
        return current_exception_as_future<file>();
    }
}
|
|
|
|
// Removes component `type` of this sstable through the storage backend.
future<> sstable::unlink_component(component_type type) noexcept {
    auto& storage = *_storage;
    return storage.unlink_component(*this, type);
}
|
|
|
|
// Maps each sstable version known here to its short string name ("ka" ... "ms").
// reverse_map() below can be used to look a version up by its name.
const std::unordered_map<sstable_version_types, sstring, enum_hash<sstable_version_types>> version_string = {
    { sstable_version_types::ka , "ka" },
    { sstable_version_types::la , "la" },
    { sstable_version_types::mc , "mc" },
    { sstable_version_types::md , "md" },
    { sstable_version_types::me , "me" },
    { sstable_version_types::ms , "ms" },
};
|
|
|
|
// Maps each sstable format known here to its string name (only "big").
const std::unordered_map<sstable_format_types, sstring, enum_hash<sstable_format_types>> format_string = {
    { sstable_format_types::big , "big" }
};
|
|
|
|
// This assumes that the mappings are small enough, and called unfrequent
|
|
// enough. If that changes, it would be advisable to create a full static
|
|
// reverse mapping, even if it is done at runtime.
|
|
template <typename Map, std::equality_comparable_with<typename Map::mapped_type> Value>
|
|
static typename Map::key_type reverse_map(const Value& v, const Map& map) {
|
|
for (auto& [key, value]: map) {
|
|
if (value == v) {
|
|
return key;
|
|
}
|
|
}
|
|
throw std::out_of_range("unable to reverse map");
|
|
}
|
|
|
|
// This should be used every time we use read_exactly directly.
|
|
//
|
|
// read_exactly is a lot more convenient of an interface to use, because we'll
|
|
// be parsing known quantities.
|
|
//
|
|
// However, anything other than the size we have asked for, is certainly a bug,
|
|
// and we need to do something about it.
|
|
static void check_buf_size(temporary_buffer<char>& buf, size_t expected) {
|
|
if (buf.size() < expected) {
|
|
throw bufsize_mismatch_exception(buf.size(), expected);
|
|
}
|
|
}
|
|
|
|
template <typename T>
|
|
requires std::is_integral_v<T>
|
|
future<> parse(const schema&, sstable_version_types v, random_access_reader& in, T& i) {
|
|
return in.read_exactly(sizeof(T)).then([&i] (auto buf) {
|
|
check_buf_size(buf, sizeof(T));
|
|
i = net::ntoh(read_unaligned<T>(buf.get()));
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
template <typename T>
|
|
requires std::is_enum_v<T>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, T& i) {
|
|
return in.read_exactly(sizeof(T)).then([&i] (auto buf) {
|
|
check_buf_size(buf, sizeof(T));
|
|
i = static_cast<T>(net::ntoh(read_unaligned<std::underlying_type_t<T>>(buf.get())));
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
// Reads a one-byte boolean; any non-zero byte is taken as true.
//
// The byte is read into a uint8_t and then converted. Writing it straight into
// the bool's storage would let a corrupt file put a value other than 0/1 into
// a bool object, and any later read of that bool would be undefined behavior.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, bool& i) {
    uint8_t raw = 0;
    co_await parse(s, v, in, raw);
    i = raw != 0;
}
|
|
|
|
// Reads an IEEE-754 double stored as a big-endian 64-bit bit pattern.
future<> parse(const schema&, sstable_version_types, random_access_reader& in, double& d) {
    // Go through a fixed-width integer: `unsigned long` is 64 bits only on LP64
    // platforms, and std::bit_cast requires the sizes to match exactly.
    static_assert(sizeof(double) == sizeof(uint64_t));
    return in.read_exactly(sizeof(double)).then([&d] (auto buf) {
        check_buf_size(buf, sizeof(double));
        uint64_t nr = read_unaligned<uint64_t>(buf.get());
        d = std::bit_cast<double>(net::ntoh(nr));
        return make_ready_future<>();
    });
}
|
|
|
|
template <typename T>
|
|
future<> parse(const schema&, sstable_version_types, random_access_reader& in, T& len, bytes& s) {
|
|
return in.read_exactly(len).then([&s, len] (auto buf) {
|
|
check_buf_size(buf, len);
|
|
// Likely a different type of char. Most bufs are unsigned, whereas the bytes type is signed.
|
|
s = bytes(reinterpret_cast<const bytes::value_type *>(buf.get()), len);
|
|
});
|
|
}
|
|
|
|
// Parses each argument in turn. `first` is parsed immediately, and each element
// of `rest` is parsed in a continuation of the previous parse, so the fields are
// consumed from `in` strictly in argument order.
// All composite parsers must come after this
template<typename First, typename... Rest>
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, First& first, Rest&&... rest) {
    auto fut = parse(s, v, in, first);
    // Comma-operator fold: appends one .then() per element of `rest`.
    // The continuations capture by reference, so the arguments must outlive the returned future.
    (..., (void)(fut = fut.then([&s, v, &in, &rest] () mutable {
        return parse(s, v, in, std::forward<Rest>(rest));
    })));
    return fut;
}
|
|
|
|
// Intended to be used for a type that describes itself through describe_type().
|
|
template <self_describing T>
|
|
future<>
|
|
parse(const schema& s, sstable_version_types v, random_access_reader& in, T& t) {
|
|
return t.describe_type(v, [v, &s, &in] (auto&&... what) -> future<> {
|
|
return parse(s, v, in, what...);
|
|
});
|
|
}
|
|
|
|
template <class T>
|
|
future<> parse(const schema&, sstable_version_types v, random_access_reader& in, vint<T>& t) {
|
|
return read_vint(in, t.value);
|
|
}
|
|
|
|
// Reads a fixed-size serialized UUID (utils::UUID::serialized_size() bytes) into `uuid`.
future<> parse(const schema&, sstable_version_types, random_access_reader& in, utils::UUID& uuid) {
    const auto wanted = uuid.serialized_size();
    return in.read_exactly(wanted).then([&uuid] (temporary_buffer<char> buf) {
        check_buf_size(buf, utils::UUID::serialized_size());

        auto* raw = const_cast<int8_t*>(reinterpret_cast<const int8_t*>(buf.get()));
        uuid = utils::UUID_gen::get_UUID(raw);
    });
}
|
|
|
|
template <typename Tag>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, utils::tagged_uuid<Tag>& id) {
|
|
// Read directly into the tagged_uuid `id` member
|
|
// This is ugly, but save an allocation or reimplementation
|
|
// of parse(..., utils::UUID&)
|
|
utils::UUID& uuid = *const_cast<utils::UUID*>(&id.uuid());
|
|
return parse(s, v, in, uuid);
|
|
}
|
|
|
|
// For all types that take a size, we provide a template that takes the type
|
|
// alone, and another, separate one, that takes a size parameter as well, of
|
|
// type Size. This is because although most of the time the size and the data
|
|
// are contiguous, it is not always the case. So we want to have the
|
|
// flexibility of parsing them separately.
|
|
template <typename Size>
|
|
future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, disk_string<Size>& s) {
|
|
auto len = std::make_unique<Size>();
|
|
auto f = parse(schema, v, in, *len);
|
|
return f.then([v, &schema, &in, &s, len = std::move(len)] {
|
|
return parse(schema, v, in, *len, s.value);
|
|
});
|
|
}
|
|
|
|
// Parses a string whose byte length is stored as a vint ahead of the bytes.
future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, disk_string_vint_size& s) {
    // Heap-allocated so the length stays valid until the continuation runs.
    auto length = std::make_unique<uint64_t>();
    auto length_read = read_vint(in, *length);
    return length_read.then([v, &schema, &in, &s, length = std::move(length)] {
        return parse(schema, v, in, *length, s.value);
    });
}
|
|
|
|
template <typename Members>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, disk_array_vint_size<Members>& arr) {
|
|
auto len = std::make_unique<uint64_t>();
|
|
auto f = read_vint(in, *len);
|
|
return f.then([v, &s, &in, &arr, len = std::move(len)] {
|
|
return parse(s, v, in, *len, arr.elements);
|
|
});
|
|
}
|
|
|
|
// We cannot simply read the whole array at once, because we don't know its
|
|
// full size. We know the number of elements, but if we are talking about
|
|
// disk_strings, for instance, we have no idea how much of the stream each
|
|
// element will take.
|
|
//
|
|
// Sometimes we do know the size, like the case of integers. There, all we have
|
|
// to do is to convert each member because they are all stored big endian.
|
|
// We'll offer a specialization for that case below.
|
|
template <typename Size, typename Members>
|
|
future<>
|
|
parse(const schema& s, sstable_version_types v, random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
|
|
for (auto count = len; count; count--) {
|
|
arr.emplace_back();
|
|
co_await parse(s, v, in, arr.back());
|
|
}
|
|
}
|
|
|
|
template <typename Size, std::integral Members>
|
|
future<>
|
|
parse(const schema&, sstable_version_types, random_access_reader& in, Size& len, utils::chunked_vector<Members>& arr) {
|
|
Size now = arr.max_chunk_capacity();
|
|
for (auto count = len; count; count -= now) {
|
|
if (now > count) {
|
|
now = count;
|
|
}
|
|
auto buf = co_await in.read_exactly(now * sizeof(Members));
|
|
check_buf_size(buf, now * sizeof(Members));
|
|
for (size_t i = 0; i < now; ++i) {
|
|
arr.push_back(net::ntoh(read_unaligned<Members>(buf.get() + i * sizeof(Members))));
|
|
}
|
|
}
|
|
}
|
|
|
|
template <typename Contents>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, std::optional<Contents>& opt) {
|
|
bool engaged;
|
|
co_await parse(s, v, in, engaged);
|
|
if (engaged) {
|
|
opt.emplace();
|
|
co_await parse(s, v, in, *opt);
|
|
} else {
|
|
opt.reset();
|
|
}
|
|
}
|
|
|
|
// We resize the array here, before we pass it to the integer / non-integer
|
|
// specializations
|
|
template <typename Size, typename Members>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, disk_array<Size, Members>& arr) {
|
|
Size len;
|
|
co_await parse(s, v, in, len);
|
|
arr.elements.reserve(len);
|
|
co_await parse(s, v, in, len, arr.elements);
|
|
}
|
|
|
|
template <typename Size, typename Key, typename Value>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, Size& len, std::unordered_map<Key, Value>& map) {
|
|
for (auto count = len; count; count--) {
|
|
Key key;
|
|
Value value;
|
|
co_await parse(s, v, in, key, value);
|
|
map.emplace(key, value);
|
|
}
|
|
}
|
|
|
|
template <typename First, typename Second>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, std::pair<First, Second>& p) {
|
|
return parse(s, v, in, p.first, p.second);
|
|
}
|
|
|
|
template <typename Size, typename Key, typename Value>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, disk_hash<Size, Key, Value>& h) {
|
|
Size w;
|
|
co_await parse(s, v, in, w);
|
|
co_await parse(s, v, in, w, h.map);
|
|
}
|
|
|
|
// Abstract parser/sizer/writer for a single tagged member of a tagged union
|
|
template <typename DiskSetOfTaggedUnion>
|
|
struct single_tagged_union_member_serdes {
|
|
using value_type = typename DiskSetOfTaggedUnion::value_type;
|
|
virtual ~single_tagged_union_member_serdes() {}
|
|
virtual future<> do_parse(const schema& s, sstable_version_types version, random_access_reader& in, value_type& v) const = 0;
|
|
virtual uint32_t do_size(sstable_version_types version, const value_type& v) const = 0;
|
|
virtual void do_write(sstable_version_types version, file_writer& out, const value_type& v) const = 0;
|
|
};
|
|
|
|
// Concrete parser for a single member of a tagged union; parses type "Member"
|
|
template <typename DiskSetOfTaggedUnion, typename Member>
|
|
struct single_tagged_union_member_serdes_for final : single_tagged_union_member_serdes<DiskSetOfTaggedUnion> {
|
|
using base = single_tagged_union_member_serdes<DiskSetOfTaggedUnion>;
|
|
using value_type = typename base::value_type;
|
|
virtual future<> do_parse(const schema& s, sstable_version_types version, random_access_reader& in, value_type& v) const override {
|
|
v = Member();
|
|
return parse(s, version, in, std::get<Member>(v).value);
|
|
}
|
|
virtual uint32_t do_size(sstable_version_types version, const value_type& v) const override {
|
|
return serialized_size(version, std::get<Member>(v).value);
|
|
}
|
|
virtual void do_write(sstable_version_types version, file_writer& out, const value_type& v) const override {
|
|
write(version, out, std::get<Member>(v).value);
|
|
}
|
|
};
|
|
|
|
template <typename TagType, typename... Members>
|
|
struct disk_set_of_tagged_union<TagType, Members...>::serdes {
|
|
using disk_set = disk_set_of_tagged_union<TagType, Members...>;
|
|
// We can't use unique_ptr, because we initialize from an std::intializer_list, which is not move compatible.
|
|
using serdes_map_type = std::unordered_map<TagType, shared_ptr<single_tagged_union_member_serdes<disk_set>>, typename disk_set::hash_type>;
|
|
using value_type = typename disk_set::value_type;
|
|
serdes_map_type map = {
|
|
{Members::tag(), make_shared<single_tagged_union_member_serdes_for<disk_set, Members>>()}...
|
|
};
|
|
future<> lookup_and_parse(const schema& schema, sstable_version_types v, random_access_reader& in, TagType tag, uint32_t& size, disk_set& s, value_type& value) const {
|
|
auto i = map.find(tag);
|
|
if (i == map.end()) {
|
|
return in.read_exactly(size).discard_result();
|
|
} else {
|
|
return i->second->do_parse(schema, v, in, value).then([tag, &s, &value] () mutable {
|
|
s.data.emplace(tag, std::move(value));
|
|
});
|
|
}
|
|
}
|
|
uint32_t lookup_and_size(sstable_version_types v, TagType tag, const value_type& value) const {
|
|
return map.at(tag)->do_size(v, value);
|
|
}
|
|
void lookup_and_write(sstable_version_types v, file_writer& out, TagType tag, const value_type& value) const {
|
|
return map.at(tag)->do_write(v, out, value);
|
|
}
|
|
};
|
|
|
|
// Out-of-class definition of the static serdes table, one per instantiation.
template <typename TagType, typename... Members>
typename disk_set_of_tagged_union<TagType, Members...>::serdes disk_set_of_tagged_union<TagType, Members...>::s_serdes;
|
|
|
|
template <typename TagType, typename... Members>
|
|
future<>
|
|
parse(const schema& schema, sstable_version_types v, random_access_reader& in, disk_set_of_tagged_union<TagType, Members...>& s) {
|
|
using disk_set = disk_set_of_tagged_union<TagType, Members...>;
|
|
using key_type = typename disk_set::key_type;
|
|
using value_type = typename disk_set::value_type;
|
|
|
|
key_type nr_elements;
|
|
co_await parse(schema, v, in, nr_elements);
|
|
for ([[maybe_unused]] auto _ : std::views::iota(key_type(0), nr_elements)) {
|
|
key_type new_key;
|
|
unsigned new_size;
|
|
co_await parse(schema, v, in, new_key);
|
|
co_await parse(schema, v, in, new_size);
|
|
value_type new_value;
|
|
co_await disk_set::s_serdes.lookup_and_parse(schema, v, in, TagType(new_key), new_size, s, new_value);
|
|
}
|
|
}
|
|
|
|
template <typename TagType, typename... Members>
|
|
void write(sstable_version_types v, file_writer& out, const disk_set_of_tagged_union<TagType, Members...>& s) {
|
|
using disk_set = disk_set_of_tagged_union<TagType, Members...>;
|
|
write(v, out, uint32_t(s.data.size()));
|
|
for (auto&& kv : s.data) {
|
|
auto&& tag = kv.first;
|
|
auto&& value = kv.second;
|
|
write(v, out, tag);
|
|
write(v, out, uint32_t(disk_set::s_serdes.lookup_and_size(v, tag, value)));
|
|
disk_set::s_serdes.lookup_and_write(v, out, tag, value);
|
|
}
|
|
}
|
|
|
|
// Parses the Summary component into `s`. The on-disk layout, as read below:
//  - the header fields;
//  - header.size little-endian entry positions, relative to the end of the header;
//  - the entries, each a key followed by a little-endian 64-bit index position;
//  - the first and last keys, located header.memory_size bytes past the header.
// While parsing, a sentinel position (header.memory_size) is appended, so that
// each entry's size is the distance to the next position. The sentinel is
// removed again before returning.
future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, summary& s) {
    using pos_type = typename decltype(summary::positions)::value_type;

    co_await parse(schema, v, in, s.header.min_index_interval,
                   s.header.size,
                   s.header.memory_size,
                   s.header.sampling_level,
                   s.header.size_at_full_sampling);
    // Positions are encoded in little-endian.
    s.positions.reserve(s.header.size + 1);
    while (s.positions.size() != s.header.size) {
        // random_access_reader::read_exactly internally maintains
        // a 128K buffer, so it is okay to read one position at a time.
        auto buf = co_await in.read_exactly(sizeof(pos_type));
        check_buf_size(buf, sizeof(pos_type));
        s.positions.push_back(seastar::read_le<pos_type>(buf.get()));
        co_await coroutine::maybe_yield();
    }
    // Since the keys in the index are not sized, we need to calculate
    // the start position of the index i+1 to determine the boundaries
    // of index i. The "memory_size" field in the header determines the
    // total memory used by the map, so if we push it to the vector, we
    // can guarantee that no conditionals are used, and we can always
    // query the position of the "next" index.
    s.positions.push_back(s.header.memory_size);

    co_await in.seek(sizeof(summary::header) + s.header.memory_size);
    co_await parse(schema, v, in, s.first_key, s.last_key);
    co_await in.seek(s.positions[0] + sizeof(summary::header));

    s.entries.reserve(s.header.size);

    int idx = 0;
    while (s.entries.size() != s.header.size) {
        auto pos = s.positions[idx++];
        auto next = s.positions[idx];

        auto entrysize = next - pos;
        // Every entry ends with an 8-byte position. With corrupt positions an
        // entry could be shorter than that. `keysize` below would then wrap
        // around and the key view would extend past the end of `buf`, so such
        // entries are rejected up front.
        if (entrysize < 8) {
            throw malformed_sstable_exception(fmt::format("Summary is malformed: entry {} has size {}, smaller than its 8-byte position", idx - 1, entrysize));
        }
        auto buf = co_await in.read_exactly(entrysize);
        check_buf_size(buf, entrysize);

        auto keysize = entrysize - 8;
        auto key_data = s.add_summary_data(bytes_view(reinterpret_cast<const int8_t*>(buf.get()), keysize));
        buf.trim_front(keysize);

        // position is little-endian encoded
        auto position = seastar::read_le<uint64_t>(buf.get());
        auto token = schema.get_partitioner().get_token(key_view(key_data));
        s.entries.push_back(summary_entry{ token, key_data, position });
    }
    // Delete last element which isn't part of the on-disk format.
    s.positions.pop_back();
}
|
|
|
|
// Parses the Scylla component. If its data carries component digests, the
// component is followed by a digest of its own, which is parsed into
// metadata.digest.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, scylla_metadata& metadata) {
    co_await parse(s, v, in, metadata.data);
    auto component_digests = metadata.data.get<scylla_metadata_type::ComponentsDigests, scylla_metadata::components_digests>();
    if (!component_digests) {
        co_return;
    }
    metadata.digest.emplace();
    co_await parse(s, v, in, *metadata.digest);
}
|
|
|
|
// Writes one summary entry: its key, then its index position as a
// little-endian 64-bit value.
inline void write(sstable_version_types v, file_writer& out, const summary_entry& entry) {
    // FIXME: summary entry is supposedly written in memory order, but that
    // would prevent portability of summary file between machines of different
    // endianness. We can treat it as little endian to preserve portability.
    write(v, out, entry.key);
    const uint64_t position_le = seastar::cpu_to_le<uint64_t>(entry.position);
    out.write(reinterpret_cast<const char*>(&position_le), sizeof(position_le));
}
|
|
|
|
// Writes the Summary component. The order is the header, the positions, the
// entries, and finally the first and last keys.
inline void write(sstable_version_types v, file_writer& out, const summary& s) {
    // NOTE: positions and entries must be stored in LITTLE-ENDIAN.
    const auto& h = s.header;
    write(v, out, h.min_index_interval, h.size, h.memory_size, h.sampling_level, h.size_at_full_sampling);
    for (const auto& position : s.positions) {
        const auto position_le = seastar::cpu_to_le(position);
        out.write(reinterpret_cast<const char*>(&position_le), sizeof(position_le));
    }
    write(v, out, s.entries);
    write(v, out, s.first_key, s.last_key);
}
|
|
|
|
// Writes the Scylla component's data followed by its digest.
// NOTE(review): digest.value() throws std::bad_optional_access when no digest is
// set, whereas parse() reads a digest only when ComponentsDigests is present.
// Confirm that every writer of this component sets the digest.
inline void write(sstable_version_types v, file_writer& out, const scylla_metadata& s) {
    write(v, out, s.data);
    const auto& digest = s.digest.value();
    write(v, out, digest);
}
|
|
|
|
// Returns a ready future holding summary entry `i`. If `i` is past the last
// entry, returns an exceptional future with std::out_of_range.
future<summary_entry&> sstable::read_summary_entry(size_t i) {
    auto& entries = _components->summary.entries;
    if (i < entries.size()) {
        return make_ready_future<summary_entry&>(entries[i]);
    }
    return make_exception_future<summary_entry&>(std::out_of_range(format("Invalid Summary index: {:d}", i)));
}
|
|
|
|
template <typename Child>
|
|
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, std::unique_ptr<metadata>& p) {
|
|
p.reset(new Child);
|
|
return parse(s, v, in, *static_cast<Child *>(p.get()));
|
|
}
|
|
|
|
template <typename Child>
|
|
inline void write(sstable_version_types v, file_writer& out, const std::unique_ptr<metadata>& p) {
|
|
write(v, out, *static_cast<Child *>(p.get()));
|
|
}
|
|
|
|
// Parses the Statistics component. It first reads the (metadata type, offset)
// table, then parses each metadata block found at its offset. Every failure
// surfaces as a malformed_sstable_exception.
future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, statistics& s) {
    try {
        co_await parse(schema, v, in, s.offsets);
        // Old versions of Scylla do not respect the order.
        // See https://github.com/scylladb/scylla/issues/3937
        std::ranges::sort(s.offsets.elements, std::ranges::less(), std::mem_fn(&std::pair<metadata_type, unsigned int>::first));
        for (auto [type, offset] : s.offsets.elements) {
            co_await in.seek(offset);
            switch (type) {
            case metadata_type::Validation:
                co_await parse<validation_metadata>(schema, v, in, s.contents[type]);
                break;
            case metadata_type::Compaction:
                co_await parse<compaction_metadata>(schema, v, in, s.contents[type]);
                break;
            case metadata_type::Stats:
                co_await parse<stats_metadata>(schema, v, in, s.contents[type]);
                break;
            case metadata_type::Serialization:
                // Serialization headers only exist from the mc format onwards.
                if (v < sstable_version_types::mc) {
                    throw malformed_sstable_exception(
                        "Statistics is malformed: SSTable is in 2.x format but contains serialization header.");
                }
                co_await parse<serialization_header>(schema, v, in, s.contents[type]);
                break;
            default:
                throw malformed_sstable_exception(fmt::format("Invalid metadata type at Statistics file: {} ", int(type)));
            }
        }
    } catch (const malformed_sstable_exception&) {
        throw;
    } catch (...) {
        throw malformed_sstable_exception(fmt::format("Statistics file is malformed: {}", std::current_exception()));
    }
}
|
|
|
|
// Writes the Statistics component. It writes the offsets table first, then each
// metadata block in the order the table lists them.
inline void write(sstable_version_types v, file_writer& out, const statistics& s) {
    write(v, out, s.offsets);
    for (const auto& entry : s.offsets.elements) {
        const auto& type = entry.first;
        s.contents.at(type)->write(v, out);
    }
}
|
|
|
|
// Parses an estimated histogram. The layout is a 32-bit bucket count `length`
// followed by `length` (offset, bucket) pairs of big-endian 64-bit values.
// The offset read with the first pair is discarded, so bucket_offsets ends up
// one element shorter than buckets. Throws malformed_sstable_exception for a
// zero count.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, utils::estimated_histogram& eh) {
    // A plain local is enough here: coroutine locals live in the coroutine frame
    // across co_await, so the former heap-allocated length was unnecessary.
    uint32_t length = 0;
    co_await parse(s, v, in, length);

    if (length == 0) {
        co_await coroutine::return_exception(malformed_sstable_exception("Estimated histogram with zero size found. Can't continue!"));
    }

    // Arrays are potentially pre-initialized by the estimated_histogram constructor.
    eh.bucket_offsets.clear();
    eh.buckets.clear();

    eh.bucket_offsets.reserve(length - 1);
    eh.buckets.reserve(length);

    auto type_size = sizeof(uint64_t) * 2;
    auto buf = co_await in.read_exactly(length * type_size);
    check_buf_size(buf, length * type_size);

    size_t j = 0;
    while (eh.buckets.size() != length) {
        auto offset = net::ntoh(read_unaligned<uint64_t>(buf.get() + (j++) * sizeof(uint64_t)));
        auto bucket = net::ntoh(read_unaligned<uint64_t>(buf.get() + (j++) * sizeof(uint64_t)));
        // Skip the first pair's offset (see above).
        if (!eh.buckets.empty()) {
            eh.bucket_offsets.push_back(offset);
        }
        eh.buckets.push_back(bucket);
        co_await coroutine::maybe_yield();
    }
}
|
|
|
|
// Serializes an estimated histogram in the layout parse() reads. It writes a
// 32-bit bucket count, then one (offset, bucket) pair of big-endian 64-bit
// values per bucket. Bucket i > 0 pairs with bucket_offsets[i - 1]. The first
// pair's offset is discarded by the reader; it repeats bucket_offsets[0], or 0
// when there are no offsets.
// Must be called in a seastar thread (it may yield).
void write(sstable_version_types v, file_writer& out, const utils::estimated_histogram& eh) {
    uint32_t len = 0;
    check_truncate_and_assign(len, eh.buckets.size());

    write(v, out, len);
    struct element {
        int64_t offsets;
        int64_t buckets;
    };
    std::vector<element> elements;
    elements.reserve(eh.buckets.size());

    const int64_t* offsets_nr = eh.bucket_offsets.data();
    const int64_t* buckets_nr = eh.buckets.data();
    const bool has_offsets = !eh.bucket_offsets.empty();
    for (size_t i = 0; i < eh.buckets.size(); i++) {
        // A single-bucket histogram (which parse() yields for length 1) has no
        // offsets at all. Previously offsets_nr[0] was then read out of bounds.
        int64_t offset = 0;
        if (i > 0) {
            offset = offsets_nr[i - 1];
        } else if (has_offsets) {
            offset = offsets_nr[0];
        }
        auto offsets = net::hton(offset);
        auto buckets = net::hton(buckets_nr[i]);
        elements.emplace_back(element{offsets, buckets});
        if (need_preempt()) {
            seastar::thread::yield();
        }
    }

    auto p = reinterpret_cast<const char*>(elements.data());
    auto bytes = elements.size() * sizeof(element);
    out.write(p, bytes);
}
|
|
|
|
struct streaming_histogram_element {
|
|
using key_type = typename decltype(utils::streaming_histogram::bin)::key_type;
|
|
using value_type = typename decltype(utils::streaming_histogram::bin)::mapped_type;
|
|
key_type key;
|
|
value_type value;
|
|
|
|
template <typename Describer>
|
|
auto describe_type(sstable_version_types v, Describer f) { return f(key, value); }
|
|
};
|
|
|
|
// Parses a streaming histogram: max_bin_size, then an array of (key, value) bins.
// Throws malformed_sstable_exception if there are more bins than max_bin_size.
// A histogram that is full and has unsorted keys is known to be broken (see
// below); its bins are dropped, leaving sh.bin untouched.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, utils::streaming_histogram& sh) {
    auto bins = disk_array<uint32_t, streaming_histogram_element>();

    co_await parse(s, v, in, sh.max_bin_size, bins);
    const auto count = bins.elements.size();
    if (count > sh.max_bin_size) {
        co_await coroutine::return_exception(malformed_sstable_exception("Streaming histogram with more entries than allowed. Can't continue!"));
    }

    // Find bad histogram which had incorrect elements merged due to use of
    // unordered map. The keys will be unordered. Histogram which size is
    // less than max allowed will be correct because no entries needed to be
    // merged, so we can avoid discarding those.
    // look for commit with title 'streaming_histogram: fix update' for more details.
    auto by_key = [] (auto& lhs, auto& rhs) { return lhs.key < rhs.key; };
    if (count == sh.max_bin_size && !std::ranges::is_sorted(bins.elements, by_key)) {
        co_return;
    }

    using bin_pair = std::pair<streaming_histogram_element::key_type, streaming_histogram_element::value_type>;
    auto dest = std::inserter(sh.bin, sh.bin.end());
    for (const auto& element : bins.elements) {
        *dest++ = bin_pair{ element.key, element.value };
    }
}
|
|
|
|
// Serializes a streaming histogram: its max bin size (which must fit in a
// uint32, enforced by check_truncate_and_assign) followed by a
// uint32-length-prefixed array of (key, value) bins in map order.
void write(sstable_version_types v, file_writer& out, const utils::streaming_histogram& sh) {
    uint32_t max_bin_size;
    check_truncate_and_assign(max_bin_size, sh.max_bin_size);

    disk_array<uint32_t, streaming_histogram_element> bins;
    for (const auto& [bin_key, bin_value] : sh.bin) {
        bins.elements.push_back(streaming_histogram_element{bin_key, bin_value});
    }

    write(v, out, max_bin_size, bins);
}
|
|
|
|
// A commitlog interval is stored as its start position followed by its end position.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, commitlog_interval& ci) {
    // The variadic parse() overload reads its targets in argument order.
    co_await parse(s, v, in, ci.start, ci.end);
}
|
|
|
|
// Writes a commitlog interval as its start position followed by its end position.
void write(sstable_version_types v, file_writer& out, const commitlog_interval& ci) {
    // The variadic write() overload emits its arguments in order.
    write(v, out, ci.start, ci.end);
}
|
|
|
|
// Parses the CompressionInfo component, in this order:
//   - the compressor name and options,
//   - the uncompressed chunk length (uint32) and uncompressed data length (uint64),
//   - a uint32 count of chunk offsets, then that many big-endian uint64 offsets.
//
// Throws malformed_sstable_exception if the chunk length is zero, or if the
// component ends before all declared offsets have been read.
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, compression& c) {
    uint64_t data_len = 0;
    uint32_t chunk_len = 0;

    co_await parse(s, v, in, c.name, c.options, chunk_len, data_len);
    if (chunk_len == 0) {
        throw malformed_sstable_exception("CompressionInfo is malformed: zero chunk_len");
    }
    c.set_uncompressed_chunk_length(chunk_len);
    c.set_uncompressed_file_length(data_len);

    uint32_t len = 0;
    compression::segmented_offsets::writer offsets = c.offsets.get_writer();
    co_await parse(s, v, in, len);
    auto eoarr = [&c, &len] { return c.offsets.size() == len; };

    // Offsets are read in batches of at most ~100KB per read.
    constexpr size_t max_offsets_per_read = 100000 / sizeof(uint64_t);
    while (!eoarr()) {
        auto now = std::min(len - c.offsets.size(), max_offsets_per_read);
        const size_t expected_bytes = now * sizeof(uint64_t);
        auto buf = co_await in.read_exactly(expected_bytes);
        // read_exactly() returns fewer bytes if the stream ends early. Without
        // this check a truncated component would be decoded past the end of `buf`.
        if (buf.size() != expected_bytes) {
            throw malformed_sstable_exception(fmt::format("CompressionInfo is malformed: expected {} bytes of chunk offsets, got {}",
                    expected_bytes, buf.size()));
        }
        for (size_t i = 0; i < now; ++i) {
            uint64_t value = read_unaligned<uint64_t>(buf.get() + i * sizeof(uint64_t));
            offsets.push_back(net::ntoh(value));
        }
    }
}
|
|
|
|
// Serializes the CompressionInfo component, in this order:
//   - the compressor name and options,
//   - the uncompressed chunk length and uncompressed file length,
//   - a uint32 count of chunk offsets, then the offsets as big-endian uint64s.
//
// Throws (via check_truncate_and_assign) if the number of offsets does not
// fit in the uint32 count field, rather than writing a corrupt count.
void write(sstable_version_types v, file_writer& out, const compression& c) {
    write(v, out, c.name, c.options, c.uncompressed_chunk_length(), c.uncompressed_file_length());

    const size_t total = c.offsets.size();
    uint32_t offsets_count;
    check_truncate_and_assign(offsets_count, total);
    write(v, out, offsets_count);

    // Convert and write offsets in batches of at most ~100KB. The scratch
    // buffer is no larger than needed.
    const size_t per_loop = 100000 / sizeof(uint64_t);
    std::vector<uint64_t> tmp(std::min(total, per_loop));
    size_t idx = 0;
    while (idx != total) {
        auto now = std::min(total - idx, per_loop);
        // Copy offsets into tmp, converting each entry to big-endian.
        auto nr = c.offsets.begin() + idx;
        for (size_t i = 0; i < now; i++) {
            tmp[i] = net::hton(nr[i]);
        }
        out.write(reinterpret_cast<const char*>(tmp.data()), now * sizeof(uint64_t));
        idx += now;
    }
}
|
|
|
|
// Returns the parent directory of `fname`, resolved via fs::canonical()
// first (so the result is canonical rather than relative to `fname`'s spelling).
static inline sstring parent_path(const sstring& fname) {
    const fs::path resolved = fs::canonical(fs::path(fname));
    return resolved.parent_path().string();
}
|
|
|
|
// Reads the entire TOC file into memory and returns:
//   - its contents split on '\n' (empty entries, e.g. from a trailing
//     newline, are kept), and
//   - the CRC32 checksum of the raw file contents.
// The input stream is closed when done.
future<std::pair<std::vector<sstring>, uint32_t>> sstable::read_and_parse_toc(file f) {
    return with_closeable(make_file_input_stream(f), [] (input_stream<char>& in) -> future<std::pair<std::vector<sstring>, uint32_t>> {
        auto contents = co_await util::read_entire_stream_contiguous(in);
        const uint32_t checksum = crc32_utils::checksum(contents.begin(), contents.size());

        std::vector<sstring> lines;
        boost::split(lines, contents, boost::is_any_of("\n"));

        co_return std::pair<std::vector<sstring>, uint32_t>(std::move(lines), checksum);
    });
}
|
|
|
|
// This is small enough, and well-defined. Easier to just read it all
|
|
// at once
|
|
future<> sstable::read_toc(sstable_open_config cfg) noexcept {
|
|
if (_recognized_components.size()) {
|
|
co_return;
|
|
}
|
|
|
|
try {
|
|
auto toc_type = cfg.unsealed_sstable ? component_type::TemporaryTOC : component_type::TOC;
|
|
co_await do_read_simple(toc_type, [&] (version_types v, file f) -> future<> {
|
|
auto [comps, digest] = co_await read_and_parse_toc(f);
|
|
_toc_digest = digest;
|
|
for (auto& c: comps) {
|
|
// accept trailing newlines
|
|
if (c == "") {
|
|
continue;
|
|
}
|
|
try {
|
|
_recognized_components.insert(reverse_map(c, sstable_version_constants::get_component_map(_version)));
|
|
} catch (std::out_of_range& oor) {
|
|
_unrecognized_components.push_back(c);
|
|
sstlog.info("Unrecognized TOC component was found: {} in sstable {}", c, toc_filename());
|
|
}
|
|
}
|
|
if (!_recognized_components.size()) {
|
|
throw malformed_sstable_exception("Empty TOC", toc_filename());
|
|
}
|
|
});
|
|
} catch (std::system_error& e) {
|
|
if (e.code() == std::error_code(ENOENT, std::system_category())) {
|
|
throw malformed_sstable_exception(fmt::format("{}: file not found", toc_filename()));
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
void sstable::generate_toc() {
|
|
// Creating table of components.
|
|
_recognized_components.insert(component_type::TOC);
|
|
_recognized_components.insert(component_type::Statistics);
|
|
_recognized_components.insert(component_type::Digest);
|
|
if (has_summary_and_index(_version)) {
|
|
_recognized_components.insert(component_type::Index);
|
|
_recognized_components.insert(component_type::Summary);
|
|
} else {
|
|
_recognized_components.insert(component_type::Partitions);
|
|
_recognized_components.insert(component_type::Rows);
|
|
}
|
|
_recognized_components.insert(component_type::Data);
|
|
if (_schema->bloom_filter_fp_chance() != 1.0) {
|
|
_recognized_components.insert(component_type::Filter);
|
|
}
|
|
if (!_schema->get_compressor_params().compression_enabled()) {
|
|
_recognized_components.insert(component_type::CRC);
|
|
} else {
|
|
_recognized_components.insert(component_type::CompressionInfo);
|
|
}
|
|
_recognized_components.insert(component_type::Scylla);
|
|
}
|
|
|
|
// Opens every recognized component read-only, one after another, and
// returns the open files keyed by component type.
future<std::unordered_map<component_type, file>> sstable::readable_file_for_all_components() const {
    std::unordered_map<component_type, file> opened;
    for (const auto component : _recognized_components) {
        auto f = co_await open_file(component, open_flags::ro);
        opened.emplace(component, std::move(f));
    }
    co_return std::move(opened);
}
|
|
|
|
// Asks the storage backend to clone this sstable under `new_generation`, then
// describes the clone (same version, format and state, keyed by its TOC).
future<entry_descriptor> sstable::clone(generation_type new_generation, bool leave_unsealed) const {
    co_await _storage->clone(*this, new_generation, leave_unsealed);
    entry_descriptor cloned(new_generation, _version, _format, component_type::TOC, _state);
    co_return cloned;
}
|
|
|
|
// Destructor. The owner is expected to call close(), but exception-handling
// paths may skip it, so the output stream is auto-closed here to avoid
// destroying it while still open. Close errors are logged and swallowed,
// because a destructor must not throw. Note: this blocks on the close
// future with .get(), so presumably it must run in a seastar thread.
file_writer::~file_writer() {
    if (!_closed) {
        try {
            _out.close().get();
        } catch (...) {
            sstlog.warn("Error while auto-closing {}: {}. Ignored.", _component, std::current_exception());
        }
    }
}
|
|
|
|
void file_writer::close() {
|
|
// Writing into sstable component output stream should be done with care.
|
|
// In particular -- flushing can happen only once right before closing
|
|
// the stream. Flushing the stream in between several writes is not going
|
|
// to work, because file stream would step on unaligned IO and S3 upload
|
|
// stream would send completion message to the server and would lose any
|
|
// subsequent write.
|
|
parse_assert(!_closed, _component, "file_writer already closed");
|
|
std::exception_ptr ex;
|
|
try {
|
|
_out.flush().get();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
try {
|
|
_closed = true;
|
|
_out.close().get();
|
|
} catch (...) {
|
|
auto e = std::current_exception();
|
|
sstlog.error("Error while closing {}: {}", _component, e);
|
|
if (!ex) {
|
|
ex = std::move(e);
|
|
}
|
|
}
|
|
if (ex) {
|
|
std::rethrow_exception(std::move(ex));
|
|
}
|
|
}
|
|
|
|
// Creates a file_writer for component `c` on top of a data sink obtained
// from the storage backend.
//
// This is a coroutine, like make_digests_component_file_writer(). Any
// exception thrown while building the component name, creating the sink or
// wrapping it is therefore delivered through the returned future, as the
// `noexcept` specifier promises. A synchronous throw escaping a noexcept
// function would otherwise call std::terminate.
future<file_writer> sstable::make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
    auto comp = component_name(*this, c);
    data_sink sink = co_await _storage->make_component_sink(*this, c, oflags, std::move(options));
    co_return file_writer(output_stream<char>(std::move(sink)), std::move(comp));
}
|
|
|
|
// Creates a CRC32-digesting writer for component `c`, backed by a data sink
// from the storage backend and buffered with sstable_buffer_size bytes.
future<std::unique_ptr<crc32_digest_file_writer>> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
    auto name = component_name(*this, c);
    data_sink backing_sink = co_await _storage->make_component_sink(*this, c, oflags, std::move(options));
    co_return std::make_unique<crc32_digest_file_writer>(std::move(backing_sink), sstable_buffer_size, std::move(name));
}
|
|
|
|
// Prepares a new sstable for writing:
//   - records where it comes from (`origin`),
//   - builds its table of components,
//   - lets the storage backend open it.
void sstable::open_sstable(const sstring& origin) {
    this->_origin = origin;
    this->generate_toc();
    this->_storage->open(*this);
}
|
|
|
|
// Writes the TOC through the digesting writer `w`: one component file name
// per line, each terminated by '\n'. The writer's full checksum is then
// recorded as the TOC digest.
void sstable::write_toc(std::unique_ptr<crc32_digest_file_writer> w) {
    sstlog.debug("Writing TOC file {} ", toc_filename());

    do_write_simple(*w, [&] (version_types version, file_writer& writer) {
        for (const auto& component : _recognized_components) {
            const auto line = sstable_version_constants::get_component_map(version).at(component) + "\n";
            bytes encoded = bytes(reinterpret_cast<const bytes::value_type *>(line.c_str()), line.size());
            write(version, writer, encoded);
        }
    });

    _components_digests.map[component_type::TOC] = w->full_checksum();
}
|
|
|
|
void sstable::write_crc(const checksum& c) {
|
|
unsigned buffer_size = 4096;
|
|
do_write_simple(component_type::CRC, [&] (version_types v, file_writer& w) {
|
|
write(v, w, c);
|
|
}, buffer_size);
|
|
}
|
|
|
|
// Digest file stores the full checksum of data file converted into a string.
|
|
void sstable::write_digest(uint32_t full_checksum) {
|
|
unsigned buffer_size = 4096;
|
|
do_write_simple(component_type::Digest, [&] (version_types v, file_writer& w) {
|
|
auto digest = to_sstring<bytes>(full_checksum);
|
|
write(v, w, digest);
|
|
}, buffer_size);
|
|
_components_digests.map[component_type::Data] = full_checksum;
|
|
}
|
|
|
|
// Out-of-line definitions of downsampling's thread_local static caches. Each
// is an array of BASE_SAMPLING_LEVEL int vectors (presumably one per sampling
// level; see the downsampling class for how they are filled).
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_original_index_cache;
|
|
|
|
// Opens component `type` read-only and passes the file and its size to
// `read_component`. On success, the size is added to _metadata_size_on_disk.
//
// Error translation:
//   - ENOENT becomes malformed_sstable_exception("<file>: file not found"),
//   - a malformed_sstable_exception from the reader is re-thrown, tagged
//     with this component's file name.
future<> sstable::do_read_simple(component_type type,
        noncopyable_function<future<> (version_types, file&&, uint64_t sz)> read_component) {
    const auto path = filename(type);
    sstlog.debug("Reading {} file {}", sstable_version_constants::get_component_map(_version).at(type), path);

    try {
        file f = co_await new_sstable_component_file(_read_error_handler, type, open_flags::ro);
        const uint64_t file_size = co_await f.size();
        co_await read_component(_version, std::move(f), file_size);
        _metadata_size_on_disk += file_size;
    } catch (std::system_error& e) {
        if (e.code() != std::error_code(ENOENT, std::system_category())) {
            throw;
        }
        throw malformed_sstable_exception(fmt::format("{}: file not found", path));
    } catch (malformed_sstable_exception& e) {
        throw malformed_sstable_exception(e.what(), path);
    }
}
|
|
|
|
// Convenience overload for readers that need only the file, not its size.
// The file is always closed after `read_component` finishes. A failure from
// the reader is re-thrown only once the close has completed.
future<> sstable::do_read_simple(component_type type,
        noncopyable_function<future<> (version_types, file)> read_component) {
    auto closing_reader = [read_component = std::move(read_component)] (version_types version, file&& f, uint64_t) -> future<> {
        std::exception_ptr failure;
        try {
            co_await read_component(version, f);
        } catch (...) {
            failure = std::current_exception();
        }
        co_await f.close();

        maybe_rethrow_exception(std::move(failure));
    };
    return do_read_simple(type, std::move(closing_reader));
}
|
|
|
|
template <component_type Type, typename T>
|
|
future<> sstable::read_simple(T& component) {
|
|
return do_read_simple(Type, [&] (version_types v, file&& f, uint64_t size) -> future<> {
|
|
std::exception_ptr ex;
|
|
auto r = file_random_access_reader(std::move(f), size, sstable_buffer_size);
|
|
try {
|
|
co_await parse(*_schema, v, r, component);
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await r.close();
|
|
|
|
maybe_rethrow_exception(std::move(ex));
|
|
});
|
|
}
|
|
|
|
template <component_type Type, typename T>
|
|
future<std::optional<uint32_t>> sstable::read_simple_with_digest(T& component) {
|
|
std::optional<uint32_t> digest;
|
|
co_await do_read_simple(Type, [&] (version_types v, file&& f, uint64_t size) -> future<> {
|
|
std::exception_ptr ex;
|
|
auto r = digest_file_random_access_reader(f, size, sstable_buffer_size);
|
|
try {
|
|
co_await parse(*_schema, v, r, component);
|
|
digest = r.digest();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await r.close();
|
|
|
|
maybe_rethrow_exception(std::move(ex));
|
|
});
|
|
co_return digest;
|
|
}
|
|
|
|
template <component_type Type, typename T>
|
|
future<> sstable::read_simple_and_verify_digest(T& comp) {
|
|
auto component_digest = get_component_digest(Type);
|
|
if (component_digest) {
|
|
auto computed_digest_opt = co_await read_simple_with_digest<Type>(comp);
|
|
uint32_t computed_digest;
|
|
if (computed_digest_opt) {
|
|
computed_digest = *computed_digest_opt;
|
|
} else {
|
|
computed_digest = co_await compute_component_file_digest(Type);
|
|
}
|
|
validate_component_digest(Type, computed_digest);
|
|
} else {
|
|
co_await read_simple<Type>(comp);
|
|
}
|
|
}
|
|
|
|
// Runs `write_component` against `writer`, then:
//   - adds the number of bytes written (the writer's offset) to
//     _metadata_size_on_disk,
//   - closes the writer.
void sstable::do_write_simple(file_writer& writer,
        noncopyable_function<void (version_types, file_writer&)> write_component) {
    write_component(_version, writer);
    const auto bytes_written = writer.offset();
    _metadata_size_on_disk += bytes_written;
    writer.close();
}
|
|
|
|
// Creates a writer for component `type` with a `buffer_size`-byte output
// buffer, then runs `write_component` through the file_writer overload. That
// overload accounts the bytes written and closes the writer. This waits on
// futures with .get(), so it presumably must run in a seastar thread.
void sstable::do_write_simple(component_type type,
        noncopyable_function<void (version_types version, file_writer& writer)> write_component, unsigned buffer_size) {
    const auto path = filename(type);
    sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), path);

    file_output_stream_options stream_options;
    stream_options.buffer_size = buffer_size;
    auto writer = make_component_file_writer(type, std::move(stream_options)).get();
    do_write_simple(writer, std::move(write_component));
}
|
|
|
|
template <component_type Type, typename T>
|
|
void sstable::write_simple(const T& component) {
|
|
do_write_simple(Type, [&component] (version_types v, file_writer& w) {
|
|
write(v, w, component);
|
|
}, sstable_buffer_size);
|
|
}
|
|
|
|
// Like do_write_simple(component_type, ...), but writes through a CRC32
// digesting writer and returns the checksum of everything written.
uint32_t sstable::do_write_simple_with_digest(component_type type,
        noncopyable_function<void (version_types version, file_writer& writer)> write_component, unsigned buffer_size) {
    const auto path = filename(type);
    sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), path);

    file_output_stream_options stream_options;
    stream_options.buffer_size = buffer_size;
    auto digesting_writer = make_digests_component_file_writer(type, std::move(stream_options)).get();
    do_write_simple(*digesting_writer, std::move(write_component));
    const uint32_t checksum = digesting_writer->full_checksum();
    return checksum;
}
|
|
|
|
template <component_type Type, typename T>
|
|
uint32_t sstable::write_simple_with_digest(const T& component) {
|
|
return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) {
|
|
write(v, w, component);
|
|
}, sstable_buffer_size);
|
|
}
|
|
|
|
// Explicit instantiations of the simple read/write helpers for these
// component/type pairs. They are presumably needed because callers in other
// translation units use them; verify before removing any.
template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f);
template void sstable::write_simple<component_type::Filter>(const sstables::filter& f);

template void sstable::write_simple<component_type::Summary>(const sstables::summary_ka&);
template uint32_t sstable::write_simple_with_digest<component_type::Summary>(const sstables::summary&);
|
|
|
|
// Loads CompressionInfo (verifying its digest when one is recorded) and
// installs a compressor for reading. Hidden options are then discarded.
// Does nothing when the sstable has no CompressionInfo component.
future<> sstable::read_compression() {
    // FIXME: If there is no compression, we should expect a CRC file to be present.
    if (has_component(component_type::CompressionInfo)) {
        co_await read_simple_and_verify_digest<component_type::CompressionInfo>(_components->compression);
        auto reading_compressor = co_await manager().get_compressor_factory().make_compressor_for_reading(_components->compression);
        _components->compression.set_compressor(std::move(reading_compressor));
        _components->compression.discard_hidden_options();
    }
}
|
|
|
|
void sstable::write_compression() {
|
|
if (!has_component(component_type::CompressionInfo)) {
|
|
return;
|
|
}
|
|
|
|
auto digest = write_simple_with_digest<component_type::CompressionInfo>(_components->compression);
|
|
_components_digests.map[component_type::CompressionInfo] = digest;
|
|
}
|
|
|
|
void sstable::validate_partitioner() {
|
|
auto entry = _components->statistics.contents.find(metadata_type::Validation);
|
|
if (entry == _components->statistics.contents.end()) {
|
|
throw std::runtime_error("Validation metadata not available");
|
|
}
|
|
auto& p = entry->second;
|
|
if (!p) {
|
|
throw std::runtime_error("Validation is malformed");
|
|
}
|
|
|
|
validation_metadata& v = *static_cast<validation_metadata *>(p.get());
|
|
if (v.partitioner.value != to_bytes(_schema->get_partitioner().name())) {
|
|
throw std::runtime_error(
|
|
fmt::format(FMT_STRING("SSTable {} uses {} partitioner which is different than {} partitioner used by the database"),
|
|
get_filename(),
|
|
sstring(reinterpret_cast<char*>(v.partitioner.value.data()), v.partitioner.value.size()),
|
|
_schema->get_partitioner().name()));
|
|
}
|
|
|
|
}
|
|
|
|
// Computes the CRC32 digest of component `type`'s file on disk.
//
// When the Scylla metadata carries a digest, the Scylla component stores it
// as a trailing uint32. That trailer is excluded from the checksummed range.
future<uint32_t> sstable::compute_component_file_digest(component_type type) const {
    auto f = co_await new_sstable_component_file(_read_error_handler, type, open_flags::ro);
    auto length = co_await f.size();

    const bool has_trailing_digest = type == component_type::Scylla && _components->scylla_metadata->digest;
    if (has_trailing_digest) {
        length -= sizeof(uint32_t);
    }

    co_return co_await compute_component_file_digest(std::move(f), length);
}
|
|
|
|
// Streams the first `size` bytes of `f` and returns their CRC32 checksum.
// The input stream is closed when done.
future<uint32_t> sstable::compute_component_file_digest(file f, size_t size) const {
    return with_closeable(make_file_input_stream(std::move(f), 0, size, {.buffer_size = sstable_buffer_size}), [] (input_stream<char>& in) -> future<uint32_t> {
        uint32_t crc = crc32_utils::init_checksum();
        for (;;) {
            auto chunk = co_await in.read();
            if (chunk.empty()) {
                // End of stream.
                break;
            }
            crc = crc32_utils::checksum(crc, chunk.get(), chunk.size());
        }
        co_return crc;
    });
}
|
|
|
|
// Compares `computed_digest` with the digest recorded for component `type`.
// Does nothing if no digest is recorded or the two match. On mismatch:
//   - if _ignore_component_digest_mismatch is set, logs a warning,
//   - otherwise throws malformed_sstable_exception.
void sstable::validate_component_digest(component_type type, uint32_t computed_digest) const {
    const auto expected = get_component_digest(type);
    if (!expected || *expected == computed_digest) {
        return;
    }

    auto msg = fmt::format("{} digest mismatch in {}: expected {}, computed {}",
            type, get_filename(), *expected, computed_digest);
    if (!_ignore_component_digest_mismatch) {
        throw malformed_sstable_exception(msg);
    }
    sstlog.warn("{}", msg);
}
|
|
|
|
// Verifies the on-disk index component(s) against the digests recorded in
// the Scylla metadata:
//   - Index, when _index_file is open,
//   - otherwise Partitions and Rows.
// Components without a recorded digest are skipped.
//
// Mismatches are reported through validate_component_digest(). Like every
// other component, they therefore honor _ignore_component_digest_mismatch:
// a warning is logged instead of throwing malformed_sstable_exception.
future<> sstable::validate_index_digest() const {
    auto validate_component = [this] (component_type type, const file& f, size_t size) -> future<> {
        // Avoid checksumming the whole file when there is nothing to compare against.
        if (!get_component_digest(type)) {
            co_return;
        }
        auto computed_digest = co_await compute_component_file_digest(f, size);
        validate_component_digest(type, computed_digest);
    };

    if (_index_file) {
        co_await validate_component(component_type::Index, _index_file, _index_file_size);
    } else {
        co_await validate_component(component_type::Partitions, _partitions_file, _partitions_file_size);
        co_await validate_component(component_type::Rows, _rows_file, _rows_file_size);
    }
}
|
|
|
|
void sstable::validate_min_max_metadata() {
|
|
auto entry = _components->statistics.contents.find(metadata_type::Stats);
|
|
if (entry == _components->statistics.contents.end()) {
|
|
throw std::runtime_error("Stats metadata not available");
|
|
}
|
|
auto& p = entry->second;
|
|
if (!p) {
|
|
throw std::runtime_error("Statistics is malformed");
|
|
}
|
|
|
|
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
|
|
auto clear_incorrect_min_max_column_names = [&s] {
|
|
s.min_column_names.elements.clear();
|
|
s.max_column_names.elements.clear();
|
|
};
|
|
auto& min_column_names = s.min_column_names.elements;
|
|
auto& max_column_names = s.max_column_names.elements;
|
|
|
|
if (min_column_names.empty() && max_column_names.empty()) {
|
|
return;
|
|
}
|
|
|
|
// The min/max metadata is wrong if:
|
|
// - it's not empty and schema defines no clustering key.
|
|
//
|
|
// Notes:
|
|
// - we are going to rely on min/max column names for
|
|
// clustering filtering only from md-format sstables,
|
|
// see sstable::may_contain_rows().
|
|
// We choose not to clear_incorrect_min_max_column_names
|
|
// from older versions here as this disturbs sstable unit tests.
|
|
//
|
|
// - now that we store min/max metadata for range tombstones,
|
|
// their size may differ.
|
|
if (!_schema->clustering_key_size()) {
|
|
clear_incorrect_min_max_column_names();
|
|
return;
|
|
}
|
|
}
|
|
|
|
void sstable::validate_max_local_deletion_time() {
|
|
if (!has_correct_max_deletion_time()) {
|
|
auto& entry = _components->statistics.contents[metadata_type::Stats];
|
|
auto& s = *static_cast<stats_metadata*>(entry.get());
|
|
s.max_local_deletion_time = std::numeric_limits<int32_t>::max();
|
|
}
|
|
}
|
|
|
|
void sstable::set_min_max_position_range() {
|
|
if (!_schema->clustering_key_size()) {
|
|
return;
|
|
}
|
|
|
|
auto& min_elements = get_stats_metadata().min_column_names.elements;
|
|
auto& max_elements = get_stats_metadata().max_column_names.elements;
|
|
|
|
if (min_elements.empty() && max_elements.empty()) {
|
|
return;
|
|
}
|
|
|
|
auto pip = [] (const utils::chunked_vector<disk_string<uint16_t>>& column_names, bound_kind kind) {
|
|
std::vector<bytes> key_bytes;
|
|
key_bytes.reserve(column_names.size());
|
|
for (auto& value : column_names) {
|
|
key_bytes.emplace_back(bytes_view(value));
|
|
}
|
|
auto ckp = clustering_key_prefix(std::move(key_bytes));
|
|
return position_in_partition(position_in_partition::range_tag_t(), kind, std::move(ckp));
|
|
};
|
|
|
|
_min_max_position_range = position_range(pip(min_elements, bound_kind::incl_start), pip(max_elements, bound_kind::incl_end));
|
|
}
|
|
|
|
// Finds a boundary position of partition `key` in this sstable.
//
// Forward mode (reversed == false) returns the position of the partition's
// first fragment:
//   - a partition tombstone records before_all_clustered_rows() and the scan
//     continues, so later fragments override it;
//   - a static row, range tombstone change or clustering row stops the scan
//     at its position.
//
// Reversed mode (reversed == true) reads the partition through a reversed
// schema and slice, so the first fragment consumed is the partition's last
// one. Positions of range tombstone changes and clustering rows are flipped
// back with reversed() so the result is usable as an end bound. A static row
// is recorded but does not stop the scan.
//
// If the partition yields no fragment that records a position, the result is
// before_all_clustered_rows() (forward) or after_all_clustered_rows()
// (reversed). If the reader produces no partition at all, std::nullopt is
// returned.
//
// The reader is always closed. An exception from consumption is re-thrown
// after the close.
future<std::optional<position_in_partition>>
sstable::find_first_position_in_partition(reader_permit permit, const dht::decorated_key& key, bool reversed) {
    using position_in_partition_opt = std::optional<position_in_partition>;
    class position_finder {
        position_in_partition_opt& _pos;
        bool _reversed;
    private:
        // If consuming in reversed mode, range_tombstone_change or clustering_row will have
        // its bound weight reversed, so we need to revert it here so the returned position
        // can be correctly used to mark the end bound.
        void on_position_found(position_in_partition&& pos, bool reverse_pos = false) {
            _pos = reverse_pos ? std::move(pos).reversed() : std::move(pos);
        }
    public:
        position_finder(position_in_partition_opt& pos, bool reversed) noexcept : _pos(pos), _reversed(reversed) {}

        void consume_new_partition(const dht::decorated_key& dk) {}

        stop_iteration consume(tombstone t) {
            // Handle case partition contains only a partition_tombstone, so position_in_partition
            // for this key should be before all rows.
            on_position_found(position_in_partition::before_all_clustered_rows());
            return stop_iteration::no;
        }

        stop_iteration consume(range_tombstone_change&& rt) {
            on_position_found(std::move(rt).position(), _reversed);
            return stop_iteration::yes;
        }

        stop_iteration consume(clustering_row&& cr) {
            on_position_found(position_in_partition::for_key(std::move(cr.key())), _reversed);
            return stop_iteration::yes;
        }

        stop_iteration consume(static_row&& sr) {
            on_position_found(position_in_partition(sr.position()));
            // If reversed == true, we shouldn't stop at static row as we want to find the last row.
            // We don't want to ignore its position, to handle the case where partition contains only static row.
            return stop_iteration(!_reversed);
        }

        stop_iteration consume_end_of_partition() {
            // Handle case where partition has no rows.
            if (!_pos) {
                on_position_found(_reversed ? position_in_partition::after_all_clustered_rows() : position_in_partition::before_all_clustered_rows());
            }
            return stop_iteration::yes;
        }

        mutation_opt consume_end_of_stream() {
            return std::nullopt;
        }
    };

    auto pr = dht::partition_range::make_singular(key);
    auto s = get_schema();
    auto full_slice = s->full_slice();
    if (reversed) {
        s = s->make_reversed();
        full_slice.options.set(query::partition_slice::option::reversed);
    }
    auto r = make_reader(s, std::move(permit), pr, full_slice, {}, streamed_mutation::forwarding::no,
                                    mutation_reader::forwarding::no /* to avoid reading past the partition end */);

    position_in_partition_opt ret = std::nullopt;
    position_finder finder(ret, reversed);
    std::exception_ptr ex;
    try {
        co_await r.consume(finder);
    } catch (...) {
        ex = std::current_exception();
    }
    // Close the reader even if consumption failed.
    co_await r.close();
    if (ex) {
        co_await coroutine::exception(std::move(ex));
    }
    co_return std::move(ret);
}
|
|
|
|
// For schemas with a clustering key, loads two positions:
//   - the first position of the first partition,
//   - the last position of the last partition.
// Reads run under a permit from the metadata concurrency semaphore.
// If either position cannot be determined, a warning is logged and the
// fields are left unchanged, because this metadata does not affect
// correctness.
future<> sstable::load_first_and_last_position_in_partition() {
    if (_schema->clustering_key_size() == 0) {
        co_return;
    }

    auto& sem = _manager.sstable_metadata_concurrency_sem();
    reader_permit permit = co_await sem.obtain_permit(_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {});
    auto first = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false);
    auto last = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true);

    if (first && last) {
        _first_partition_first_position = std::move(*first);
        _last_partition_last_position = std::move(*last);
    } else {
        sstlog.warn("Unable to retrieve metadata for first and last keys of {}. Not a critical error.", get_filename());
    }
}
|
|
|
|
// Estimates the fraction of cells that are droppable tombstones at
// `compaction_time`. The numerator is the tombstone drop-time histogram
// summed up to gc_before; the denominator is the estimated cell count
// (mean * count of the cell-count histogram). Returns 0 when that estimate
// is not positive.
double sstable::estimate_droppable_tombstone_ratio(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const {
    const auto gc_before = get_gc_before_for_drop_estimation(compaction_time, gc_state, s);

    const auto& stats = get_stats_metadata();
    const auto cell_estimate = stats.estimated_cells_count.mean() * stats.estimated_cells_count.count();
    if (cell_estimate > 0) {
        const double droppable = stats.estimated_tombstone_drop_time.sum(gc_before.time_since_epoch().count());
        return droppable / cell_estimate;
    }
    return 0.0;
}
|
|
|
|
// Loads the Statistics component, verifying it against its recorded digest
// when one is available.
future<> sstable::read_statistics() {
    auto& statistics = _components->statistics;
    co_await read_simple_and_verify_digest<component_type::Statistics>(statistics);
}
|
|
|
|
// Loads the Partitions footer on first use. If the Partitions file is not
// open yet, it is opened read-only and its size cached first.
// No-op when there is no Partitions component or the footer is already loaded.
future<> sstable::read_partitions_db_footer() {
    if (!has_component(component_type::Partitions) || _partitions_db_footer) {
        co_return;
    }

    if (!_partitions_file) {
        _partitions_file = co_await open_file(component_type::Partitions, open_flags::ro);
        _partitions_file_size = co_await _partitions_file.size();
    }
    _partitions_db_footer = co_await trie::read_bti_partitions_db_footer(*_schema, _version, _partitions_file, _partitions_file_size);
}
|
|
|
|
void sstable::write_statistics() {
|
|
auto digest = write_simple_with_digest<component_type::Statistics>(_components->statistics);
|
|
_components_digests.map[component_type::Statistics] = digest;
|
|
}
|
|
|
|
// Records `id` as the session this sstable is being repaired by.
void sstable::mark_as_being_repaired(const service::session_id& id) {
    this->being_repaired = id;
}
|
|
|
|
// Returns the digest recorded for component `c` in the Scylla metadata:
//   - the metadata's own digest for the Scylla component,
//   - otherwise the entry in the components-digests map.
// Returns std::nullopt when there is no Scylla component, no digests map,
// or no entry for `c`.
std::optional<uint32_t> sstable::get_component_digest(component_type c) const {
    if (!has_scylla_component()) {
        return std::nullopt;
    }
    if (c == component_type::Scylla) {
        return _components->scylla_metadata->digest;
    }

    std::optional<uint32_t> recorded;
    if (const auto* digests = _components->scylla_metadata->get_components_digests()) {
        if (auto it = digests->map.find(c); it != digests->map.end()) {
            recorded = it->second;
        }
    }
    return recorded;
}
|
|
|
|
int64_t sstable::update_repaired_at(int64_t repaired_at) {
|
|
const stats_metadata& old_stats = get_stats_metadata();
|
|
auto old_repaired_at = old_stats.repaired_at;
|
|
if (old_repaired_at == repaired_at) {
|
|
return old_repaired_at;
|
|
}
|
|
auto stats = std::make_unique<stats_metadata>(old_stats);
|
|
stats->repaired_at = repaired_at;
|
|
_components->statistics.contents[metadata_type::Stats] = std::move(stats);
|
|
return old_repaired_at;
|
|
}
|
|
|
|
// Takes `src`'s in-memory components (via _components.copy()) and its set
// of recognized components.
future<> sstable::copy_components(const sstable& src) {
    auto copied = co_await src._components.copy();
    _components = std::move(copied);
    _recognized_components = src._recognized_components;
}
|
|
|
|
bool sstable::should_update_repaired_at(int64_t repaired_at) const {
|
|
const stats_metadata& stats = get_stats_metadata();
|
|
return stats.repaired_at != repaired_at;
|
|
}
|
|
|
|
// Creates a new SSTable generation by hard-linking existing components and rewriting a specific one.
// This efficiently creates a modified SSTable without copying all files.
// 1. Create a new SSTable object with a new generation number
//    - The creator function determines the new generation
// 2. Hard-link all components EXCEPT the one being rewritten and Scylla metadata
//    - The component being rewritten will be written fresh (not linked)
//    - Scylla metadata must be rewritten to include the new component's digest
// 3. Copy in-memory component metadata from the source SSTable
//    - This doesn't deep copy _components, just copies the foreign_ptr
// 4. Apply the modifier function to the new SSTable's components
// 5. Re-read the Scylla metadata from disk
//    - It is read from this (source) sstable's Scylla component
//    - Ensures we have the latest on-disk metadata (not potentially modified in-memory state)
// 6. If update_sstable_id is true, update the Scylla metadata's sstable identifier to a new value
// 7. Write the component with updated Scylla metadata
//    - Uses write_component_with_metadata() which handles digest calculation and metadata updates
// 8. Finalize the new SSTable
//    - Copy sharding information, seal the SSTable, and open data files
//
// Preconditions (violations raise on_internal_error): `component` must pass
// is_component_rewrite_supported(), and this sstable must have both that
// component and a Scylla component.
//
// The work runs inside seastar::async, hence the blocking .get() calls. The
// lambda captures `this`, so presumably the caller must keep this sstable
// alive until the returned future resolves. On success, the new sstable's
// Data basename is stored in _cloned_to_sstable_filename.
future<shared_sstable> sstable::link_with_rewritten_component(std::function<shared_sstable(shared_sstable)> sstable_creator,
        component_type component,
        std::function<void(sstable&)> modifier,
        bool update_sstable_id) {
    if (!is_component_rewrite_supported(component)) {
        on_internal_error(sstlog, "Only Statistics component can be rewritten.");
    }

    if (!has_component(component)) {
        on_internal_error(sstlog, fmt::format("SSTable does not have {} component to rewrite.", component_name(*this, component)));
    }

    if (!has_scylla_component()) {
        on_internal_error(sstlog, "SSTable must have Scylla component to rewrite Statistics component.");
    }

    return seastar::async([this, creator = std::move(sstable_creator), component, modifier = std::move(modifier), update_sstable_id] {
        // Step 1: new sstable object with its own generation.
        auto new_sst = creator(shared_from_this());
        auto generation = new_sst->generation();

        // Steps 2-3: link everything except the rewritten component and Scylla,
        // then share the in-memory components.
        _storage->link_with_excluded_components(*this, generation, {component, component_type::Scylla}).get();
        new_sst->copy_components(*this).get();

        // Step 4: caller-supplied modification of the new sstable.
        modifier(*new_sst);

        // FIXME: Optimize by re-reading metadata only if _components->scylla_metadata was modified after loading.
        // If unchanged, reuse the existing _components->scylla_metadata instead.
        // Steps 5-6: fresh copy of the source's Scylla metadata, optionally with a new identifier.
        scylla_metadata metadata;
        read_simple<component_type::Scylla>(metadata).get();
        if (update_sstable_id) {
            metadata.set_sstable_identifier();
        }

        // Step 7: write the component and the Scylla metadata that records its digest.
        new_sst->write_component_with_metadata(component, std::move(metadata));

        // Step 8: finalize.
        new_sst->_shards = this->_shards;
        new_sst->seal_sstable(false).get();
        new_sst->open_data().get();

        _cloned_to_sstable_filename = new_sst->component_basename(component_type::Data);
        return new_sst;
    });
}
|
|
|
|
// Rewrites a single SSTable component along with updated Scylla metadata.
// This is used when modifying components (e.g., Statistics) without rewriting the entire SSTable.
// 1. Write the component file (e.g., Statistics-*.db)
//    - This calculates and stores the component's digest in _components_digests.map[type]
// 2. Update the Scylla metadata's ComponentsDigests map with the new component digest
// 3. Calculate the Scylla metadata's own digest based on its updated data
// 4. Write the Scylla metadata component file
// 5. Set the in-memory Scylla metadata to the new metadata
//
// `metadata` is taken by value and ends up owned by _components->scylla_metadata.
void sstable::write_component_with_metadata(component_type type, scylla_metadata metadata) {
    if (!is_component_rewrite_supported(type)) {
        on_internal_error(sstlog, "Only Statistics component can be rewritten.");
    }

    // Step 1: write the component; its digest lands in _components_digests.
    write_component(type);

    // Steps 2-3: the Scylla digest must be computed after the component digest is stored in it.
    metadata.get_or_create_components_digests().map[type] = _components_digests.map[type];
    metadata.digest = serialized_checksum(_version, metadata.data);

    // Step 4.
    write_simple<component_type::Scylla>(metadata);

    // Step 5.
    _components->scylla_metadata = std::move(metadata);
}
|
|
|
|
// Loads the Summary component into _components->summary, regenerating it from the Index
// when it is missing, empty, or cannot be read or verified.
//
// A no-op if a summary is already loaded. Read failures (including a digest verification
// failure reported by read_simple_and_verify_digest) are logged as a warning, not propagated.
// Whatever was parsed before the failure is discarded, so the regenerated summary is built
// from the Index instead of reusing unverified data. If the sstable has no Index either,
// the summary is left empty.
future<> sstable::read_summary() noexcept {
    if (_components->summary) {
        co_return;
    }

    co_await read_toc();

    if (has_component(component_type::Summary)) {
        // We'll try to keep the main code path exception free, but if an exception does happen
        // we can try to regenerate the Summary.
        try {
            co_await read_simple_and_verify_digest<component_type::Summary>(_components->summary);
            if (_components->summary) {
                // Read and verified successfully: there is nothing to regenerate.
                co_return;
            }
        } catch (...) {
            auto ep = std::current_exception();
            sstlog.warn("Couldn't read summary file {}: {}.", this->filename(component_type::Summary), ep);
            // The failure may have happened after (part of) the summary was parsed, e.g. on a
            // digest mismatch. Drop that state so it is neither used nor mistaken for a loaded summary.
            _components->summary = {};
        }
    }

    if (!has_component(component_type::Index)) {
        co_return;
    }

    co_await generate_summary();
}
|
|
|
|
// Opens the file backing component `type` with the given flags and options.
// The file is opened through _read_error_handler; how errors are routed is decided by
// new_sstable_component_file().
future<file> sstable::open_file(component_type type, open_flags flags, file_open_options opts) const noexcept {
    return new_sstable_component_file(_read_error_handler, type, flags, opts);
}
|
|
|
|
// Opens (or creates, depending on `oflags`) the Data file and whichever of the
// Index / Partitions / Rows components the sstable has, concurrently. The files are
// stored in the corresponding _*_file members. The returned future fails if any open fails.
future<> sstable::open_or_create_data(open_flags oflags, file_open_options options) noexcept {
    utils::small_vector<future<>, 4> futures;
    if (has_component(component_type::Index)) {
        futures.push_back(open_file(component_type::Index, oflags, options).then([this] (file f) { _index_file = std::move(f); }));
    }
    futures.push_back(open_file(component_type::Data, oflags, options).then([this] (file f) { _data_file = std::move(f); }));
    if (has_component(component_type::Partitions) && !_partitions_file) {
        // FIXME: if _partitions_file is already opened, we are ignoring options and flags.
        // (Although in practice the options are always default and flags are always `ro`).
        // If we care about that, we should close _partition_file and reopen it here.
        futures.push_back(open_file(component_type::Partitions, oflags, options).then([this] (file f) { _partitions_file = std::move(f); }));
    }
    if (has_component(component_type::Rows)) {
        futures.push_back(open_file(component_type::Rows, oflags, options).then([this] (file f) { _rows_file = std::move(f); }));
    }
    return when_all_succeed(futures.begin(), futures.end()).discard_result();
}
|
|
|
|
// Opens the data files read-only and finishes initialization from the already-loaded
// Scylla metadata: large-data stats, origin and extended timestamp stats are copied out.
// Unless cfg.keep_sharding_metadata is set, the sharding token ranges are released.
// Finally the sstable is registered with the manager's reclaimable-memory accounting.
//
// Requires _shards to be populated already (asserted).
future<> sstable::open_data(sstable_open_config cfg) noexcept {
    co_await open_or_create_data(open_flags::ro);
    co_await update_info_for_opened_data(cfg);
    parse_assert(!_shards.empty(), get_filename());
    auto* sm = _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>();
    if (sm && !cfg.keep_sharding_metadata) {
        // Sharding information uses a lot of memory and once we're done with this computation we will no longer use it.
        co_await utils::clear_gently(sm->token_ranges.elements);
        sm->token_ranges.elements = {};
    }
    auto* ld_stats = _components->scylla_metadata->data.get<scylla_metadata_type::LargeDataStats, scylla_metadata::large_data_stats>();
    if (ld_stats) {
        _large_data_stats.emplace(*ld_stats);
    }
    auto* origin = _components->scylla_metadata->data.get<scylla_metadata_type::SSTableOrigin, scylla_metadata::sstable_origin>();
    if (origin) {
        _origin = sstring(to_string_view(bytes_view(origin->value)));
    }
    auto* ts_stats = _components->scylla_metadata->data.get<scylla_metadata_type::ExtTimestampStats, scylla_metadata::ext_timestamp_stats>();
    if (ts_stats) {
        _ext_timestamp_stats.emplace(*ts_stats);
    }
    _open_mode.emplace(open_flags::ro);
    _stats.on_open_for_reading();

    // Invalidate the cached reclaimable size before registering with the manager.
    _total_reclaimable_memory.reset();
    _manager.increment_total_reclaimable_memory(this);
}
|
|
|
|
// Populates size and derived state once the data files are open:
//  - Data file size and mtime; for compressed sstables, the compression info is updated with the Data size;
//  - Index / Partitions / Rows sizes, and each of those files is wrapped in a cached_file
//    tracked by the manager's cache tracker (the Partitions footer is read afterwards);
//  - min/max position range, first/last keys, run identifier and sstable identifier;
//  - optionally (cfg.load_first_and_last_position_metadata) first/last positions in partition.
future<> sstable::update_info_for_opened_data(sstable_open_config cfg) {
    struct stat st = co_await _data_file.stat();

    if (this->has_component(component_type::CompressionInfo)) {
        _components->compression.update(st.st_size);
    }
    _data_file_size = st.st_size;
    _data_file_write_time = db_clock::from_time_t(st.st_mtime);

    if (_index_file) {
        auto size = co_await _index_file.size();
        _index_file_size = size;
        // Wrapping twice would stack caches; this must only happen once per sstable.
        parse_assert(!_cached_index_file, get_filename());
        _cached_index_file = seastar::make_shared<cached_file>(_index_file,
                                                               _manager.get_cache_tracker().get_index_cached_file_stats(),
                                                               _manager.get_cache_tracker().get_lru(),
                                                               _manager.get_cache_tracker().region(),
                                                               _index_file_size);
        _index_file = make_cached_seastar_file(*_cached_index_file);
    }
    if (_partitions_file) {
        auto size = co_await _partitions_file.size();
        _partitions_file_size = size;
        _cached_partitions_file = seastar::make_shared<cached_file>(
            _partitions_file,
            _manager.get_cache_tracker().get_index_cached_file_stats(),
            _manager.get_cache_tracker().get_lru(),
            _manager.get_cache_tracker().region(),
            size,
            component_name(*this, component_type::Partitions).format()
        );
        _partitions_file = make_cached_seastar_file(*_cached_partitions_file);
        // Read through the cached file that was just installed.
        co_await read_partitions_db_footer();
    }
    if (_rows_file) {
        auto size = co_await _rows_file.size();
        _rows_file_size = size;
        _cached_rows_file = seastar::make_shared<cached_file>(
            _rows_file,
            _manager.get_cache_tracker().get_index_cached_file_stats(),
            _manager.get_cache_tracker().get_lru(),
            _manager.get_cache_tracker().region(),
            size,
            component_name(*this, component_type::Rows).format()
        );
        _rows_file = make_cached_seastar_file(*_cached_rows_file);
    }

    this->set_min_max_position_range();
    this->set_first_and_last_keys();
    // Sstables without a recorded run identifier get a fresh random one.
    _run_identifier = _components->scylla_metadata->get_optional_run_identifier().value_or(run_id::create_random_id());

    _sstable_identifier = _components->scylla_metadata->get_optional_sstable_identifier();

    if (cfg.load_first_and_last_position_metadata) {
        co_await load_first_and_last_position_in_partition();
    }
}
|
|
|
|
// Creates the data files for writing (see open_or_create_data()) using
// sstable_write_open_flags, and records those flags as the sstable's open mode.
future<> sstable::create_data() noexcept {
    auto oflags = sstable_write_open_flags;
    file_open_options opt;
    // 32 MiB extent allocation hint for the newly written files.
    opt.extent_allocation_size_hint = 32 << 20;
    // NOTE(review): sloppy_size is a seastar file_open_options knob (lazy size tracking) — see seastar docs.
    opt.sloppy_size = true;
    return open_or_create_data(oflags, std::move(opt)).then([this, oflags] {
        _open_mode.emplace(oflags);
    });
}
|
|
|
|
// Gently evicts everything cached for this sstable: the cached Index, Partitions and Rows
// files (those that were set up) and the index cache.
future<> sstable::drop_caches() {
    if (_cached_index_file) {
        co_await _cached_index_file->evict_gently();
    }
    if (_cached_partitions_file) {
        co_await _cached_partitions_file->evict_gently();
    }
    if (_cached_rows_file) {
        co_await _cached_rows_file->evict_gently();
    }
    co_await _index_cache->evict_gently();
}
|
|
|
|
// Return the filter format for the given sstable version
|
|
static inline utils::filter_format get_filter_format(sstable_version_types version) {
|
|
return (version >= sstable_version_types::mc)
|
|
? utils::filter_format::m_format
|
|
: utils::filter_format::k_l_format;
|
|
}
|
|
|
|
// Loads the Filter component (with digest verification) into _components->filter.
// If bloom filters are disabled by `cfg` or the sstable has no Filter component, an
// always_present_filter is installed instead and nothing is read from disk.
future<> sstable::read_filter(sstable_open_config cfg) {
    if (!cfg.load_bloom_filter || !has_component(component_type::Filter)) {
        _components->filter = std::make_unique<utils::filter::always_present_filter>();
        return make_ready_future<>();
    }

    // seastar::async lets the body block on the read with .get().
    return seastar::async([this] () mutable {
        sstables::filter filter;
        read_simple_and_verify_digest<component_type::Filter>(filter).get();
        // Bitset size in bits = number of storage words * bits per word.
        auto nr_bits = filter.buckets.elements.size() * std::numeric_limits<typename decltype(filter.buckets.elements)::value_type>::digits;
        large_bitset bs(nr_bits, std::move(filter.buckets.elements));
        _components->filter = utils::filter::create_filter(filter.hashes, std::move(bs), get_filter_format(_version));
    });
}
|
|
|
|
// Serializes the in-memory bloom filter into the Filter component and records the
// written component's digest in _components_digests. No-op without a Filter component.
// NOTE(review): downcasts _components->filter to murmur3_bloom_filter, so this assumes the
// in-memory filter is a real bloom filter (e.g. not an always_present_filter) — confirm at call sites.
void sstable::write_filter() {
    if (!has_component(component_type::Filter)) {
        return;
    }

    auto f = downcast_ptr<utils::filter::murmur3_bloom_filter>(_components->filter.get());

    auto&& bs = f->bits();
    auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage());
    auto digest = write_simple_with_digest<component_type::Filter>(filter_ref);
    _components_digests.map[component_type::Filter] = digest;
}
|
|
|
|
// Replaces the in-memory bloom filter with one sized for the actual partition count,
// rebuilt by scanning the Index, when the original size estimate turned out to be
// sufficiently wrong (see the criteria below).
//
// Best effort: if scanning the index fails, a warning is logged and the existing filter is kept.
// Must run in a seastar thread, because futures are waited on with .get().
void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) {
    if (!has_component(component_type::Filter)) {
        return;
    }
    if (!has_component(component_type::Index)) {
        return;
    }

    // Skip rebuilding the bloom filter if the false positive rate based
    // on the current bitset size is within 75% to 125% of the configured
    // false positive rate.
    // (A higher fp chance yields a smaller filter, hence the 1.25 -> lower bound mapping.)
    auto curr_bitset_size = downcast_ptr<utils::filter::bloom_filter>(_components->filter.get())->bits().memory_size();
    auto bitset_size_lower_bound = utils::i_filter::get_filter_size(num_partitions,
                                                                    _schema->bloom_filter_fp_chance() * 1.25);
    auto bitset_size_upper_bound = utils::i_filter::get_filter_size(num_partitions,
                                                                    _schema->bloom_filter_fp_chance() * 0.75);
    if (bitset_size_lower_bound <= curr_bitset_size && curr_bitset_size <= bitset_size_upper_bound) {
        return;
    }

    // The initial partition estimate was inaccurate but perform resize only if it is worth doing, based on the following criteria.
    // 1. Do not resize if the size difference is less than 10% of the current size, or less than 1K.
    //    - to prevent resizing for small sstables where the savings are minimal.
    // 2. Do not resize if the current filter is larger than the optimal one but still under 16K.
    //    - to avoid downsizing when the savings are minimal.
    //    - the fp rate is also already at least at the configured value, so no gain there.
    // 3. Do not resize filters of garbage_collected sstables.
    const auto optimal_filter_size = utils::i_filter::get_filter_size(num_partitions, _schema->bloom_filter_fp_chance());
    const auto filter_size_diff = std::abs<int64_t>(optimal_filter_size - curr_bitset_size);
    if (filter_size_diff < 1024 || filter_size_diff < 0.1 * curr_bitset_size || // [1]
            (curr_bitset_size > optimal_filter_size && curr_bitset_size < 16384) || // [2]
            _origin == "garbage_collection") { // [3]
        return;
    }

    // Consumer that adds the keys from index entries to the given bloom filter
    class bloom_filter_builder {
        utils::filter_ptr& _filter;
    public:
        bloom_filter_builder(utils::filter_ptr &filter) : _filter(filter) {}
        void consume_entry(parsed_partition_index_entry&& e) {
            _filter->add(to_bytes_view(e.key));
        }
    };

    // Create a new filter that can optimally represent the given num_partitions.
    auto optimal_filter = utils::i_filter::get_filter(num_partitions, _schema->bloom_filter_fp_chance(), get_filter_format(_version));
    sstlog.info("Rebuilding bloom filter {}: resizing bitset from {} bytes to {} bytes. sstable origin: {}", filename(component_type::Filter), curr_bitset_size,
        downcast_ptr<utils::filter::bloom_filter>(optimal_filter.get())->bits().memory_size(), _origin);

    auto index_file = open_file(component_type::Index, open_flags::ro).get();
    // Declared before the semaphore and the consumer context, so the file is closed last.
    auto index_file_closer = deferred_action([&index_file] {
        try {
            index_file.close().get();
        } catch (...) {
            sstlog.warn("sstable close index_file failed: {}", std::current_exception());
            general_disk_error();
        }
    });
    auto index_file_size = index_file.size().get();

    // Private, unlimited semaphore: only used to obtain a permit for the index reader.
    auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstable::rebuild_filter_from_index",
        reader_concurrency_semaphore::register_metrics::no);
    auto sem_stopper = deferred_stop(sem);

    // rebuild the filter using index_consume_entry_context
    bloom_filter_builder bfb_consumer(optimal_filter);
    index_consume_entry_context<bloom_filter_builder> consumer_ctx(
            *this, sem.make_tracking_only_permit(_schema, "rebuild_filter_from_index", db::no_timeout, {}), bfb_consumer, trust_promoted_index::no,
            make_file_input_stream(index_file, 0, index_file_size, {.buffer_size = sstable_buffer_size}), 0, index_file_size,
            get_column_translation(*_schema), _manager._abort);
    auto consumer_ctx_closer = deferred_close(consumer_ctx);
    try {
        consumer_ctx.consume_input().get();
    } catch (...) {
        sstlog.warn("Failed to rebuild bloom filter {} : {}. Existing bloom filter will be written to disk.", filename(component_type::Filter), std::current_exception());
        return;
    }

    // Replace the existing filter with the new optimal filter.
    _components->filter.swap(optimal_filter);
}
|
|
|
|
// Builds a bloom filter sized for exactly `num_partitions` from the hashes stored in the
// TemporaryHashes component, installs it as _components->filter and then unlinks that
// component. Each hash is 16 bytes: two little-endian 64-bit words.
// (Presumably TemporaryHashes is written by the sstable writer when the filter is delayed — confirm.)
//
// Throws malformed_sstable_exception if the file holds fewer than `num_partitions` hashes.
// Must run in a seastar thread, because futures are waited on with .get().
void sstable::build_delayed_filter(uint64_t num_partitions) {
    auto optimal_filter = utils::i_filter::get_filter(num_partitions, _schema->bloom_filter_fp_chance(), get_filter_format(_version));
    sstlog.debug("Building delayed bloom filter {}: {} filter bytes. sstable origin: {}", filename(component_type::Filter),
        downcast_ptr<utils::filter::bloom_filter>(optimal_filter.get())->bits().memory_size(), _origin);

    auto hashes_file = open_file(component_type::TemporaryHashes, open_flags::ro).get();
    auto hashes_file_closer = deferred_close(hashes_file);
    constexpr uint64_t murmur_hash_size_bytes = 16;

    file_input_stream_options options = {
        .buffer_size = sstable_buffer_size,
        .read_ahead = 1,
    };
    // The stream is limited to exactly num_partitions hashes.
    auto in = make_file_input_stream(hashes_file, 0, num_partitions * murmur_hash_size_bytes, options);
    auto in_closer = deferred_close(in);

    constexpr uint64_t batch_size_bytes = 4096;
    static_assert(batch_size_bytes % murmur_hash_size_bytes == 0, "Batch size must be a multiple of hash size");

    size_t processed_hashes = 0;
    while (processed_hashes < num_partitions) {
        auto buf = in.read_exactly(batch_size_bytes).get();
        auto p = buf.get();
        for (uint64_t offset = 0; offset + murmur_hash_size_bytes <= buf.size(); offset += murmur_hash_size_bytes) {
            std::array<uint64_t, 2> hash;
            // memcpy: the buffer gives no alignment guarantee for uint64_t.
            std::memcpy(hash.data(), p + offset, sizeof(hash));
            hash[0] = seastar::le_to_cpu(hash[0]);
            hash[1] = seastar::le_to_cpu(hash[1]);
            auto hashed_key = utils::hashed_key(hash);
            optimal_filter->add(hashed_key);
            processed_hashes++;
        }
        // A short read means the end of the stream has been reached.
        if (buf.size() < batch_size_bytes) {
            break;
        }
    }
    if (processed_hashes != num_partitions) {
        throw malformed_sstable_exception(fmt::format("Temporary hashes file {} was supposed to contain {} hashes, but it contains only {} hashes",
            filename(component_type::TemporaryHashes), num_partitions, processed_hashes));
    }

    _components->filter.swap(optimal_filter);
    unlink_component(component_type::TemporaryHashes).get();
}
|
|
|
|
size_t sstable::total_reclaimable_memory_size() const {
|
|
if (!_total_reclaimable_memory) {
|
|
_total_reclaimable_memory = _components->filter ? _components->filter->memory_size() : 0;
|
|
}
|
|
|
|
return _total_reclaimable_memory.value();
|
|
}
|
|
|
|
size_t sstable::reclaim_memory_from_components() {
|
|
size_t memory_reclaimed_this_iteration = 0;
|
|
|
|
if (_components->filter) {
|
|
auto filter_memory_size = _components->filter->memory_size();
|
|
if (filter_memory_size > 0) {
|
|
// Discard it from memory by replacing it with an always present variant.
|
|
// No need to remove it from _recognized_components as the filter is still in disk.
|
|
_components->filter = std::make_unique<utils::filter::always_present_filter>();
|
|
memory_reclaimed_this_iteration += filter_memory_size;
|
|
}
|
|
}
|
|
|
|
_total_reclaimable_memory.reset();
|
|
_total_memory_reclaimed += memory_reclaimed_this_iteration;
|
|
return memory_reclaimed_this_iteration;
|
|
}
|
|
|
|
// Bytes of component memory reclaimed so far and not yet reloaded
// (see reclaim_memory_from_components() / reload_reclaimed_components()).
size_t sstable::total_memory_reclaimed() const {
    return _total_memory_reclaimed;
}
|
|
|
|
// Re-reads the bloom filter from disk after it was reclaimed, and subtracts the reloaded
// filter's memory size from _total_memory_reclaimed. No-op if nothing was reclaimed.
future<> sstable::reload_reclaimed_components() {
    if (_total_memory_reclaimed == 0) {
        // nothing to reload
        co_return;
    }

    // Test hook: lets tests pause here (waits up to 5s for a message when the injection is enabled).
    co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", utils::wait_for_message(std::chrono::seconds(5)));

    co_await read_filter();
    _total_reclaimable_memory.reset();
    _total_memory_reclaimed -= _components->filter->memory_size();
    sstlog.info("Reloaded bloom filter of {}", get_filename());
}
|
|
|
|
void sstable::disable_component_memory_reload() {
|
|
if (total_reclaimable_memory_size() > 0) {
|
|
// should be called only when the components have been dropped already
|
|
on_internal_error(sstlog, "disable_component_memory_reload() called with reclaimable memory");
|
|
}
|
|
|
|
_total_memory_reclaimed = 0;
|
|
}
|
|
|
|
// Whether `type` can be rewritten in place by link_with_rewritten_component().
// Only the Statistics component is supported today.
bool sstable::is_component_rewrite_supported(component_type type) {
    return type == component_type::Statistics;
}
|
|
|
|
// Writes a single rewritable component to disk. Only Statistics is supported;
// any other type is an internal error.
void sstable::write_component(component_type type) {
    if (type == component_type::Statistics) {
        write_statistics();
        return;
    }
    on_internal_error(sstlog, fmt::format("Writing component {} is not supported.", component_name(*this, type)));
}
|
|
|
|
|
|
// Reads the metadata components needed before the data files can be opened:
// the TOC, then Scylla metadata, then Statistics, then CompressionInfo, Filter and Summary concurrently.
//
// The component-digest-mismatch policy from `cfg` is applied here, not only in load(),
// so callers that load metadata directly get the policy they asked for. Otherwise the
// digest-verified reads below would silently ignore cfg.ignore_component_digest_mismatch.
future<> sstable::load_metadata(sstable_open_config cfg) noexcept {
    // Must be set before any component is read: the reads below verify component digests.
    _ignore_component_digest_mismatch = cfg.ignore_component_digest_mismatch;
    co_await read_toc(cfg);
    // read scylla-meta after toc. Might need it to parse
    // rest (hint extensions)
    co_await read_scylla_metadata();
    // Read statistics ahead of others - if summary is missing
    // we'll attempt to re-generate it and we need statistics for that
    co_await read_statistics();
    co_await coroutine::all(
        [&] { return read_compression(); },
        [&] { return read_filter(cfg); },
        [&] { return read_summary(); });
}
|
|
|
|
// This interface is only used during tests, snapshot loading and early initialization.
// No need to set tunable priorities for it.
//
// Fully loads the sstable. It reads and validates the metadata, then determines the owning
// shards unless they are already known (either the current shard, when
// cfg.current_shard_as_sstable_owner is set, or computed with `sharder`). Finally it opens
// the data files.
future<> sstable::load(const dht::sharder& sharder, sstable_open_config cfg) noexcept {
    // Set before any component is read so digest checks during loading honour the configured policy.
    _ignore_component_digest_mismatch = cfg.ignore_component_digest_mismatch;
    co_await load_metadata(cfg);
    validate_min_max_metadata();
    validate_max_local_deletion_time();
    validate_partitioner();
    if (_shards.empty()) {
        // First/last keys are required by compute_shards_for_this_sstable().
        co_await read_partitions_db_footer();
        set_first_and_last_keys();
        _shards = cfg.current_shard_as_sstable_owner ?
            std::vector<unsigned>{this_shard_id()} : compute_shards_for_this_sstable(sharder);
    }
    co_await open_data(cfg);
}
|
|
|
|
// Loads this sstable from state already opened on another shard (see get_open_info()).
// It adopts the in-memory components, the shard owners and on-disk metadata size, and
// the already-open file handles, each wrapped with _read_error_handler. It then validates
// the metadata, finishes opening via update_info_for_opened_data() with the default
// config, and registers the sstable's reclaimable memory with the manager.
future<> sstable::load(sstables::foreign_sstable_open_info info) noexcept {
    // The function is noexcept, so moving out of `info` must not throw.
    static_assert(std::is_nothrow_move_constructible_v<sstables::foreign_sstable_open_info>);
    co_await read_toc();
    _components = std::move(info.components);
    _data_file = make_checked_file(_read_error_handler, info.data.to_file());
    if (info.index) {
        _index_file = make_checked_file(_read_error_handler, info.index->to_file());
    }
    if (info.partitions) {
        _partitions_file = make_checked_file(_read_error_handler, info.partitions->to_file());
    }
    if (info.rows) {
        _rows_file = make_checked_file(_read_error_handler, info.rows->to_file());
    }
    _shards = std::move(info.owners);
    _metadata_size_on_disk = info.metadata_size_on_disk;
    validate_min_max_metadata();
    validate_max_local_deletion_time();
    validate_partitioner();
    co_await update_info_for_opened_data();
    _total_reclaimable_memory.reset();
    _manager.increment_total_reclaimable_memory(this);
}
|
|
|
|
// Packages everything another shard needs to load this sstable without reopening it
// (consumed by load(foreign_sstable_open_info)). It contains a copy of the components
// handle, the owner shards, dup()'ed handles of the open files, and identifying metadata.
// Optional files that are not open are passed as std::nullopt.
future<foreign_sstable_open_info> sstable::get_open_info() & {
    return _components.copy().then([this] (auto c) mutable {
        return foreign_sstable_open_info{
            .components = std::move(c),
            .owners = this->get_shards_for_this_sstable(),
            .data = _data_file.dup(),
            .index = _index_file ? std::optional<seastar::file_handle>(_index_file.dup()) : std::nullopt,
            .partitions = _partitions_file ? std::optional<seastar::file_handle>(_partitions_file.dup()) : std::nullopt,
            .rows = _rows_file ? std::optional<seastar::file_handle>(_rows_file.dup()) : std::nullopt,
            .generation = _generation,
            .version = _version,
            .format = _format,
            .uncompressed_data_size = data_size(),
            .metadata_size_on_disk = _metadata_size_on_disk
        };
    });
}
|
|
|
|
// Builds the entry_descriptor (generation, version, format, component) for component `c` of this sstable.
entry_descriptor sstable::get_descriptor(component_type c) const {
    return entry_descriptor(_generation, _version, _format, c);
}
|
|
|
|
// Computes (once) the set of shards owning this sstable according to `sharder`.
// It reads the Scylla metadata and Statistics. When no usable sharding metadata is
// present, it also reads the Summary (and Partitions footer) to obtain the first/last keys.
// No-op if _shards is already populated.
future<>
sstable::load_owner_shards(const dht::sharder& sharder) {
    if (!_shards.empty()) {
        sstlog.trace("{}: shards={}", get_filename(), _shards);
        co_return;
    }
    co_await read_scylla_metadata();

    // Usable means: a Scylla component exists and its Sharding entry has at least one token range.
    auto has_valid_sharding_metadata = std::invoke([this] {
        if (!has_component(component_type::Scylla)) {
            return false;
        }

        auto& scylla_metadata = _components->scylla_metadata;
        const auto* sm = scylla_metadata
                ? scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>()
                : nullptr;
        return sm && sm->token_ranges.elements.size();
    });
    // Statistics is needed for SSTable loading validation and possible Summary regeneration.
    co_await read_statistics();

    // If sharding metadata is not available, we must load first and last keys from summary
    // for sstable::compute_shards_for_this_sstable() to operate on them.
    if (!has_valid_sharding_metadata) {
        sstlog.warn("Sharding metadata not available for {}, so Summary will be read to allow Scylla to compute shards owning the SSTable.", get_filename());
        co_await read_summary();
        co_await read_partitions_db_footer();
        set_first_and_last_keys();
    }

    _shards = compute_shards_for_this_sstable(sharder);
    sstlog.trace("{}: shards={}", get_filename(), _shards);
}
|
|
|
|
// Initializes a summary's header before index entries are sampled into it.
//
// The number of entries needed at the base sampling level is
// ceil(expected_partition_count / min_index_interval). If that exceeds the uint32_t range,
// malformed_sstable_exception is thrown (after the interval/sampling fields were set).
void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) {
    parse_assert(expected_partition_count >= 1);

    s.header.min_index_interval = min_index_interval;
    s.header.sampling_level = downsampling::BASE_SAMPLING_LEVEL;

    // Ceiling division, spelled out.
    const uint64_t whole_intervals = expected_partition_count / min_index_interval;
    const bool has_remainder = (expected_partition_count % min_index_interval) != 0;
    const uint64_t max_expected_entries = whole_intervals + (has_remainder ? 1 : 0);

    // FIXME: handle case where max_expected_entries is greater than max value stored by uint32_t.
    if (max_expected_entries > std::numeric_limits<uint32_t>::max()) {
        throw malformed_sstable_exception("Current sampling level (" + to_sstring(downsampling::BASE_SAMPLING_LEVEL) + ") not enough to generate summary.");
    }

    s.header.memory_size = 0;
}
|
|
|
|
// Finalizes a summary after all entries have been sampled. It fills in the header sizes
// and the first/last keys; if `last_key` is empty, the single partition's first key is
// used as the last key. It then computes each entry's position and the total memory size.
// Asserts that `first_key` is engaged (the sstable is non-empty).
// `s` must stay alive until the returned future resolves (it is captured by reference).
future<> seal_summary(summary& s,
        std::optional<key>&& first_key,
        std::optional<key>&& last_key,
        const index_sampling_state& state) {
    s.header.size = s.entries.size();
    s.header.size_at_full_sampling = sstable::get_size_at_full_sampling(state.partition_count, s.header.min_index_interval);

    parse_assert(bool(first_key), {}, "attempted to seal summary of empty sstable");
    s.first_key.value = first_key->get_bytes();

    if (last_key) {
        s.last_key.value = last_key->get_bytes();
    } else {
        // An empty last_mutation indicates we had just one partition
        s.last_key.value = s.first_key.value;
    }

    // Positions are offsets past the positions table (header.size uint32_t slots); each
    // entry then contributes its key plus its position field.
    s.header.memory_size = s.header.size * sizeof(uint32_t);
    s.positions.reserve(s.entries.size());
    return do_for_each(s.entries, [&s] (summary_entry& e) {
        s.positions.push_back(s.header.memory_size);
        s.header.memory_size += e.key.size() + sizeof(e.position);
    });
}
|
|
|
|
static
|
|
void
|
|
populate_statistics_offsets(sstable_version_types v, statistics& s) {
|
|
// copy into a sorted vector to guarantee consistent order
|
|
auto types = s.contents | std::views::keys | std::ranges::to<std::vector>();
|
|
std::ranges::sort(types);
|
|
|
|
// populate the hash with garbage so we can calculate its size
|
|
for (auto t : types) {
|
|
s.offsets.elements.emplace_back(t, -1);
|
|
}
|
|
|
|
auto offset = serialized_size(v, s.offsets);
|
|
s.offsets.elements.clear();
|
|
for (auto t : types) {
|
|
s.offsets.elements.emplace_back(t, offset);
|
|
offset += s.contents[t]->serialized_size(v);
|
|
}
|
|
}
|
|
|
|
static
|
|
sharding_metadata
|
|
create_sharding_metadata(utils::chunked_vector<dht::partition_range> ranges) {
|
|
auto sm = sharding_metadata();
|
|
sm.token_ranges.elements.reserve(ranges.size());
|
|
for (auto&& range : std::move(ranges)) {
|
|
if (true) { // keep indentation
|
|
// we know left/right are not infinite
|
|
auto&& left = range.start()->value();
|
|
auto&& right = range.end()->value();
|
|
auto&& left_token = left.token();
|
|
auto left_exclusive = !left.has_key() && left.bound() == dht::ring_position::token_bound::end;
|
|
auto&& right_token = right.token();
|
|
auto right_exclusive = !right.has_key() && right.bound() == dht::ring_position::token_bound::start;
|
|
sm.token_ranges.elements.push_back(disk_token_range{
|
|
{left_exclusive, {left_token.data()}},
|
|
{right_exclusive, {right_token.data()}}});
|
|
}
|
|
}
|
|
return sm;
|
|
}
|
|
|
|
// Builds sharding metadata for the part of [first_key, last_key] owned by `shard` under
// the schema's static sharder. If that part is empty, a warning with diagnostic details
// is logged and the resulting metadata has no token ranges.
// Must run in a seastar thread (uses .get()).
static
sharding_metadata
create_sharding_metadata(schema_ptr schema, const dht::static_sharder& sharder, const dht::decorated_key& first_key, const dht::decorated_key& last_key, shard_id shard) {
    auto prange = dht::partition_range::make(dht::ring_position(first_key), dht::ring_position(last_key));
    auto ranges = dht::split_range_to_single_shard(*schema, sharder, prange, shard).get();
    if (ranges.empty()) {
        // Diagnostics only: show how the range splits across all shards.
        auto split_ranges_all_shards = dht::split_range_to_shards(prange, *schema, sharder);
        sstlog.warn("create_sharding_metadata: range={} has no intersection with shard={} first_key={} last_key={} ranges_single_shard={} ranges_all_shards={}",
                prange, shard, first_key, last_key, ranges, split_ranges_all_shards);
    }
    return create_sharding_metadata(std::move(ranges));
}
|
|
|
|
static
|
|
sharding_metadata
|
|
create_sharding_metadata(schema_ptr schema, const dht::decorated_key& first_key, const dht::decorated_key& last_key, shard_id shard) {
|
|
auto* sharder_opt = schema->try_get_static_sharder();
|
|
if (sharder_opt) {
|
|
return create_sharding_metadata(std::move(schema), *sharder_opt, first_key, last_key, shard);
|
|
}
|
|
utils::chunked_vector<dht::partition_range> r;
|
|
r.push_back(dht::partition_range::make(dht::ring_position(first_key), dht::ring_position(last_key)));
|
|
return create_sharding_metadata(std::move(r));
|
|
}
|
|
|
|
// In the beginning of the statistics file, there is a disk_hash used to
// map each metadata type to its correspondent position in the file.
//
// Fills `s` with the Validation, Compaction and Stats metadata blocks (Compaction and
// Stats are built from `collector`), then recomputes the offsets table.
// Compaction ancestors are only recorded for pre-`mc` formats.
// NOTE(review): first_key, last_key and enc_stats are not read in this body.
void seal_statistics(sstable_version_types v, statistics& s, metadata_collector& collector,
        const sstring partitioner, double bloom_filter_fp_chance, schema_ptr schema,
        const dht::decorated_key& first_key, const dht::decorated_key& last_key,
        const encoding_stats& enc_stats, const std::set<int>& compaction_ancestors) {
    validation_metadata validation;
    compaction_metadata compaction;
    stats_metadata stats;

    validation.partitioner.value = to_bytes(partitioner);
    validation.filter_chance = bloom_filter_fp_chance;
    s.contents[metadata_type::Validation] = std::make_unique<validation_metadata>(std::move(validation));

    collector.construct_compaction(compaction);
    if (v < sstable_version_types::mc && !compaction_ancestors.empty()) {
        compaction.ancestors.elements = utils::chunked_vector<uint32_t>(compaction_ancestors.begin(), compaction_ancestors.end());
    }
    s.contents[metadata_type::Compaction] = std::make_unique<compaction_metadata>(std::move(compaction));

    collector.construct_stats(stats);
    s.contents[metadata_type::Stats] = std::make_unique<stats_metadata>(std::move(stats));

    // Must come last: offsets depend on the serialized size of every block above.
    populate_statistics_offsets(v, s);
}
|
|
|
|
// Counts one more partition. Once data_offset reaches the next sampling point, it also
// appends a summary entry for this partition and pushes the next sampling point forward
// by state.summary_byte_cost data bytes per byte of the new entry. This keeps the
// summary/data size ratio within reasonable limits.
void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset,
        uint64_t index_offset, index_sampling_state& state) {
    ++state.partition_count;
    if (data_offset < state.next_data_offset_to_write_summary) {
        return;
    }
    // Entry cost: offset (8) + key length field (2) + the key bytes.
    auto entry_size = 8 + 2 + key.size();
    state.next_data_offset_to_write_summary += state.summary_byte_cost * entry_size;
    auto stored_key = s.add_summary_data(key);
    s.entries.push_back(summary_entry{ token, stored_key, index_offset });
}
|
|
|
|
// Returns the cost for writing a byte to summary such that the ratio of summary
|
|
// to data will be 1 to cost by the time sstable is sealed.
|
|
size_t summary_byte_cost(double summary_ratio) {
|
|
return summary_ratio ? (1 / summary_ratio) : index_sampling_state::default_summary_byte_cost;
|
|
}
|
|
|
|
// Loads the Scylla component, if the sstable has one, into _components->scylla_metadata.
// It then derives _features and the Data digest from it and validates the TOC and
// Scylla component digests.
//
// No-op if the metadata is already loaded. The optional is engaged even when the sstable
// has no Scylla component, so later calls do not try to read it again.
//
// Implemented as a single coroutine on purpose. A coroutine lambda passed to
// future::then() reaches its captures through the closure object, and then() destroys
// that closure as soon as the coroutine returns its future (its first suspension). Any
// use of a captured `this` after a co_await would then read freed memory.
future<>
sstable::read_scylla_metadata() noexcept {
    if (_components->scylla_metadata) {
        co_return;
    }
    co_await read_toc();
    _components->scylla_metadata.emplace(); // engaged optional means we won't try to re-read this again
    if (!has_component(component_type::Scylla)) {
        co_return;
    }
    co_await read_simple<component_type::Scylla>(*_components->scylla_metadata);
    _features = _components->scylla_metadata->get_features();
    _components->digest = get_component_digest(component_type::Data);

    // Component digests are stored in the Scylla component, so these checks are only possible now.
    validate_component_digest(component_type::TOC, _toc_digest);

    auto computed_digest = co_await compute_component_file_digest(component_type::Scylla);
    validate_component_digest(component_type::Scylla, computed_digest);
}
|
|
|
|
// Maps a schema column_kind to its sstable_column_kind counterpart.
// An unknown value (e.g. a corrupted enum) is reported as an internal error.
static sstable_column_kind to_sstable_column_kind(column_kind k) {
    switch (k) {
    case column_kind::partition_key: return sstable_column_kind::partition_key;
    case column_kind::clustering_key: return sstable_column_kind::clustering_key;
    case column_kind::static_column: return sstable_column_kind::static_column;
    case column_kind::regular_column: return sstable_column_kind::regular_column;
    }
    on_internal_error(sstlog, format("to_sstable_column_kind(): unknown column kind {}", static_cast<std::underlying_type_t<column_kind>>(k)));
}
|
|
|
|
void
|
|
sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
|
std::optional<scylla_metadata::large_data_stats> ld_stats, std::optional<scylla_metadata::ext_timestamp_stats> ts_stats) {
|
|
auto&& first_key = get_first_decorated_key();
|
|
auto&& last_key = get_last_decorated_key();
|
|
|
|
auto sm = create_sharding_metadata(_schema, first_key, last_key, shard);
|
|
|
|
// sstable write may fail to generate empty metadata if mutation source has only data from other shard.
|
|
// see https://github.com/scylladb/scylla/issues/2932 for details on how it can happen.
|
|
if (sm.token_ranges.elements.empty()) {
|
|
throw std::runtime_error(format("Failed to generate sharding metadata for {}", get_filename()));
|
|
}
|
|
|
|
if (!_components->scylla_metadata) {
|
|
_components->scylla_metadata.emplace();
|
|
}
|
|
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::Sharding>(std::move(sm));
|
|
// Note: data.set() wants an rvalue, so we have to make a copy. It's a uint64 anyway.
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::Features>(sstable_enabled_features(_features));
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::RunIdentifier>(std::move(identifier));
|
|
if (ld_stats) {
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::LargeDataStats>(std::move(*ld_stats));
|
|
}
|
|
if (!_origin.empty()) {
|
|
scylla_metadata::sstable_origin o;
|
|
o.value = bytes(to_bytes_view(std::string_view(_origin)));
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableOrigin>(std::move(o));
|
|
}
|
|
|
|
scylla_metadata::scylla_version version;
|
|
version.value = bytes(to_bytes_view(std::string_view(scylla_version())));
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::ScyllaVersion>(std::move(version));
|
|
scylla_metadata::scylla_build_id build_id;
|
|
build_id.value = bytes(to_bytes_view(std::string_view(get_build_id())));
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::ScyllaBuildId>(std::move(build_id));
|
|
if (ts_stats) {
|
|
if (sstlog.is_enabled(log_level::debug)) {
|
|
std::optional<api::timestamp_type> min_live_timestamp;
|
|
std::optional<api::timestamp_type> min_live_row_marker_timestamp;
|
|
if (auto it = ts_stats->map.find(sstables::ext_timestamp_stats_type::min_live_timestamp); it != ts_stats->map.end()) {
|
|
min_live_timestamp = it->second;
|
|
}
|
|
if (auto it = ts_stats->map.find(sstables::ext_timestamp_stats_type::min_live_row_marker_timestamp); it != ts_stats->map.end()) {
|
|
min_live_row_marker_timestamp = it->second;
|
|
}
|
|
sstlog.debug("Storing sstable {}: min_timestamp={} min_live_timestamp={} min_live_row_marker_timestamp={}",
|
|
get_filename(),
|
|
get_stats_metadata().min_timestamp,
|
|
min_live_timestamp,
|
|
min_live_row_marker_timestamp);
|
|
}
|
|
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::ExtTimestampStats>(std::move(*ts_stats));
|
|
}
|
|
|
|
sstable_id sid;
|
|
if (generation().is_uuid_based()) {
|
|
sid = sstable_id(generation().as_uuid());
|
|
} else {
|
|
sid = sstable_id(utils::UUID_gen::get_time_UUID());
|
|
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
|
|
}
|
|
_components->scylla_metadata->set_sstable_identifier(sid);
|
|
|
|
sstable_schema_type sstable_schema;
|
|
sstable_schema.id = _schema->id();
|
|
sstable_schema.version = _schema->version();
|
|
sstable_schema.keyspace_name.value = to_bytes(_schema->ks_name());
|
|
sstable_schema.table_name.value = to_bytes(_schema->cf_name());
|
|
for (const auto& col : _schema->all_columns()) {
|
|
sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}});
|
|
}
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::Schema>(std::move(sstable_schema));
|
|
_components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(scylla_metadata::components_digests{_components_digests});
|
|
|
|
_components->scylla_metadata->digest = serialized_checksum(_version, _components->scylla_metadata->data);
|
|
|
|
write_simple<component_type::Scylla>(*_components->scylla_metadata);
|
|
}
|
|
|
|
// Returns whether this sstable may hold rows within any of `ranges`, judged by
// its min/max clustering position metadata.
// Always returns true for formats older than md. It also returns true for
// sstables without a Scylla component whose stats record tombstone drop times.
bool sstable::may_contain_rows(const query::clustering_row_ranges& ranges) const {
    if (_version < sstables::sstable_version_types::md) {
        return true;
    }

    // Include sstables with tombstones that are not scylla's since
    // they may contain partition tombstones that are not taken into
    // account in min/max column names metadata.
    // We clear min/max metadata for partition tombstones so they
    // will match as containing the rows we're looking for.
    const bool non_scylla_with_tombstones = !has_scylla_component()
            && get_stats_metadata().estimated_tombstone_drop_time.bin.size();
    if (non_scylla_with_tombstones) {
        return true;
    }

    auto overlaps_range = [this] (const query::clustering_range& r) {
        return _min_max_position_range.overlaps(*_schema,
                position_in_partition_view::for_range_start(r),
                position_in_partition_view::for_range_end(r));
    };
    return std::ranges::any_of(ranges, overlaps_range);
}
|
|
|
|
// Seals the sstable through its storage backend. Once sealing succeeds, a
// pending implicit deletion mark (set by get_writer()) is cleared. When
// `backup` is set, the sealed sstable is also snapshotted into "backups".
future<> sstable::seal_sstable(bool backup)
{
    co_await _storage->seal(*this);
    if (_marked_for_deletion == mark_for_deletion::implicit) {
        _marked_for_deletion = mark_for_deletion::none;
    }
    if (!backup) {
        co_return;
    }
    co_await _storage->snapshot(*this, "backups");
}
|
|
|
|
// Creates a writer for this sstable. The sstable is first marked for implicit
// deletion, so it is removed if destroyed before it has been sealed.
sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions,
        const sstable_writer_config& cfg, encoding_stats enc_stats, shard_id shard)
{
    _marked_for_deletion = mark_for_deletion::implicit;
    return sstable_writer(*this, s, estimated_partitions, cfg, enc_stats, shard);
}
|
|
|
|
// Validates the sstable, reports every problem found through `error_handler`,
// and returns the number of problems reported.
//
// First the CRC (checksum) and Digest components are loaded and, if
// `validate_index` is set, the index digest is checked. A
// malformed_sstable_exception at this stage is reported as an "unrecoverable
// error" and ends validation with the current error count.
// Then the data is checked:
//  - mc-and-newer formats delegate to mx::validate();
//  - older formats are scanned with a full-scan reader, and the fragment stream
//    is checked with mutation_fragment_stream_validator. The scan stops early
//    when `abort` is requested.
//
// Exceptions thrown by `error_handler` itself, and exceptions other than
// malformed_sstable_exception, are propagated to the caller.
future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
        std::function<void(sstring)> error_handler, sstables::read_monitor& monitor, bool validate_index) {
    // Reports `e` and counts it; returns whatever error_handler itself threw
    // (null if it did not) so that it can be propagated.
    auto report_unrecoverable = [&error_handler] (const malformed_sstable_exception& e, uint64_t& error_count) -> std::exception_ptr {
        try {
            error_handler(seastar::format("unrecoverable error: {}", e));
            ++error_count;
        } catch (...) {
            return std::current_exception();
        }
        return std::exception_ptr{};
    };

    uint64_t error_count = 0;
    std::exception_ptr failure;
    lw_shared_ptr<checksum> crc;
    try {
        crc = co_await read_checksum();
        co_await read_digest();
        if (validate_index) {
            co_await validate_index_digest();
        }
    } catch (const malformed_sstable_exception& e) {
        failure = report_unrecoverable(e, error_count);
    }
    if (failure) {
        co_return coroutine::exception(std::move(failure));
    }
    if (error_count) {
        co_return error_count;
    }

    co_await utils::get_local_injector().inject("sstable_validate/pause", utils::wait_for_message(std::chrono::seconds(5)));

    if (_version >= sstable_version_types::mc) {
        co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor);
    }

    auto reader = make_full_scan_reader(_schema, permit, nullptr, monitor, integrity_check::yes);
    try {
        auto validator = mutation_fragment_stream_validator(*_schema);
        while (auto mf_opt = co_await reader()) {
            if (abort.abort_requested()) [[unlikely]] {
                break;
            }
            const auto& frag = *mf_opt;
            if (auto res = validator(frag); !res) {
                error_handler(res.what());
                validator.reset(frag);
                ++error_count;
            }
            if (!frag.is_partition_start()) {
                continue;
            }
            // Partition starts additionally get their key validated.
            const auto& ps = frag.as_partition_start();
            if (auto res = validator(ps.key()); !res) {
                error_handler(res.what());
                validator.reset(ps.key());
                ++error_count;
            }
        }
        if (auto res = validator.on_end_of_stream(); !res) {
            error_handler(res.what());
            ++error_count;
        }
    } catch (const malformed_sstable_exception& e) {
        failure = report_unrecoverable(e, error_count);
    } catch (...) {
        failure = std::current_exception();
    }

    // The reader is closed on every path before any failure is propagated.
    co_await reader.close();

    if (failure) {
        co_return coroutine::exception(std::move(failure));
    }
    co_return error_count;
}
|
|
|
|
// Encoding stats for compaction are based on the sstable's stats metadata
|
|
// since, in contract to the mc-format encoding_stats that are evaluated
|
|
// before the sstable data is written, the stats metadata is updated during
|
|
// writing so it provides actual minimum values of the written timestamps.
|
|
encoding_stats sstable::get_encoding_stats_for_compaction() const {
|
|
encoding_stats enc_stats;
|
|
|
|
auto& stats = get_stats_metadata();
|
|
enc_stats.min_timestamp = stats.min_timestamp;
|
|
enc_stats.min_local_deletion_time = gc_clock::time_point(gc_clock::duration(stats.min_local_deletion_time));
|
|
enc_stats.min_ttl = gc_clock::duration(stats.min_ttl);
|
|
|
|
return enc_stats;
|
|
}
|
|
|
|
void sstable::assert_large_data_handler_is_running() {
|
|
if (!get_large_data_handler().running()) {
|
|
on_internal_error(sstlog, "The large data handler is not running");
|
|
}
|
|
}
|
|
|
|
// Writes the sstable by consuming `mr` into a writer from get_writer(), inside
// a seastar thread. The reader is closed when the thread body exits. The large
// data handler must be running both before writing starts and once it has
// finished (successfully or not).
future<> sstable::write_components(
        mutation_reader mr,
        uint64_t estimated_partitions,
        schema_ptr schema,
        const sstable_writer_config& cfg,
        encoding_stats stats) {
    assert_large_data_handler_is_running();
    auto write_in_thread = [this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats] () mutable {
        auto close_mr = deferred_close(mr);
        auto writer = get_writer(*schema, estimated_partitions, cfg, stats);
        mr.consume_in_thread(std::move(writer));
    };
    return seastar::async(std::move(write_in_thread)).finally([this] {
        assert_large_data_handler_is_running();
    });
}
|
|
|
|
// Rebuilds _components->summary by scanning the Index component. Returns
// immediately when the summary is already present (its boolean conversion is
// true).
//
// Cleanup runs on every path, before any exception is rethrown:
//  - the temporary reader concurrency semaphore is stopped;
//  - the index file is closed. A failure to close it is logged and reported
//    via general_disk_error().
// The exception rethrown is the one from reading the index or sealing the
// summary.
future<> sstable::generate_summary() {
    if (_components->summary) {
        co_return;
    }

    sstlog.info("Summary file {} not found. Generating Summary...", filename(component_type::Summary));

    // Index-entry consumer: samples entries into the summary and remembers the
    // first key and (from the second entry on) the most recent key.
    class summary_generator {
        const dht::i_partitioner& _part;
        summary& _target;
        index_sampling_state _sampling;
    public:
        std::optional<key> first_key, last_key;

        summary_generator(const dht::i_partitioner& p, summary& s, double summary_ratio) : _part(p), _target(s) {
            _sampling.summary_byte_cost = summary_byte_cost(summary_ratio);
        }
        bool should_continue() {
            return true;
        }
        void consume_entry(parsed_partition_index_entry&& e) {
            auto token = _part.get_token(key_view(to_bytes_view(e.key)));
            maybe_add_summary_entry(_target, token, to_bytes_view(e.key), e.data_file_offset, e.index_offset, _sampling);
            if (first_key) {
                last_key = key(to_bytes(to_bytes_view(e.key)));
            } else {
                first_key = key(to_bytes(to_bytes_view(e.key)));
            }
        }
        const index_sampling_state& state() const {
            return _sampling;
        }
    };

    auto index_file = co_await new_sstable_component_file(_read_error_handler, component_type::Index, open_flags::ro);
    auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::generate_summary()",
            reader_concurrency_semaphore::register_metrics::no);

    std::exception_ptr failure;
    try {
        auto index_size = co_await index_file.size();
        // an upper bound. Surely to be less than this.
        auto estimated_partitions = std::max<uint64_t>(index_size / sizeof(uint64_t), 1);
        prepare_summary(_components->summary, estimated_partitions, _schema->min_index_interval());

        file_input_stream_options options;
        options.buffer_size = sstable_buffer_size;

        auto generator = summary_generator(_schema->get_partitioner(), _components->summary, _manager.get_config().sstable_summary_ratio);
        auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
                *this, sem.make_tracking_only_permit(_schema, "generate-summary", db::no_timeout, {}), generator, trust_promoted_index::yes,
                make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size,
                get_column_translation(*_schema), _manager._abort);

        // The context must be closed even when consuming fails.
        std::exception_ptr consume_failure;
        try {
            co_await ctx->consume_input();
        } catch (...) {
            consume_failure = std::current_exception();
        }
        co_await ctx->close();
        if (consume_failure) {
            std::rethrow_exception(consume_failure);
        }

        co_await seal_summary(_components->summary, std::move(generator.first_key), std::move(generator.last_key), generator.state());
    } catch (...) {
        failure = std::current_exception();
    }

    co_await sem.stop();

    try {
        co_await index_file.close();
    } catch (...) {
        sstlog.warn("sstable close index_file failed: {}", std::current_exception());
        general_disk_error();
    }

    if (failure) {
        std::rethrow_exception(failure);
    }
}
|
|
|
|
bool sstable::is_shared() const {
|
|
if (_shards.empty()) {
|
|
on_internal_error(sstlog, format("Shards weren't computed for SSTable: {}", get_filename()));
|
|
}
|
|
return _shards.size() > 1;
|
|
}
|
|
|
|
uint64_t sstable::data_size() const {
|
|
if (has_component(component_type::CompressionInfo)) {
|
|
return _components->compression.uncompressed_file_length();
|
|
}
|
|
return _data_file_size;
|
|
}
|
|
|
|
uint64_t sstable::ondisk_data_size() const {
|
|
return _data_file_size;
|
|
}
|
|
|
|
// Returns the sstable's total size on disk and before compression.
// The metadata, data and index sizes must already be recorded. Only one of the
// index size or the partitions size has to be set. A missing size is reported
// through on_internal_error().
file_size_stats sstable::get_file_size_stats() const {
    if (!_metadata_size_on_disk) {
        on_internal_error(sstlog, "On-disk size of sstable metadata was not set");
    }
    if (!_data_file_size) {
        on_internal_error(sstlog, "On-disk size of sstable data was not set");
    }
    if (!_index_file_size && !_partitions_file_size) {
        on_internal_error(sstlog, "On-disk size of sstable index was not set");
    }

    const uint64_t non_data_bytes = _metadata_size_on_disk + _index_file_size + _partitions_file_size + _rows_file_size;
    file_size_stats result;
    result.on_disk = non_data_bytes + _data_file_size;
    result.before_compression = non_data_bytes + data_size();
    return result;
}
|
|
|
|
uint64_t sstable::bytes_on_disk() const {
|
|
return get_file_size_stats().on_disk;
|
|
}
|
|
|
|
uint64_t sstable::filter_size() const {
|
|
return _components->filter->memory_size();
|
|
}
|
|
|
|
// Returns whether `f` is among the recognized components of this sstable.
bool sstable::has_component(component_type f) const
{
    return _recognized_components.contains(f);
}
|
|
|
|
std::optional<bool> sstable::originated_on_this_node() const {
|
|
if (_version < version_types::me) {
|
|
// earlier formats do not store originating host id
|
|
return std::nullopt;
|
|
}
|
|
|
|
auto originating_host_id = get_stats_metadata().originating_host_id;
|
|
if (!originating_host_id) {
|
|
// Scylla always fills in originating host id when writing
|
|
// sstables, so an ME-and-up sstable that does not have it is
|
|
// invalid
|
|
on_internal_error(sstlog, format("No originating host id in SSTable: {}. Load foreign SSTables via the upload dir instead.", get_filename()));
|
|
}
|
|
|
|
auto local_host_id = _manager.get_local_host_id();
|
|
if (!local_host_id) {
|
|
// we don't know the local host id before it is loaded from
|
|
// (or generated and written to) system.local, but some system
|
|
// sstable reads must happen before the bootstrap process gets
|
|
// there (like in the resharding case)
|
|
return std::nullopt;
|
|
}
|
|
|
|
return *originating_host_id == local_host_id;
|
|
}
|
|
|
|
void sstable::validate_originating_host_id() const {
|
|
if (_version < version_types::me) {
|
|
// earlier formats do not store originating host id
|
|
return;
|
|
}
|
|
|
|
auto originating_host_id = get_stats_metadata().originating_host_id;
|
|
if (!originating_host_id) {
|
|
// Scylla always fills in originating host id when writing
|
|
// sstables, so an ME-and-up sstable that does not have it is
|
|
// invalid
|
|
throw std::runtime_error(format("No originating host id in SSTable: {}. Load foreign SSTables via the upload dir instead.", get_filename()));
|
|
}
|
|
|
|
auto local_host_id = _manager.get_local_host_id();
|
|
if (!local_host_id) {
|
|
// we don't know the local host id before it is loaded from
|
|
// (or generated and written to) system.local, but some system
|
|
// sstable reads must happen before the bootstrap process gets
|
|
// there, if that's not the case, it's a sign of bug.
|
|
auto msg = format("Unknown local host id while validating SSTable: {}", get_filename());
|
|
if (is_system_keyspace(_schema->ks_name())) {
|
|
sstlog.trace("{}", msg);
|
|
} else {
|
|
on_internal_error(sstlog, msg);
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Builds the file name (without directory) of an sstable component:
//  - ka:                  <ks>-<cf>-<version>-<generation>-<component>
//  - la, mc, md, me, ms:  <version>-<generation>-<format>-<component>
// Any other version value is reported through on_internal_error().
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
        format_types format, sstring component) {
    const sstring v = fmt::to_string(version);
    const sstring g = to_sstring(generation);
    const sstring f = fmt::to_string(format);
    switch (version) {
    case sstable::version_types::ka:
        return ks + "-" + cf + "-" + v + "-" + g + "-" + component;
    case sstable::version_types::la:
    case sstable::version_types::mc:
    case sstable::version_types::md:
    case sstable::version_types::me:
    case sstable::version_types::ms:
        return v + "-" + g + "-" + f + "-" + component;
    }
    on_internal_error(sstlog, seastar::format("invalid version {} for sstable: table={}.{}, generation={}, format={}, component={}",
            static_cast<std::underlying_type<version_types>::type>(version), ks, cf, g, f, component));
}
|
|
|
|
// Component-type overload: maps `component` to its name in `version`'s
// component map and delegates to the string overload.
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
        format_types format, component_type component) {
    sstring component_name = sstable_version_constants::get_component_map(version).at(component);
    return component_basename(ks, cf, version, generation, format, std::move(component_name));
}
|
|
|
|
// Full path of a component: "<dir>/<component_basename(...)>".
sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, generation_type generation,
        format_types format, component_type component) {
    return seastar::format("{}/{}", dir, component_basename(ks, cf, version, generation, format, component));
}
|
|
|
|
// Full path of a component given by name: "<dir>/<component_basename(...)>".
sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, generation_type generation,
        format_types format, sstring component) {
    return seastar::format("{}/{}", dir, component_basename(ks, cf, version, generation, format, component));
}
|
|
|
|
std::vector<std::pair<component_type, sstring>> sstable::all_components() const {
|
|
std::vector<std::pair<component_type, sstring>> all;
|
|
all.reserve(_recognized_components.size() + _unrecognized_components.size());
|
|
for (auto& c : _recognized_components) {
|
|
all.push_back(std::make_pair(c, sstable_version_constants::get_component_map(_version).at(c)));
|
|
}
|
|
for (auto& c : _unrecognized_components) {
|
|
all.push_back(std::make_pair(component_type::Unknown, c));
|
|
}
|
|
return all;
|
|
}
|
|
|
|
// Snapshots the sstable into "<snapshots_dir>/<name>" through the storage
// backend, holding one unit of _mutate_sem while doing so.
future<> sstable::snapshot(const sstring& name) const {
    auto units = co_await get_units(_mutate_sem, 1);
    auto snapshot_dir = format("{}/{}", sstables::snapshots_dir, name);
    co_await _storage->snapshot(*this, std::move(snapshot_dir));
}
|
|
|
|
// Moves the sstable to state `to` through the storage backend, holding one
// unit of _mutate_sem. The in-memory state is updated only after the storage
// change succeeds. `delay_commit` is forwarded to the storage backend.
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {
    auto units = co_await get_units(_mutate_sem, 1);
    co_await _storage->change_state(*this, to, _generation, delay_commit);
    _state = to;
}
|
|
|
|
// Moves an sstable out of the upload directory into state `to`, renaming it to
// `new_generation`. Generation and state are updated only after the storage
// change succeeds.
future<> sstable::pick_up_from_upload(sstable_state to, generation_type new_generation) {
    // just in case, not really needed as the sstable is not yet in use while in the upload dir
    auto units = co_await get_units(_mutate_sem, 1);
    co_await _storage->change_state(*this, to, new_generation, nullptr);
    _generation = std::move(new_generation);
    _state = to;
}
|
|
|
|
// Syncs every directory collected in _dirs, in parallel.
future<> delayed_commit_changes::commit() {
    return parallel_for_each(_dirs, [] (sstring directory) {
        return sync_directory(directory);
    });
}
|
|
|
|
// Creates a reader over `range`/`slice` of this sstable.
//  - mc-and-newer formats serve forward reads, and reversed single-partition
//    reads, natively via mx::make_reader().
//  - Reversed multi-partition reads (any format) and all reversed kl reads use
//    a forward reader over the reversed schema and slice, wrapped in
//    make_reversing_reader(). Streamed-mutation forwarding is then re-added on
//    top with make_forwardable() when `fwd` is set.
//  - Older formats otherwise use kl::make_reader(). The kl paths do not use
//    `integrity`, the index reader or the murmur hash.
mutation_reader
sstable::make_reader(
        schema_ptr query_schema,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor& mon,
        integrity_check integrity,
        const utils::hashed_key* single_partition_read_murmur_hash
        ) {
    const bool is_reversed = slice.is_reversed();
    const bool is_single_partition = range.is_singular();
    const bool is_mx = _version >= version_types::mc;

    const auto index_caching = use_caching(global_cache_index_pages && !slice.options.contains(query::partition_slice::option::bypass_cache));
    auto index_reader = make_index_reader(permit, trace_state, index_caching, is_single_partition);

    if (is_mx && (!is_reversed || is_single_partition)) {
        return mx::make_reader(shared_from_this(), std::move(query_schema), std::move(permit), range, slice,
                std::move(trace_state), fwd, fwd_mr, mon, integrity, std::move(index_reader), single_partition_read_murmur_hash);
    }

    // Multi-partition reversed queries are not yet supported natively in the mx reader.
    // Therefore in this case we use `make_reversing_reader` over the forward reader.
    // FIXME: remove this workaround eventually.
    auto max_result_size = permit.max_result_size();

    // The reversing paths build their inner reader without streamed-mutation
    // forwarding; add it back on top when the caller asked for it.
    auto with_requested_forwarding = [fwd] (mutation_reader reader) {
        if (fwd) {
            reader = make_forwardable(std::move(reader));
        }
        return reader;
    };

    if (is_mx) {
        // The only mx case falling through here is reversed multi-partition reader
        return with_requested_forwarding(make_reversing_reader(mx::make_reader(shared_from_this(), query_schema->make_reversed(), std::move(permit),
                range, reverse_slice(*query_schema, slice), std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon,
                integrity, std::move(index_reader), single_partition_read_murmur_hash),
                max_result_size));
    }

    if (is_reversed) {
        // The kl reader does not support reversed queries at all.
        // Perform a forward query on it, then reverse the result.
        return with_requested_forwarding(make_reversing_reader(kl::make_reader(shared_from_this(), query_schema->make_reversed(), std::move(permit),
                range, reverse_slice(*query_schema, slice), std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon), max_result_size));
    }

    return kl::make_reader(shared_from_this(), query_schema, std::move(permit),
            range, slice, std::move(trace_state), fwd, fwd_mr, mon);
}
|
|
|
|
// Creates a reader over the whole sstable. It dispatches to the kl
// implementation for formats older than mc and to the mx implementation
// otherwise.
mutation_reader
sstable::make_full_scan_reader(
        schema_ptr schema,
        reader_permit permit,
        tracing::trace_state_ptr trace_state,
        read_monitor& monitor,
        integrity_check integrity) {
    if (_version < version_types::mc) {
        return kl::make_full_scan_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
    }
    return mx::make_full_scan_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
}
|
|
|
|
static std::tuple<entry_descriptor, sstring, sstring> make_entry_descriptor(const std::filesystem::path& sst_path, sstring* const provided_ks, sstring* const provided_cf) {
|
|
// examples of fname look like
|
|
// la-42-big-Data.db
|
|
// ka-42-big-Data.db
|
|
// me-3g8w_00qf_4pbog2i7h2c7am0uoe-big-Data.db
|
|
static boost::regex la_mx("(la|m[cdes])-([^-]+)-(\\w+)-(.*)");
|
|
static boost::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
|
|
|
|
// Use non-greedy match so that a snapshot tag that ressembles a name-<uuid> wouldn't match
|
|
// the keyspace/table-<uuid> part of the regular expression.
|
|
// See https://github.com/scylladb/scylladb/issues/25242
|
|
static boost::regex dir(format(".*?/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/({}|{}|{}|{})(?:/[^/]+)?)?/?",
|
|
sstables::staging_dir, sstables::quarantine_dir, sstables::upload_dir, sstables::snapshots_dir).c_str());
|
|
|
|
boost::smatch match;
|
|
|
|
sstable::version_types version;
|
|
|
|
const auto ks_cf_provided = provided_ks && provided_cf;
|
|
|
|
sstring generation;
|
|
sstring format;
|
|
sstring component;
|
|
sstring ks;
|
|
sstring cf;
|
|
if (ks_cf_provided) {
|
|
ks = std::move(*provided_ks);
|
|
cf = std::move(*provided_cf);
|
|
}
|
|
|
|
const auto sstdir = sst_path.parent_path();
|
|
const auto fname = sst_path.filename();
|
|
sstlog.debug("Make descriptor sstdir: {}; fname: {}", sstdir, fname);
|
|
std::string s(fname);
|
|
if (boost::regex_match(s, match, la_mx)) {
|
|
std::string sdir(sstdir);
|
|
boost::smatch dirmatch;
|
|
if (!ks_cf_provided) {
|
|
if (boost::regex_match(sdir, dirmatch, dir)) {
|
|
ks = dirmatch[1].str();
|
|
cf = dirmatch[2].str();
|
|
} else {
|
|
throw malformed_sstable_exception(seastar::format("invalid path for file {}: {}. Path doesn't match known pattern.", fname, sstdir));
|
|
}
|
|
}
|
|
version = version_from_string(match[1].str());
|
|
generation = match[2].str();
|
|
format = sstring(match[3].str());
|
|
component = sstring(match[4].str());
|
|
} else if (boost::regex_match(s, match, ka)) {
|
|
if (!ks_cf_provided) {
|
|
ks = match[1].str();
|
|
cf = match[2].str();
|
|
}
|
|
version = sstable::version_types::ka;
|
|
format = sstring("big");
|
|
generation = match[3].str();
|
|
component = sstring(match[4].str());
|
|
} else {
|
|
throw malformed_sstable_exception(seastar::format("invalid version for file {}. Name doesn't match any known version.", fname));
|
|
}
|
|
return std::make_tuple(entry_descriptor(generation_type::from_string(generation), version, format_from_string(format), sstable::component_from_sstring(version, component)), ks, cf);
|
|
}
|
|
|
|
std::tuple<entry_descriptor, sstring, sstring> parse_path(const std::filesystem::path& sst_path) {
|
|
return make_entry_descriptor(sst_path, nullptr, nullptr);
|
|
}
|
|
|
|
// Parses `sst_path` using the caller-supplied keyspace and table names; only
// the entry_descriptor part of the result is returned.
entry_descriptor parse_path(const std::filesystem::path& sst_path, sstring ks, sstring cf) {
    return std::get<0>(make_entry_descriptor(sst_path, &ks, &cf));
}
|
|
|
|
// Parses a version token (e.g. the "me" prefix of an sstable file name).
// Throws std::out_of_range naming the token when it is not a known version.
sstable_version_types version_from_string(std::string_view s) {
    try {
        return reverse_map(s, version_string);
    } catch (const std::out_of_range&) {
        throw std::out_of_range(seastar::format("Unknown sstable version: {}", s));
    }
}
|
|
|
|
// Parses a format token (e.g. "big"). Throws std::out_of_range naming the
// token when it is not a known format.
sstable_format_types format_from_string(std::string_view s) {
    try {
        return reverse_map(s, format_string);
    } catch (const std::out_of_range&) {
        throw std::out_of_range(seastar::format("Unknown sstable format: {}", s));
    }
}
|
|
|
|
// Returns false only for the ms format; every other version is treated as
// having Summary and Index components.
bool has_summary_and_index(sstable_version_types v) {
    const bool is_ms = v == sstable_version_types::ms;
    return !is_ms;
}
|
|
|
|
// Maps a component name (e.g. the "Data.db" part of a file name) to its
// component_type for version `v`. Names missing from that version's component
// map yield component_type::Unknown.
component_type sstable::component_from_sstring(version_types v, const sstring &s) {
    try {
        return reverse_map(s, sstable_version_constants::get_component_map(v));
    } catch (const std::out_of_range&) {
        return component_type::Unknown;
    }
}
|
|
|
|
// Opens an input stream over [pos, pos + len) of the Data component using the
// default sstable read options: sstable_buffer_size buffers and a read-ahead
// of 4. `history` is installed as the stream's dynamic adjustment history.
// Delegates to the overload taking explicit file_input_stream_options.
future<input_stream<char>> sstable::data_stream(uint64_t pos, size_t len,
        reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history, raw_stream raw,
        integrity_check integrity, integrity_error_handler error_handler) {
    file_input_stream_options options;
    options.buffer_size = sstable_buffer_size;
    options.read_ahead = 4;
    // Copy rather than move: `history` is also forwarded below. Reading it
    // after a move would hand the other overload a null pointer.
    options.dynamic_adjustments = history;
    return data_stream(pos, len, std::move(permit), std::move(trace_state), std::move(history), std::move(options), raw, integrity, std::move(error_handler));
}
|
|
|
|
// Opens an input stream over [pos, pos + len) of the Data component, reading
// through a permit-tracked (and, with `trace_state`, traced) file.
//
// The kind of stream returned:
//  - decompressing, for a compressed sstable with raw == raw_stream::no;
//  - raw compressed chunks, for a compressed mc-or-newer sstable with
//    raw == raw_stream::compressed_chunks;
//  - checksum-verifying, when a checksum component is loaded and `integrity`
//    is requested; failures go to `error_handler`;
//  - otherwise, a plain stream from the storage backend.
// When `integrity` is requested, the Data digest from get_digest() is passed
// to the compressed or checksummed stream as well.
//
// NOTE(review): `history` is not read here; dynamic adjustments arrive through
// `options.dynamic_adjustments`.
future<input_stream<char>> sstable::data_stream(uint64_t pos, size_t len,
        reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history,
        file_input_stream_options options,
        raw_stream raw, integrity_check integrity,
        integrity_error_handler error_handler) {

    file f = make_tracked_file(_data_file, permit);
    if (trace_state) {
        f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
    }

    std::optional<uint32_t> digest;
    if (integrity == integrity_check::yes) {
        digest = get_digest();
    }

    // Creates a storage-backed stream over the data file. The creator is handed
    // to the stream wrappers below. NOTE(review): presumably they may invoke it
    // more than once, so each call gets its own copy of the file handle.
    // Moving the captured handle out would leave later calls with an empty file.
    auto stream_creator = [this, f](uint64_t pos, uint64_t len, file_input_stream_options options) -> future<input_stream<char>> {
        co_return input_stream<char>(co_await _storage->make_data_or_index_source(*this, component_type::Data, file(f), pos, len, std::move(options)));
    };

    if (_components->compression && raw == raw_stream::no) {
        if (_version >= sstable_version_types::mc) {
            co_return make_compressed_file_m_format_input_stream(stream_creator, &_components->compression,
                    pos, len, std::move(options), permit, digest);
        } else {
            co_return make_compressed_file_k_l_format_input_stream(stream_creator, &_components->compression,
                    pos, len, std::move(options), permit, digest);
        }
    }

    if (_components->compression && raw == raw_stream::compressed_chunks && _version >= sstable_version_types::mc) {
        co_return make_compressed_raw_file_input_stream(stream_creator, &_components->compression, std::move(options), permit, digest);
    }

    if (_components->checksum && integrity == integrity_check::yes) {
        auto checksum = get_checksum();
        auto file_len = data_size();
        if (_version >= sstable_version_types::mc) {
            co_return make_checksummed_file_m_format_input_stream(stream_creator, file_len,
                    *checksum, pos, len, std::move(options), digest, error_handler);
        } else {
            co_return make_checksummed_file_k_l_format_input_stream(stream_creator, file_len,
                    *checksum, pos, len, std::move(options), digest, error_handler);
        }
    }
    co_return co_await stream_creator(pos, len, std::move(options));
}
|
|
|
|
// Reads exactly `len` bytes of the Data component starting at `pos`, through
// data_stream() with default options. The stream is closed whether or not the
// read succeeds; a read failure is rethrown after closing.
future<temporary_buffer<char>> sstable::data_read(uint64_t pos, size_t len, reader_permit permit) {
    auto stream = co_await data_stream(pos, len, std::move(permit), tracing::trace_state_ptr(), {});
    std::exception_ptr ex;
    temporary_buffer<char> buff;
    try {
        buff = co_await stream.read_exactly(len);
    } catch (...) {
        ex = std::current_exception();
    }
    // Input streams must be closed before destruction, including on error.
    co_await stream.close();
    maybe_rethrow_exception(std::move(ex));
    co_return buff;
}
|
|
|
|
template <typename ChecksumType>
|
|
static future<bool> do_validate_compressed(input_stream<char>& stream, const sstables::compression& c, bool checksum_all, std::optional<uint32_t> expected_digest) {
|
|
bool valid = true;
|
|
uint64_t offset = 0;
|
|
uint32_t actual_full_checksum = ChecksumType::init_checksum();
|
|
|
|
auto accessor = c.offsets.get_accessor();
|
|
for (size_t i = 0; i < c.offsets.size(); ++i) {
|
|
auto current_pos = accessor.at(i);
|
|
auto next_pos = i + 1 == c.offsets.size() ? c.compressed_file_length() : accessor.at(i + 1);
|
|
auto chunk_len = next_pos - current_pos;
|
|
auto buf = co_await stream.read_exactly(chunk_len);
|
|
|
|
if (!chunk_len) {
|
|
sstlog.error("Found unexpected chunk of length 0 at offset {}", offset);
|
|
valid = false;
|
|
break;
|
|
}
|
|
|
|
if (buf.size() < chunk_len) {
|
|
sstlog.error("Truncated file at offset {}: expected to get chunk of size {}, got {}", offset, chunk_len, buf.size());
|
|
valid = false;
|
|
break;
|
|
}
|
|
|
|
auto compressed_len = chunk_len - 4;
|
|
auto expected_checksum = read_be<uint32_t>(buf.get() + compressed_len);
|
|
auto actual_checksum = ChecksumType::checksum(buf.get(), compressed_len);
|
|
if (actual_checksum != expected_checksum) {
|
|
sstlog.error("Compressed chunk checksum mismatch at offset {}, for chunk #{} of size {}: expected={}, actual={}", offset, i, chunk_len, expected_checksum, actual_checksum);
|
|
valid = false;
|
|
}
|
|
|
|
if (expected_digest) {
|
|
actual_full_checksum = checksum_combine_or_feed<ChecksumType>(actual_full_checksum, actual_checksum, buf.get(), compressed_len);
|
|
if (checksum_all) {
|
|
uint32_t be_actual_checksum = cpu_to_be(actual_checksum);
|
|
actual_full_checksum = ChecksumType::checksum(actual_full_checksum,
|
|
reinterpret_cast<const char*>(&be_actual_checksum), sizeof(be_actual_checksum));
|
|
}
|
|
}
|
|
|
|
offset += chunk_len;
|
|
}
|
|
|
|
if (expected_digest && actual_full_checksum != *expected_digest) {
|
|
sstlog.error("Full checksum mismatch: expected={}, actual={}", *expected_digest, actual_full_checksum);
|
|
valid = false;
|
|
}
|
|
|
|
co_return valid;
|
|
}
|
|
|
|
// Reads the whole uncompressed Data stream and checks that its length, split
// into chunks of `chunk_size` bytes, gives `expected_chunks` chunks. Problems
// are logged; returns false on a chunk-count mismatch or a zero chunk size.
static future<bool> do_validate_uncompressed(input_stream<char>& stream, size_t expected_chunks, uint32_t chunk_size) {
    // A zero chunk size (presumably from a corrupted CRC component) would make
    // the chunk-count computation below divide by zero.
    if (chunk_size == 0) {
        sstlog.error("Invalid chunk size 0 in CRC while expecting {} chunks in Data.db", expected_chunks);
        co_return false;
    }

    uint64_t offset = 0;
    do {
        auto buf = co_await stream.read();
        offset += buf.size();
    } while (!stream.eof());

    uint64_t actual_chunks = (offset + chunk_size - 1) / chunk_size;
    if (actual_chunks != expected_chunks) {
        sstlog.error("Chunk count mismatch between CRC and Data.db at offset {}: expected {} chunks of size {} but data file has {}", offset, expected_chunks, chunk_size, actual_chunks);
        co_return false;
    }
    co_return true;
}
|
|
|
|
// Reads a Digest component from an already-opened file and parses its
// contents as a decimal uint32_t.
//
// The stream is always closed, including when reading fails. A read error is
// rethrown only after the close completes. A parse failure surfaces as the
// exception thrown by boost::lexical_cast.
future<uint32_t> sstable::read_digest_from_file(file f) {
    // The component is tiny, so a single 4 KiB buffer is plenty.
    file_input_stream_options opts;
    opts.buffer_size = 4096;
    auto in = make_file_input_stream(std::move(f), opts);

    // A coroutine cannot co_await inside a catch handler, so remember the
    // failure, close the stream, and only then rethrow.
    sstring text;
    std::exception_ptr failure;
    try {
        text = co_await util::read_entire_stream_contiguous(in);
    } catch (...) {
        failure = std::current_exception();
    }
    co_await in.close();
    maybe_rethrow_exception(std::move(failure));

    co_return boost::lexical_cast<uint32_t>(text);
}
|
|
|
|
|
|
// Returns the sstable's full-file digest, loading the Digest component on
// first use and caching it in _components->digest.
//
// Returns std::nullopt when the sstable has no Digest component, or when it
// has already been unlinked and nothing was cached before that.
future<std::optional<uint32_t>> sstable::read_digest() {
    if (!_components->digest) {
        if (!has_component(component_type::Digest) || _unlinked) {
            co_return std::nullopt;
        }
        uint32_t loaded = 0;
        co_await do_read_simple(component_type::Digest, [&] (version_types, file digest_file) -> future<> {
            loaded = co_await read_digest_from_file(std::move(digest_file));
        });
        _components->digest = loaded;
    }
    co_return _components->digest;
}
|
|
|
|
// Like read_digest(), but parses the Digest component from the caller-provided
// file `f` instead of opening it. The result is cached in _components->digest.
//
// Returns the cached value without touching `f` when one is already present,
// and std::nullopt when the sstable has no Digest component.
future<std::optional<uint32_t>> sstable::read_digest(file f) {
    if (!_components->digest) {
        if (!has_component(component_type::Digest)) {
            co_return std::nullopt;
        }
        _components->digest = co_await read_digest_from_file(std::move(f));
    }
    co_return _components->digest;
}
|
|
|
|
// Parses a CRC component from an already-opened file.
//
// File layout, as read below: one 32-bit network-byte-order (big-endian) chunk
// size, followed by one 32-bit network-byte-order checksum per chunk until EOF.
// check_buf_size() rejects short reads, for example a trailing partial word.
// The stream is always closed; a parse or I/O error is rethrown after the close.
future<lw_shared_ptr<checksum>> sstable::read_checksum_from_file(file f) {
    auto result = make_lw_shared<sstables::checksum>();

    file_input_stream_options opts;
    opts.buffer_size = 4096;
    auto in = make_file_input_stream(std::move(f), opts);

    std::exception_ptr failure;
    try {
        const auto word_size = sizeof(uint32_t);

        auto header = co_await in.read_exactly(word_size);
        check_buf_size(header, word_size);
        result->chunk_size = net::ntoh(read_unaligned<uint32_t>(header.get()));

        // An empty read means EOF: every checksum has been consumed.
        for (auto word = co_await in.read_exactly(word_size); !word.empty(); word = co_await in.read_exactly(word_size)) {
            check_buf_size(word, word_size);
            result->checksums.push_back(net::ntoh(read_unaligned<uint32_t>(word.get())));
        }
    } catch (...) {
        failure = std::current_exception();
    }

    co_await in.close();
    maybe_rethrow_exception(std::move(failure));

    co_return result;
}
|
|
|
|
|
|
// Loads the CRC component from `f` and publishes it in the shared component
// cache. The cache holds only a weak reference.
//
// If another fiber cached the component while this one was reading from disk,
// the freshly loaded copy is discarded and the cached one is returned instead,
// so all callers share a single instance.
future<lw_shared_ptr<checksum>> sstable::read_checksum(file f) {
    auto loaded = co_await read_checksum_from_file(std::move(f));
    if (_components->checksum) {
        // Lost the race: share the instance another fiber already published.
        co_return _components->checksum->shared_from_this();
    }
    _components->checksum = loaded->weak_from_this();
    co_return loaded;
}
|
|
|
|
// Returns the sstable's CRC component, loading it from disk on first use.
//
// Returns the cached instance if one is still referenced. Returns nullptr when
// the sstable has no CRC component or has already been unlinked. Otherwise the
// component is loaded and published via read_checksum(file), which also
// resolves races with concurrent loaders.
future<lw_shared_ptr<checksum>> sstable::read_checksum() {
    if (_components->checksum) {
        co_return _components->checksum->shared_from_this();
    }
    if (!has_component(component_type::CRC) || _unlinked) {
        co_return nullptr;
    }
    lw_shared_ptr<checksum> result;
    co_await do_read_simple(component_type::CRC, [this, &result] (version_types, file crc_file) -> future<> {
        result = co_await read_checksum(std::move(crc_file));
    });
    co_return result;
}
|
|
|
|
// Validates the checksums protecting an sstable's Data component.
//
// Compressed sstables:
//   - The raw (still compressed) Data stream is read.
//   - do_validate_compressed() verifies every chunk's checksum and, when a
//     Digest component exists, the full-file digest.
//   - crc32 is used for version mc and later; adler32 for older versions.
//
// Uncompressed sstables:
//   - A CRC component is required; without one the status is no_checksum.
//   - The stream is opened with integrity_check::yes. Its error callback logs
//     the message and marks the result invalid.
//   - do_validate_uncompressed() additionally cross-checks the Data length
//     against the CRC chunk count.
//
// The result's second field is initialized from digest.has_value().
// A malformed_sstable_exception raised during validation is logged and
// reported as invalid. Any other exception is rethrown after the data stream
// has been closed.
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit) {
    const auto digest = co_await sst->read_digest();
    validate_checksums_result ret = {
        validate_checksums_status::valid,
        digest.has_value()
    };

    auto checksum = co_await sst->read_checksum();
    // Compressed sstables carry per-chunk checksums in CompressionInfo, so only
    // uncompressed ones depend on the CRC component.
    if (!checksum && !sst->get_compression()) {
        sstlog.warn("No checksums available for SSTable: {}", sst->get_filename());
        ret.status = validate_checksums_status::no_checksum;
        co_return ret;
    }

    // Compressed: raw on-disk bytes, checked chunk by chunk below.
    // Uncompressed: the stream itself verifies checksums while reading and
    // reports failures through the callback, which captures `ret` by reference.
    input_stream<char> data_stream = co_await (sst->get_compression()
            ? sst->data_stream(0, sst->ondisk_data_size(), permit,
                    nullptr, nullptr, sstable::raw_stream::yes)
            : sst->data_stream(0, sst->data_size(), permit,
                    nullptr, nullptr, sstable::raw_stream::no,
                    integrity_check::yes, [&ret](sstring msg) {
                        sstlog.error("{}", msg);
                        ret.status = validate_checksums_status::invalid;
                    })
    );

    auto valid = true;
    std::exception_ptr ex;

    try {
        if (sst->get_compression()) {
            // The boolean argument is presumably do_validate_compressed's
            // checksum_all flag: for mc+ each chunk's checksum is also folded
            // into the full-file checksum.
            if (sst->get_version() >= sstable_version_types::mc) {
                valid = co_await do_validate_compressed<crc32_utils>(data_stream, sst->get_compression(), true, digest);
            } else {
                valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, digest);
            }
        } else {
            // `checksum` is non-null here: the uncompressed/no-CRC case returned above.
            valid = co_await do_validate_uncompressed(data_stream, checksum->checksums.size(), checksum->chunk_size);
        }
    } catch (malformed_sstable_exception& e) {
        valid = false;
        sstlog.error("{}", e.what());
    } catch (...) {
        ex = std::current_exception();
    }

    // Close before rethrowing; a coroutine cannot co_await inside a catch handler.
    co_await data_stream.close();
    maybe_rethrow_exception(std::move(ex));

    if (!valid) {
        ret.status = validate_checksums_status::invalid;
    }
    co_return ret;
}
|
|
|
|
void sstable::set_first_and_last_keys() {
|
|
if (_first && _last) {
|
|
return;
|
|
}
|
|
auto decorate_key = [this] (const char *m, const bytes& value) {
|
|
auto pk = key::from_bytes(value).to_partition_key(*_schema);
|
|
return dht::decorate_key(*_schema, std::move(pk));
|
|
};
|
|
std::optional<dht::decorated_key> first;
|
|
std::optional<dht::decorated_key> last;
|
|
if (_components->summary) {
|
|
first = decorate_key("first", _components->summary.first_key.value);
|
|
last = decorate_key("last", _components->summary.last_key.value);
|
|
} else if (_partitions_db_footer) {
|
|
first = decorate_key("first", _partitions_db_footer->first_key.get_bytes());
|
|
last = decorate_key("last", _partitions_db_footer->last_key.get_bytes());
|
|
} else {
|
|
throw malformed_sstable_exception(format("{}: neither Summary.db nor Partitions.db component is present, can't determine first and last partition key", get_filename()));
|
|
}
|
|
if (first.value().tri_compare(*_schema, last.value()) > 0) {
|
|
throw malformed_sstable_exception(format("{}: first and last keys of summary are misordered: first={} > last={}", get_filename(), first.value(), last.value()));
|
|
}
|
|
_first = std::move(first.value());
|
|
_last = std::move(last.value());
|
|
}
|
|
|
|
// Partition key of the first partition in this sstable.
// Throws std::runtime_error if the first key has not been set yet.
const partition_key& sstable::get_first_partition_key() const {
    const dht::decorated_key& first = get_first_decorated_key();
    return first.key();
}
|
|
|
|
// Partition key of the last partition in this sstable.
// Throws std::runtime_error if the last key has not been set yet.
const partition_key& sstable::get_last_partition_key() const {
    const dht::decorated_key& last = get_last_decorated_key();
    return last.key();
}
|
|
|
|
// Decorated (token + key) form of the first partition key.
// Throws std::runtime_error when _first has not been populated.
const dht::decorated_key& sstable::get_first_decorated_key() const {
    if (_first) {
        return *_first;
    }
    throw std::runtime_error(format("first key of {} wasn't set", get_filename()));
}
|
|
|
|
// Decorated (token + key) form of the last partition key.
// Throws std::runtime_error when _last has not been populated.
const dht::decorated_key& sstable::get_last_decorated_key() const {
    if (_last) {
        return *_last;
    }
    throw std::runtime_error(format("last key of {} wasn't set", get_filename()));
}
|
|
|
|
// Orders this sstable against `other` by their first decorated keys, using
// this sstable's schema for the comparison.
std::strong_ordering sstable::compare_by_first_key(const sstable& other) const {
    const auto& mine = get_first_decorated_key();
    const auto& theirs = other.get_first_decorated_key();
    return mine.tri_compare(*_schema, theirs);
}
|
|
|
|
double sstable::get_compression_ratio() const {
|
|
if (this->has_component(component_type::CompressionInfo)) {
|
|
return double(_components->compression.compressed_file_length()) / _components->compression.uncompressed_file_length();
|
|
} else {
|
|
return metadata_collector::NO_COMPRESSION_RATIO;
|
|
}
|
|
}
|
|
|
|
void sstable::set_sstable_level(uint32_t new_level) {
|
|
auto entry = _components->statistics.contents.find(metadata_type::Stats);
|
|
if (entry == _components->statistics.contents.end()) {
|
|
return;
|
|
}
|
|
auto& p = entry->second;
|
|
if (!p) {
|
|
throw std::runtime_error("Statistics is malformed");
|
|
}
|
|
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
|
|
sstlog.debug("set level of {} with generation {} from {} to {}", get_filename(), _generation, s.sstable_level, new_level);
|
|
s.sstable_level = new_level;
|
|
}
|
|
|
|
void sstable::mutate_sstable_level(uint32_t new_level) {
|
|
if (!has_component(component_type::Statistics)) {
|
|
return;
|
|
}
|
|
|
|
auto entry = _components->statistics.contents.find(metadata_type::Stats);
|
|
if (entry == _components->statistics.contents.end()) {
|
|
return;
|
|
}
|
|
|
|
auto& p = entry->second;
|
|
if (!p) {
|
|
throw std::runtime_error("Statistics is malformed");
|
|
}
|
|
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
|
|
if (s.sstable_level == new_level) {
|
|
return;
|
|
}
|
|
|
|
s.sstable_level = new_level;
|
|
}
|
|
|
|
bool sstable::should_mutate_sstable_level(uint32_t new_level) const {
|
|
try {
|
|
const auto& stats = get_stats_metadata();
|
|
return stats.sstable_level != new_level;
|
|
} catch (...) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
int sstable::compare_by_max_timestamp(const sstable& other) const {
|
|
auto ts1 = get_stats_metadata().max_timestamp;
|
|
auto ts2 = other.get_stats_metadata().max_timestamp;
|
|
return (ts1 > ts2 ? 1 : (ts1 == ts2 ? 0 : -1));
|
|
}
|
|
|
|
// Closes every open component file of this sstable. If the sstable was marked
// for deletion, it also unlinks it.
//
// Close failures are logged and reported through general_disk_error(); they do
// not fail the returned future. Unlink failures are logged and ignored, because
// unused sstables are cleaned up again on startup and generation numbers are
// never reused.
//
// The returned future resolves once every close and the unlink (if any) have
// completed. Then the open-for-reading stats are updated and _open_mode is
// reset.
future<> sstable::close_files() {
    utils::small_vector<future<>, 4> close_futures;

    // Starts closing one component file, if it is open.
    auto close_file = [&] (auto& f, const char* what) {
        if (!f) {
            return;
        }
        close_futures.push_back(f.close().handle_exception([me = shared_from_this(), what] (std::exception_ptr ep) {
            sstlog.warn("sstable close {} failed: {}", what, ep);
            general_disk_error();
        }));
    };
    close_file(_index_file, "index_file");
    close_file(_data_file, "data_file");
    close_file(_partitions_file, "partitions_db");
    close_file(_rows_file, "rows_db");

    auto unlinked = make_ready_future<>();
    if (_marked_for_deletion != mark_for_deletion::none) {
        // If a deletion fails for some reason we
        // log and ignore this failure, because on startup we'll again try to
        // clean up unused sstables, and because we'll never reuse the same
        // generation number anyway.
        sstlog.debug("Deleting sstable that is {}marked for deletion", _marked_for_deletion == mark_for_deletion::implicit ? "implicitly " : "");
        try {
            unlinked = unlink().handle_exception([me = shared_from_this()] (std::exception_ptr eptr) {
                sstlog.warn("Exception when deleting sstable file: {}", eptr);
            });
        } catch (...) {
            sstlog.warn("Exception when deleting sstable file: {}", std::current_exception());
        }
    }

    _on_closed(*this);

    // Join the deletion as well. Previously its future was dropped, so
    // close_files() could resolve while the files were still being removed.
    // Failures are already handled above, so this cannot fail the result.
    close_futures.push_back(std::move(unlinked));

    return when_all_succeed(close_futures.begin(), close_futures.end()).discard_result().then([this, me = shared_from_this()] {
        if (_open_mode) {
            if (_open_mode.value() == open_flags::ro) {
                _stats.on_close_for_reading();
            }
        }
        _open_mode.reset();
    });
}
|
|
|
|
/**
 * Returns a pair of positions [p1, p2) in the summary file corresponding to entries
 * covered by the specified range, or a disengaged optional if no such pair exists.
 *
 * The positions are indexes into _components->summary.entries; p2 is exclusive.
 */
std::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_range(const dht::token_range& range) {
    auto entries_size = _components->summary.entries.size();
    // Returns the summary index at which a synthetic key would be inserted.
    // The key sorts before (before == true) or after all keys with `token`.
    auto search = [this](bool before, const dht::token& token) {
        auto kind = before ? key::kind::before_all_keys : key::kind::after_all_keys;
        key k(kind);
        // Binary search never returns a non-negative value here: the synthetic
        // key cannot match a real entry. A miss is presumably encoded as
        // -(insertion point) - 1, which (r + 1) * -1 converts back to the
        // insertion point.
        return uint64_t((binary_search(_schema->get_partitioner(), _components->summary.entries, k, token) + 1) * -1);
    };
    uint64_t left = 0;
    if (range.start()) {
        left = search(range.start()->is_inclusive(), range.start()->value());
        if (left == entries_size) {
            // left is past the end of the sampling.
            return std::nullopt;
        }
    }
    uint64_t right = entries_size;
    if (range.end()) {
        right = search(!range.end()->is_inclusive(), range.end()->value());
        if (right == 0) {
            // The first key is strictly greater than right.
            return std::nullopt;
        }
    }
    if (left < right) {
        return std::optional<std::pair<uint64_t, uint64_t>>(std::in_place_t(), left, right);
    }
    return std::nullopt;
}
|
|
|
|
// Renames an sstable's TOC file to its temporary-TOC name, the first step of
// removing the sstable.
//
// `sstable_toc_name` is expected to end with TOC_SUFFIX. The temporary name is
// built by replacing that suffix with TEMPORARY_TOC_SUFFIX. When `sync` is
// set, the parent directory is synced after the rename.
//
// Returns:
//   - The temporary TOC name after a successful rename.
//   - The temporary TOC name, with no further work, if only the temporary TOC
//     exists already.
//   - An empty string, with a warning logged, if neither file exists.
future<sstring> make_toc_temporary(sstring sstable_toc_name, storage::sync_dir sync) {
    const auto stem_length = sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size();
    sstring prefix = sstable_toc_name.substr(0, stem_length);
    sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;

    sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
    const bool toc_present = co_await sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name);
    if (!toc_present) {
        const bool temporary_toc_present = co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name);
        if (!temporary_toc_present) {
            sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
            co_return "";
        }
        co_return new_toc_name;
    }

    // If new_toc_name exists it will be atomically replaced. See rename(2)
    co_await sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name);
    if (sync) {
        co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
    }
    co_return new_toc_name;
}
|
|
|
|
/**
 * Returns a pair of positions [p1, p2) in the summary file corresponding to
 * pages which may include keys covered by the specified range, or a disengaged
 * optional if the sstable does not include any keys from the range.
 *
 * The positions are indexes into _components->summary.entries; p2 is exclusive.
 */
std::optional<std::pair<uint64_t, uint64_t>> sstable::get_index_pages_for_range(const dht::token_range& range) {
    const auto& entries = _components->summary.entries;
    auto entries_size = entries.size();
    index_comparator cmp(*_schema);
    dht::ring_position_comparator rp_cmp(*_schema);
    uint64_t left = 0;
    if (range.start()) {
        // Inclusive bound: start before all keys with this token; exclusive: after them.
        dht::ring_position_view pos = range.start()->is_inclusive()
            ? dht::ring_position_view::starting_at(range.start()->value())
            : dht::ring_position_view::ending_at(range.start()->value());

        // There is no summary entry for the last key, so in order to determine
        // if pos overlaps with the sstable or not we have to compare with the
        // last key.
        if (rp_cmp(pos, get_last_decorated_key()) > 0) {
            // left is past the end of the sampling.
            return std::nullopt;
        }

        left = std::distance(std::begin(entries),
            std::lower_bound(entries.begin(), entries.end(), pos, cmp));

        // lower_bound finds the first entry >= pos. Keys just before that entry
        // presumably live in the page started by the preceding entry, so step
        // back one page.
        if (left) {
            --left;
        }
    }
    uint64_t right = entries_size;
    if (range.end()) {
        // Inclusive bound: end after all keys with this token; exclusive: before them.
        dht::ring_position_view pos = range.end()->is_inclusive()
            ? dht::ring_position_view::ending_at(range.end()->value())
            : dht::ring_position_view::starting_at(range.end()->value());

        right = std::distance(std::begin(entries),
            std::lower_bound(entries.begin(), entries.end(), pos, cmp));
        if (right == 0) {
            // The first key is strictly greater than right.
            return std::nullopt;
        }
    }
    if (left < right) {
        return std::optional<std::pair<uint64_t, uint64_t>>(std::in_place_t(), left, right);
    }
    return std::nullopt;
}
|
|
|
|
// Estimates how many partitions of this sstable fall in `range`.
//
// With Summary.db:
//   - The total estimated key count is scaled by the fraction of summary pages
//     the range may touch.
//   - Any overlapping range yields at least 1.
//
// With Partitions.db (BTI):
//   - The count is interpolated by the fraction of the (uncompressed) data
//     file spanned by the range.
//   - A metadata-concurrency permit and an index reader are used to find it.
//
// Returns an exceptional future with malformed_sstable_exception when neither
// index component is present.
future<uint64_t> sstable::estimated_keys_for_range(const dht::token_range& range) {
    if (_components->summary) {
        auto page_range = get_index_pages_for_range(range);
        if (!page_range) {
            co_return 0;
        }
        // 128-bit intermediate so range_pages * total_keys cannot overflow.
        using uint128_t = unsigned __int128;
        uint64_t range_pages = page_range->second - page_range->first;
        auto total_keys = get_estimated_key_count();
        auto total_pages = _components->summary.entries.size();
        uint64_t estimated_keys = (uint128_t)range_pages * total_keys / total_pages;
        co_return std::max(uint64_t(1), estimated_keys);
    } else if (_partitions_db_footer) {
        // This is an extra conditional for the special case when the given range
        // doesn't overlap with the sstable's range at all.
        //
        // In this case, if the ranges are adjacent, the main code path could easily
        // return "1 partition" instead of "0 partitions",
        // due to the inexactness of BTI indexes for range queries.
        // Returning something non-zero in this case would be unfortunate,
        // so the extra conditional makes sure that we return 0.
        auto local_tr = dht::token_range::make({get_first_decorated_key().token()}, {get_last_decorated_key().token()});
        if (!local_tr.overlaps(range, dht::token_comparator())) {
            co_return 0;
        }

        uint64_t result;

        auto& sem = _manager.sstable_metadata_concurrency_sem();
        uint64_t estimated_memory = 16 * 1024; // Value pulled from thin air
        reader_permit permit = co_await sem.obtain_permit(_schema, "sstable::estimated_keys_for_range", estimated_memory, db::no_timeout, {});
        auto ir = make_index_reader(std::move(permit));

        // The index reader must be closed on every path, and a coroutine cannot
        // co_await inside a catch handler: capture the failure and deliver it
        // after close().
        std::exception_ptr ex;
        try {
            co_await ir->advance_to(dht::to_partition_range(range));
            auto data_file_range = ir->data_file_positions();
            auto uncompressed_data_size = data_size();
            auto start = data_file_range.start;
            // An open-ended range extends to the end of the data file.
            auto end = data_file_range.end.value_or(uncompressed_data_size);
            auto total_count = get_estimated_key_count();
            sstlog.debug("estimated_keys_for_range(sst={}, range={}): data_start: {}, data_end: {}, data_size: {}, estimated_key_count: {}",
                get_filename(), range, start, end, uncompressed_data_size, total_count);
            if (start == end) {
                result = 0;
            } else {
                result = std::ceil(double(end - start) / uncompressed_data_size * total_count);
            }
        } catch (...) {
            ex = std::current_exception();
        }
        co_await ir->close();
        if (ex) {
            co_return coroutine::exception(std::move(ex));
        } else {
            co_return result;
        }
    } else {
        co_return coroutine::exception(std::make_exception_ptr(malformed_sstable_exception(
            format("{}: neither Summary.db nor Partitions.db component is present, can't estimate number of partitions in range", get_filename()))));
    }
}
|
|
|
|
// Computes the set of shards (per `sharder_`) that own any part of this
// sstable's token ranges.
//
// Where the token ranges come from:
//   - The Sharding entry of Scylla.db metadata, when present and non-empty.
//   - Otherwise the single range [first key token, last key token].
//
// The returned vector contains each shard once. It is built from an
// unordered_set, so the element order is unspecified.
std::vector<unsigned>
sstable::compute_shards_for_this_sstable(const dht::sharder& sharder_) const {
    std::unordered_set<unsigned> shards;
    utils::chunked_vector<dht::partition_range> token_ranges;
    const auto* sm = _components->scylla_metadata
            ? _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>()
            : nullptr;
    if (!sm || sm->token_ranges.elements.empty()) {
        token_ranges.push_back(dht::partition_range::make(
                dht::ring_position::starting_at(get_first_decorated_key().token()),
                dht::ring_position::ending_at(get_last_decorated_key().token())));
    } else {
        // Exclusive bounds are expressed by positioning after (left) or
        // before (right) all keys with the boundary token.
        auto disk_token_range_to_ring_position_range = [] (const disk_token_range& dtr) {
            auto t1 = dht::token(dht::token::kind::key, bytes_view(dtr.left.token));
            auto t2 = dht::token(dht::token::kind::key, bytes_view(dtr.right.token));
            return dht::partition_range::make(
                    (dtr.left.exclusive ? dht::ring_position::ending_at : dht::ring_position::starting_at)(std::move(t1)),
                    (dtr.right.exclusive ? dht::ring_position::starting_at : dht::ring_position::ending_at)(std::move(t2)));
        };
        token_ranges = sm->token_ranges.elements
                | std::views::transform(disk_token_range_to_ring_position_range)
                | std::ranges::to<utils::chunked_vector<dht::partition_range>>();
    }
    sstlog.trace("{}: token_ranges={}", get_filename(), token_ranges);
    auto sharder = dht::ring_position_range_vector_sharder(sharder_, std::move(token_ranges));
    // Walk every (sub-range, shard) piece the sharder produces and collect the shards.
    auto rpras = sharder.next(*_schema);
    while (rpras) {
        shards.insert(rpras->shard);
        rpras = sharder.next(*_schema);
    }
    return shards | std::ranges::to<std::vector>();
}
|
|
|
|
// Checks whether partition `dk` (whose hash is `hk`) exists in this sstable.
//
// Steps:
//   1. Consult the bloom filter. A negative answer is returned immediately.
//   2. Otherwise create a single-partition reader that selects no regular or
//      static columns, running on a private, unlimited, unregistered
//      reader_concurrency_semaphore.
//   3. Peek at the reader: the partition is present iff a fragment is
//      produced.
//
// The reader is closed and the semaphore stopped before returning. A read
// failure is rethrown as an exceptional future after that cleanup.
future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::decorated_key& dk) {
    // Keeps this sstable alive for the duration of the coroutine.
    shared_sstable s = shared_from_this();
    if (!filter_has_key(hk)) {
        co_return false;
    }
    bool present;
    std::exception_ptr ex;
    auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()",
            reader_concurrency_semaphore::register_metrics::no);

    auto slice = partition_slice_builder(*_schema).with_no_regular_columns().with_no_static_columns().build();
    auto pr = dht::partition_range::make_singular(dk);
    auto reader = make_reader(
            _schema,
            sem.make_tracking_only_permit(_schema, fmt::to_string(s->get_filename()), db::no_timeout, {}),
            pr,
            slice);
    try {
        // Only existence matters, so keep the read-ahead buffer minimal.
        reader.set_max_buffer_size(1);
        present = bool(co_await reader.peek());
    } catch (...) {
        ex = std::current_exception();
    }
    co_await reader.close();
    co_await sem.stop();
    if (ex) {
        co_return coroutine::exception(std::move(ex));
    }
    co_return present;
}
|
|
|
|
// Hashes the sstable (on-disk) serialization of `key`, producing the value
// used for bloom-filter lookups such as filter_has_key().
utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key& key) {
    auto serialized = key::from_partition_key(s, key);
    return utils::make_hashed_key(static_cast<bytes_view>(serialized));
}
|
|
|
|
// Deletes this sstable's files from storage.
//
// Sequence:
//   1. Serialize with other unlink calls and in-flight mutations via
//      _mutate_sem.
//   2. Set _unlinked (read_digest()/read_checksum() then stop loading
//      components) and fire the _on_delete callback.
//   3. Start the storage wipe.
//   4. While the wipe runs, update large-data bookkeeping: rename entries to
//      the clone's filename if this sstable was cloned, otherwise delete them.
//      Failures there are logged and ignored.
//   5. Await the wipe. On success, update deletion stats and notify the
//      manager.
//
// A failed wipe skips the stats/manager updates. For a seastar coroutine the
// failure presumably surfaces as an exceptional future despite `noexcept`.
future<>
sstable::unlink(storage::sync_dir sync) noexcept {
    // Serialize with other calls to unlink or potentially ongoing mutations.
    auto lock = co_await get_units(_mutate_sem, 1);

    _unlinked = true;
    _on_delete(*this);

    auto remove_fut = _storage->wipe(*this, sync);

    try {
        if (_cloned_to_sstable_filename) {
            co_await get_large_data_handler().maybe_update_large_data_entries_sstable_name(shared_from_this(), _cloned_to_sstable_filename.value());
        } else {
            co_await get_large_data_handler().maybe_delete_large_data_entries(shared_from_this());
        }
    } catch (...) {
        // NOTE(review): presumably marks the log allocation below as critical so
        // it is not subject to allocation-failure injection — confirm.
        memory::scoped_critical_alloc_section _;
        // Just log and ignore failures to delete large data entries.
        // They are not critical to the operation of the database.
        sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", toc_filename(), std::current_exception());
    }

    co_await std::move(remove_fut);
    _stats.on_delete();
    _manager.on_unlink(this);
}
|
|
|
|
// Per-thread (per-shard) sstable read/write statistics. init_metrics() exports
// them via sstables_stats::get_shard_stats().
thread_local sstables_stats::stats sstables_stats::_shard_stats;
// Per-thread promoted-index cache counters, exported by init_metrics() as pi_cache_*.
thread_local mc::cached_promoted_index::metrics promoted_index_cache_metrics;
// Per-thread metric registrations; filled by init_metrics() on every shard.
static thread_local seastar::metrics::metric_groups metrics;
|
|
|
|
// Registers the index page cache metrics (group "sstables") into `metrics`.
// The callbacks read `m` by reference at scrape time, so `m` must outlive the
// registrations held by `metrics`.
void register_index_page_cache_metrics(seastar::metrics::metric_groups& metrics, cached_file_stats& m) {
    namespace sm = seastar::metrics;
    metrics.add_group("sstables", {
        sm::make_counter("index_page_cache_hits", [&m] { return m.page_hits; },
            sm::description("Index page cache requests which were served from cache")),
        sm::make_counter("index_page_cache_misses", [&m] { return m.page_misses; },
            sm::description("Index page cache requests which had to perform I/O")),
        sm::make_counter("index_page_cache_evictions", [&m] { return m.page_evictions; },
            sm::description("Total number of index page cache pages which have been evicted")),
        sm::make_counter("index_page_cache_populations", [&m] { return m.page_populations; },
            sm::description("Total number of index page cache pages which were inserted into the cache")),
        sm::make_gauge("index_page_cache_bytes", [&m] { return m.cached_bytes; },
            sm::description("Total number of bytes cached in the index page cache")),
        sm::make_gauge("index_page_cache_bytes_in_std", [&m] { return m.bytes_in_std; },
            sm::description("Total number of bytes in temporary buffers which live in the std allocator")),
    });
}
|
|
|
|
// Registers the partition index page metrics (group "sstables") into
// `metrics`. The callbacks read `m` by reference at scrape time, so `m` must
// outlive the registrations held by `metrics`.
void register_index_page_metrics(seastar::metrics::metric_groups& metrics, partition_index_cache_stats& m) {
    namespace sm = seastar::metrics;
    metrics.add_group("sstables", {
        sm::make_counter("index_page_hits", [&m] { return m.hits; },
            sm::description("Index page requests which could be satisfied without waiting")),
        sm::make_counter("index_page_misses", [&m] { return m.misses; },
            sm::description("Index page requests which initiated a read from disk")),
        sm::make_counter("index_page_blocks", [&m] { return m.blocks; },
            sm::description("Index page requests which needed to wait due to page not being loaded yet")),
        sm::make_counter("index_page_evictions", [&m] { return m.evictions; },
            sm::description("Index pages which got evicted from memory")),
        sm::make_counter("index_page_populations", [&m] { return m.populations; },
            sm::description("Index pages which got populated into memory")),
        sm::make_gauge("index_page_used_bytes", [&m] { return m.used_bytes; },
            sm::description("Amount of bytes used by index pages in memory")),

    });
}
|
|
|
|
// Registers the sstable-layer metrics (group "sstables") on every shard.
//
// Each shard adds the group to its own thread_local `metrics` object. The
// callbacks read that shard's promoted_index_cache_metrics,
// sstables_stats::get_shard_stats() and bloom-filter stats when metrics are
// collected.
future<> init_metrics() {
    return seastar::smp::invoke_on_all([] {
        namespace sm = seastar::metrics;
        metrics.add_group("sstables", {
            // Promoted index cache.
            sm::make_counter("pi_cache_hits_l0", [] { return promoted_index_cache_metrics.hits_l0; },
                sm::description("Number of requests for promoted index block in state l0 which didn't have to go to the page cache")),
            sm::make_counter("pi_cache_hits_l1", [] { return promoted_index_cache_metrics.hits_l1; },
                sm::description("Number of requests for promoted index block in state l1 which didn't have to go to the page cache")),
            sm::make_counter("pi_cache_hits_l2", [] { return promoted_index_cache_metrics.hits_l2; },
                sm::description("Number of requests for promoted index block in state l2 which didn't have to go to the page cache")),
            sm::make_counter("pi_cache_misses_l0", [] { return promoted_index_cache_metrics.misses_l0; },
                sm::description("Number of requests for promoted index block in state l0 which had to go to the page cache")),
            sm::make_counter("pi_cache_misses_l1", [] { return promoted_index_cache_metrics.misses_l1; },
                sm::description("Number of requests for promoted index block in state l1 which had to go to the page cache")),
            sm::make_counter("pi_cache_misses_l2", [] { return promoted_index_cache_metrics.misses_l2; },
                sm::description("Number of requests for promoted index block in state l2 which had to go to the page cache")),
            sm::make_counter("pi_cache_populations", [] { return promoted_index_cache_metrics.populations; },
                sm::description("Number of promoted index blocks which got inserted")),
            sm::make_counter("pi_cache_evictions", [] { return promoted_index_cache_metrics.evictions; },
                sm::description("Number of promoted index blocks which got evicted")),
            sm::make_gauge("pi_cache_bytes", [] { return promoted_index_cache_metrics.used_bytes; },
                sm::description("Number of bytes currently used by cached promoted index blocks")),
            sm::make_gauge("pi_cache_block_count", [] { return promoted_index_cache_metrics.block_count; },
                sm::description("Number of promoted index blocks currently cached")),

            // Write path.
            sm::make_counter("partition_writes", [] { return sstables_stats::get_shard_stats().partition_writes; },
                sm::description("Number of partitions written")),
            sm::make_counter("static_row_writes", [] { return sstables_stats::get_shard_stats().static_row_writes; },
                sm::description("Number of static rows written")),
            sm::make_counter("row_writes", [] { return sstables_stats::get_shard_stats().row_writes; },
                sm::description("Number of clustering rows written")),
            sm::make_counter("cell_writes", [] { return sstables_stats::get_shard_stats().cell_writes; },
                sm::description("Number of cells written")),
            sm::make_counter("tombstone_writes", [] { return sstables_stats::get_shard_stats().tombstone_writes; },
                sm::description("Number of tombstones written"))(basic_level),
            sm::make_counter("range_tombstone_writes", [] { return sstables_stats::get_shard_stats().range_tombstone_writes; },
                sm::description("Number of range tombstones written"))(basic_level),
            sm::make_counter("pi_auto_scale_events", [] { return sstables_stats::get_shard_stats().promoted_index_auto_scale_events; },
                sm::description("Number of promoted index auto-scaling events")),

            // Mostly read path (cell_tombstone_writes is also registered here).
            sm::make_counter("range_tombstone_reads", [] { return sstables_stats::get_shard_stats().range_tombstone_reads; },
                sm::description("Number of range tombstones read"))(basic_level),
            sm::make_counter("row_tombstone_reads", [] { return sstables_stats::get_shard_stats().row_tombstone_reads; },
                sm::description("Number of row tombstones read"))(basic_level),
            sm::make_counter("cell_tombstone_writes", [] { return sstables_stats::get_shard_stats().cell_tombstone_writes; },
                sm::description("Number of cell tombstones written"))(basic_level),
            sm::make_counter("single_partition_reads", [] { return sstables_stats::get_shard_stats().single_partition_reads; },
                sm::description("Number of single partition flat mutation reads")),
            sm::make_counter("range_partition_reads", [] { return sstables_stats::get_shard_stats().range_partition_reads; },
                sm::description("Number of partition range flat mutation reads")),
            sm::make_counter("partition_reads", [] { return sstables_stats::get_shard_stats().partition_reads; },
                sm::description("Number of partitions read")),
            sm::make_counter("partition_seeks", [] { return sstables_stats::get_shard_stats().partition_seeks; },
                sm::description("Number of partitions seeked")),
            sm::make_counter("row_reads", [] { return sstables_stats::get_shard_stats().row_reads; },
                sm::description("Number of rows read")),

            // Deletion-time capping.
            sm::make_counter("capped_local_deletion_time", [] { return sstables_stats::get_shard_stats().capped_local_deletion_time; },
                sm::description("Number of SStables with tombstones whose local deletion time was capped at the maximum allowed value in Statistics")),
            sm::make_counter("capped_tombstone_deletion_time", [] { return sstables_stats::get_shard_stats().capped_tombstone_deletion_time; },
                sm::description("Number of tombstones whose local deletion time was capped at the maximum allowed value")),

            // Open/close accounting: totals are monotonic counters, "currently"
            // gauges are the difference between opens and closes.
            sm::make_counter("total_open_for_reading", [] { return sstables_stats::get_shard_stats().open_for_reading; },
                sm::description("Counter of sstables open for reading")),
            sm::make_counter("total_open_for_writing", [] { return sstables_stats::get_shard_stats().open_for_writing; },
                sm::description("Counter of sstables open for writing")),

            sm::make_gauge("currently_open_for_reading", [] {
                return sstables_stats::get_shard_stats().open_for_reading -
                       sstables_stats::get_shard_stats().closed_for_reading;
            }, sm::description("Number of sstables currently open for reading")),
            sm::make_gauge("currently_open_for_writing", [] {
                return sstables_stats::get_shard_stats().open_for_writing -
                       sstables_stats::get_shard_stats().closed_for_writing;
            }, sm::description("Number of sstables currently open for writing")),

            sm::make_counter("total_deleted", [] { return sstables_stats::get_shard_stats().deleted; },
                sm::description("Counter of deleted sstables")),

            sm::make_gauge("bloom_filter_memory_size", [] { return utils::filter::bloom_filter::get_shard_stats().memory_size; },
                sm::description("Bloom filter memory usage in bytes.")),
        });
    });
}
|
|
|
|
// Wraps this sstable as a mutation_source whose readers are created by
// make_reader(). The source holds a strong reference to the sstable, keeping
// it alive for as long as the source (or a copy of it) exists.
mutation_source sstable::as_mutation_source() {
    auto self = shared_from_this();
    return mutation_source([sst = std::move(self)] (schema_ptr query_schema,
            reader_permit read_permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr trace_ptr,
            streamed_mutation::forwarding sm_fwd,
            mutation_reader::forwarding mr_fwd) mutable {
        return sst->make_reader(std::move(query_schema), std::move(read_permit), pr, ps, std::move(trace_ptr), sm_fwd, mr_fwd);
    });
}
|
|
|
|
// Constructs an sstable object for `schema` with the given storage options,
// generation, state, version and format, and registers it with `manager`
// (manager.add(this)).
//
// `buffer_size` is stored as sstable_buffer_size. The partition index cache is
// created on top of the manager's cache tracker (its LRU, region and partition
// index cache stats). The read and write I/O error handlers are produced by
// `error_handler_gen` for sstable_read_error and sstable_write_error respectively.
sstable::sstable(schema_ptr schema,
        const data_dictionary::storage_options& storage,
        generation_type generation,
        sstable_state state,
        version_types v,
        format_types f,
        db::large_data_handler& large_data_handler,
        db::corrupt_data_handler& corrupt_data_handler,
        sstables_manager& manager,
        db_clock::time_point now,
        io_error_handler_gen error_handler_gen,
        size_t buffer_size)
    : sstable_buffer_size(buffer_size)
    , _schema(std::move(schema))
    , _generation(generation)
    , _state(state)
    // NOTE(review): built from the `_state` member initialized just above; this relies
    // on `_state` being declared before `_storage` in the class definition.
    , _storage(make_storage(manager, storage, _state))
    , _version(v)
    , _format(f)
    , _index_cache(std::make_unique<partition_index_cache>(
            manager.get_cache_tracker().get_lru(), manager.get_cache_tracker().region(), manager.get_cache_tracker().get_partition_index_cache_stats()))
    , _now(now)
    , _read_error_handler(error_handler_gen(sstable_read_error))
    , _write_error_handler(error_handler_gen(sstable_write_error))
    , _large_data_handler(large_data_handler)
    , _corrupt_data_handler(corrupt_data_handler)
    , _manager(manager)
{
    manager.add(this);
}
|
|
|
|
// Returns the file wrapped by the cached index file, i.e. the index file
// accessed without going through the cached_file layer.
file sstable::uncached_index_file() {
    auto& cached = *_cached_index_file;
    return cached.get_file();
}
|
|
|
|
// Returns the file wrapped by the cached Partitions file, i.e. the Partitions
// file accessed without going through the cached_file layer.
file sstable::uncached_partitions_file() {
    auto& cached = *_cached_partitions_file;
    return cached.get_file();
}
|
|
|
|
// Returns the file wrapped by the cached Rows file, i.e. the Rows file
// accessed without going through the cached_file layer.
file sstable::uncached_rows_file() {
    auto& cached = *_cached_rows_file;
    return cached.get_file();
}
|
|
|
|
void sstable::unused() {
|
|
if (_active) {
|
|
_active = false;
|
|
_manager.deactivate(this);
|
|
} else {
|
|
_manager.remove(this);
|
|
}
|
|
}
|
|
|
|
// Releases this sstable's resources. It closes its files, evicts its partition
// index cache and (if present) its cached index file, then calls
// _storage->destroy(*this).
//
// A failure from close_files() is remembered rather than propagated right away,
// so that the remaining cleanup steps still run; it is rethrown at the end.
// An exception from any later step propagates immediately. In that case a
// remembered close_files() failure is dropped.
future<> sstable::destroy() {
    std::exception_ptr ex;
    try {
        co_await close_files();
    } catch (...) {
        ex = std::current_exception();
    }

    co_await _index_cache->evict_gently();
    if (_cached_index_file) {
        co_await _cached_index_file->evict_gently();
    }
    co_await _storage->destroy(*this);

    if (ex) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
|
|
|
|
// Returns the large-data statistics entry recorded for `t`.
//
// NOTE(review): when no large data stats are loaded, or none is recorded for
// `t`, this returns an *engaged* optional holding a default-constructed entry,
// not std::nullopt. This is kept as-is because callers may rely on it; confirm
// whether a disengaged optional was intended.
std::optional<large_data_stats_entry> sstable::get_large_data_stat(large_data_type t) const noexcept {
    if (!_large_data_stats) {
        return std::make_optional<large_data_stats_entry>();
    }
    const auto& stats_by_type = _large_data_stats->map;
    if (auto found = stats_by_type.find(t); found != stats_by_type.end()) {
        return std::make_optional<large_data_stats_entry>(found->second);
    }
    return std::make_optional<large_data_stats_entry>();
}
|
|
|
|
// Returns a copy of the extended timestamp statistics map, or an empty map
// when no extended timestamp stats are loaded for this sstable.
scylla_metadata::ext_timestamp_stats::map_type sstable::get_ext_timestamp_stats() const noexcept {
    using map_type = scylla_metadata::ext_timestamp_stats::map_type;
    return _ext_timestamp_stats ? map_type(_ext_timestamp_stats->map) : map_type{};
}
|
|
|
|
// The gc_before returned by the function can only be used to estimate if the
|
|
// sstable is worth dropping some tombstones. We only return the maximum
|
|
// gc_before for all the partitions that have record in repair history map. It
|
|
// is fine that some of the partitions inside the sstable does not have a
|
|
// record.
|
|
gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const {
|
|
auto start = get_first_decorated_key().token();
|
|
auto end = get_last_decorated_key().token();
|
|
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
|
|
sstlog.trace("sstable={}, ks={}, cf={}, range={}, gc_state={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range, gc_state.is_gc_enabled());
|
|
return gc_state.get_gc_before_for_range(s, range, compaction_time).max_gc_before;
|
|
}
|
|
|
|
// If the sstable contains any regular live cells, we can not drop the sstable.
|
|
// We do not even bother to query the gc_before. Return
|
|
// gc_clock::time_point::min() as gc_before.
|
|
//
|
|
// If the token range of the sstable contains tokens that do not have a record
|
|
// in the repair history map, we can not drop the sstable, in such case we
|
|
// return gc_clock::time_point::min() as gc_before. Otherwise, return the
|
|
// gc_before from the repair history map.
|
|
gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const {
|
|
auto deletion_time = get_max_local_deletion_time();
|
|
// No need to query gc_before for the sstable if the max_deletion_time is max()
|
|
if (deletion_time == gc_clock::time_point(gc_clock::duration(std::numeric_limits<int>::max()))) {
|
|
sstlog.trace("sstable={}, ks={}, cf={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, shortcut",
|
|
get_filename(), s->ks_name(), s->cf_name(), deletion_time, seastar::value_of([this] { return get_stats_metadata().min_timestamp; }), s->gc_grace_seconds().count());
|
|
return gc_clock::time_point::min();
|
|
}
|
|
auto start = get_first_decorated_key().token();
|
|
auto end = get_last_decorated_key().token();
|
|
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
|
|
sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, gc_state={}, query",
|
|
get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count(), gc_state.is_gc_enabled());
|
|
auto res = gc_state.get_gc_before_for_range(s, range, compaction_time);
|
|
return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min();
|
|
}
|
|
|
|
// Creates an index reader for this sstable.
//
// When `_index_file` is not set, a BTI trie index reader over the Partitions and
// Rows files is returned. It is rooted at the trie root position stored in
// `_partitions_db_footer`. If that footer was not loaded, an internal error is
// raised. Otherwise an index_reader is returned.
//
// For the BTI path, caching == use_caching::yes reuses the sstable's shared
// cached_file objects. Otherwise fresh cached_file wrappers are created over the
// underlying files, accounted in the manager's cache tracker. They are given a
// component name only when tracing is enabled (trace_state is set).
// `single_partition_read` is only forwarded to index_reader.
std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
        reader_permit permit,
        tracing::trace_state_ptr trace_state,
        use_caching caching,
        bool single_partition_read
) {
    if (!_index_file) {
        if (!_partitions_db_footer) [[unlikely]] {
            on_internal_error(sstlog, fmt::format("_partitions_db_footer is empty for sstable {}", get_filename()));
        }
        // uncached_partitions_file() is evaluated only in the non-caching branch.
        auto cached_partitions_file = caching == use_caching::yes
            ? _cached_partitions_file
            : seastar::make_shared<cached_file>(
                uncached_partitions_file(),
                _manager.get_cache_tracker().get_index_cached_file_stats(),
                _manager.get_cache_tracker().get_lru(),
                _manager.get_cache_tracker().region(),
                _cached_partitions_file->size(),
                trace_state ? component_name(*this, component_type::Partitions).format() : sstring()
            );
        // Same as above, for the Rows file.
        auto cached_rows_file = caching == use_caching::yes
            ? _cached_rows_file
            : seastar::make_shared<cached_file>(
                uncached_rows_file(),
                _manager.get_cache_tracker().get_index_cached_file_stats(),
                _manager.get_cache_tracker().get_lru(),
                _manager.get_cache_tracker().region(),
                _cached_rows_file->size(),
                trace_state ? component_name(*this, component_type::Rows).format() : sstring()
            );
        return trie::make_bti_index_reader(
            cached_partitions_file,
            cached_rows_file,
            _partitions_db_footer.value().trie_root_position,
            data_size(),
            _schema,
            std::move(permit),
            std::move(trace_state)
        );
    }
    return std::make_unique<index_reader>(shared_from_this(), std::move(permit), std::move(trace_state), caching, single_partition_read);
}
|
|
|
|
// Returns error code, 0 is success
|
|
static future<int> remove_dir(fs::path dir, bool recursive) {
|
|
std::exception_ptr ex;
|
|
int error_code;
|
|
try {
|
|
co_await (recursive ? recursive_remove_directory(dir) : remove_file(dir.native()));
|
|
co_return 0;
|
|
} catch (const std::system_error& e) {
|
|
ex = std::current_exception();
|
|
error_code = e.code().value();
|
|
if (error_code == ENOENT) {
|
|
// Ignore missing directories
|
|
co_return 0;
|
|
}
|
|
if ((error_code == EEXIST || error_code == ENOTEMPTY) && !recursive) {
|
|
// Just return failure if the directory is not empty
|
|
// Let the caller decide what to do about it.
|
|
co_return error_code;
|
|
}
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
error_code = -1;
|
|
}
|
|
sstlog.warn("Could not remove table directory {}: {}. Ignored.", dir, ex);
|
|
co_return error_code;
|
|
}
|
|
|
|
// Removes the table directory `table_dir`, unless it still holds snapshots.
//
// Every subdirectory in sstables::table_subdirectories is removed first. The
// snapshots directory is removed only if it is empty; all other subdirectories
// are removed recursively. The table directory itself is then removed
// recursively, but only if none of the subdirectory removals returned an error
// code. Removal errors are handled (and, where appropriate, logged) by
// remove_dir(); none are propagated to the caller.
//
// Aborts, after reporting an internal error, if `table_dir` is empty or "/".
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) {
    // Be paranoid about risky paths
    if (table_dir == "" || table_dir == "/") {
        on_internal_error_noexcept(sstlog, format("Invalid table directory for removal: {}", table_dir));
        abort();
    }

    int error = 0;
    for (const auto& subdir : sstables::table_subdirectories) {
        // Remove the snapshot directory only if empty
        // while other subdirectories are removed recursively.
        const bool is_snapshots_dir = subdir == sstables::snapshots_dir;
        auto ec = co_await remove_dir(table_dir / subdir, !is_snapshots_dir);
        // rmdir(2) may report a non-empty directory as either EEXIST or ENOTEMPTY
        // (Linux uses ENOTEMPTY), and remove_dir() passes either one through.
        if (is_snapshots_dir && (ec == EEXIST || ec == ENOTEMPTY)) {
            sstlog.info("Leaving table directory {} behind as it has snapshots", table_dir);
        }
        if (!error) {
            error = ec;
        }
    }

    if (!error) {
        // Remove the table directory recursively
        // since it may still hold leftover temporary
        // sstable files and directories
        co_await remove_dir(table_dir, true);
    }
}
|
|
|
|
std::string to_string(const shared_sstable& sst, bool include_origin) {
|
|
auto repaired_at = sst->get_stats_metadata().repaired_at;
|
|
return include_origin ?
|
|
fmt::format("{}:level={:d}:origin={}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin(), repaired_at) :
|
|
fmt::format("{}:level={:d}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), repaired_at);
|
|
}
|
|
|
|
std::string sstable_stream_source::component_basename() const {
|
|
return _sst->component_basename(_type);
|
|
}
|
|
|
|
sstable_stream_source::sstable_stream_source(shared_sstable sst, component_type type)
|
|
: _sst(std::move(sst))
|
|
, _type(type)
|
|
{}
|
|
|
|
// Creates one stream source per component file captured in `snapshot`, for
// streaming the sstable's files.
//
// Order of the returned sources:
//  - TOC, then Scylla. Both are required; if either is missing,
//    std::invalid_argument is thrown with the std::out_of_range from files.at()
//    nested in it.
//  - Data, if present.
//  - Every other component.
//
// Data is streamed through sstable::data_stream() with integrity checking.
// Before that, the CRC component (if present) is loaded with read_checksum() and
// the Digest component (if present) with read_digest(). `permit` is only used
// for the Data source.
future<std::vector<std::unique_ptr<sstable_stream_source>>> create_stream_sources(const sstables::sstable_files_snapshot& snapshot, reader_permit permit) {
    std::vector<std::unique_ptr<sstable_stream_source>> result;
    result.reserve(snapshot.files.size());

    // Streams a component file as-is, except for the Scylla component, which is
    // re-serialized without extension attributes (see input()).
    class sstable_stream_source_impl : public sstable_stream_source {
        file _file;
    public:
        sstable_stream_source_impl(shared_sstable table, component_type type, file f)
            : sstable_stream_source(std::move(table), type)
            , _file(std::move(f))
        {}
        future<input_stream<char>> input(const file_input_stream_options& options) const override {
            if (_type == component_type::Scylla) {
                // Filter out any node-local info (i.e. extensions)
                // and reserialize data. Load into a temp object.
                // TODO/FIXME. Not all extension attributes might
                // need removing. In fact, it might be wrong (in the future)
                // to do so. ATM we know this is safe and correct, but really
                // extensions should remove themselves if required.
                scylla_metadata tmp;
                uint64_t size = co_await _file.size();
                auto r = file_random_access_reader(_file, size, default_sstable_buffer_size);
                // NOTE(review): if parse() throws, `r` is not closed.
                co_await parse(*_sst->get_schema(), _sst->get_version(), r, tmp);
                co_await r.close();

                tmp.remove_extension_attributes();

                std::vector<temporary_buffer<char>> bufs;

                // Recompute the component digests and the metadata checksum, then
                // serialize into in-memory buffers. write() and fw.close() are
                // called without co_await, hence the seastar thread.
                co_await seastar::async([&] {
                    tmp.get_or_create_components_digests();
                    tmp.digest = serialized_checksum(_sst->get_version(), tmp.data);
                    using buffer_data_sink_impl = seastar::util::basic_memory_data_sink<decltype(bufs), 128*1024>;
                    file_writer fw(data_sink(std::make_unique<buffer_data_sink_impl>(bufs)));
                    write(_sst->get_version(), fw, tmp);
                    fw.close();
                });
                co_return seastar::util::as_input_stream(std::move(bufs));
            }
            co_return make_file_input_stream(_file, options);
        }
    };

    // Streams the Data component via sstable::data_stream(), as raw compressed
    // chunks, with integrity checking.
    // NOTE(review): `_file` and `_checksum` are not read by input(); they appear
    // to be held only to keep them alive for the source's lifetime — confirm.
    class sstable_data_stream_source_impl : public sstable_stream_source {
        file _file;
        reader_permit _permit;
        lw_shared_ptr<checksum> _checksum;
    public:
        sstable_data_stream_source_impl(shared_sstable table, component_type type, file f, reader_permit permit, lw_shared_ptr<checksum> checksum)
            : sstable_stream_source(std::move(table), type)
            , _file(std::move(f))
            , _permit(std::move(permit))
            , _checksum(std::move(checksum))
        {}
        future<input_stream<char>> input(const file_input_stream_options& options) const override {
            co_return co_await _sst->data_stream(0, _sst->ondisk_data_size(), _permit, nullptr, nullptr, options, sstable::raw_stream::compressed_chunks, integrity_check::yes);
        }
    };

    auto& files = snapshot.files;

    auto add = [&](component_type type, file f) {
        result.emplace_back(std::make_unique<sstable_stream_source_impl>(snapshot.sst, type, std::move(f)));
    };

    // TOC and Scylla are mandatory and always go first.
    try {
        add(component_type::TOC, files.at(component_type::TOC));
        add(component_type::Scylla, files.at(component_type::Scylla));
    } catch (std::out_of_range&) {
        std::throw_with_nested(std::invalid_argument("Missing required sstable component"));
    }

    if (auto data_it = files.find(component_type::Data); data_it != files.end()) {
        lw_shared_ptr<checksum> checksum;
        if (auto crc = files.find(component_type::CRC); crc != files.end()) {
            checksum = co_await snapshot.sst->read_checksum(crc->second);
        }
        if (auto digest = files.find(component_type::Digest); digest != files.end()) {
            co_await snapshot.sst->read_digest(digest->second);
        }
        result.emplace_back(std::make_unique<sstable_data_stream_source_impl>(snapshot.sst, data_it->first, std::move(data_it->second), std::move(permit), std::move(checksum)));
    }

    // Everything else (including CRC and Digest) is streamed as plain files.
    for (auto&& [type, f] : files) {
        if (type != component_type::TOC && type != component_type::Scylla && type != component_type::Data) {
            add(type, std::move(f));
        }
    }

    co_return result;
}
|
|
|
|
class sstable_stream_sink_impl : public sstable_stream_sink {
|
|
shared_sstable _sst;
|
|
component_type _type;
|
|
bool _last_component;
|
|
bool _leave_unsealed;
|
|
public:
|
|
sstable_stream_sink_impl(shared_sstable sst, component_type type, sstable_stream_sink_cfg cfg)
|
|
: _sst(std::move(sst))
|
|
, _type(type)
|
|
, _last_component(cfg.last_component)
|
|
, _leave_unsealed(cfg.leave_unsealed)
|
|
{}
|
|
private:
|
|
future<> load_metadata() const {
|
|
auto metafile = _sst->filename(sstables::component_type::Scylla);
|
|
if (!co_await file_exists(fmt::to_string(metafile))) {
|
|
// for compatibility with streaming a non-scylla table (no scylla component)
|
|
co_return;
|
|
}
|
|
if (!_sst->get_shared_components().scylla_metadata) {
|
|
sstables::scylla_metadata tmp;
|
|
co_await _sst->read_simple<component_type::Scylla>(tmp);
|
|
_sst->get_shared_components().scylla_metadata = std::move(tmp);
|
|
}
|
|
}
|
|
future<> save_metadata() const {
|
|
if (!_sst->get_shared_components().scylla_metadata) {
|
|
co_return;
|
|
}
|
|
auto& metadata = *_sst->get_shared_components().scylla_metadata;
|
|
|
|
file_output_stream_options options;
|
|
options.buffer_size = default_sstable_buffer_size;
|
|
co_await seastar::async([&] {
|
|
metadata.get_or_create_components_digests();
|
|
metadata.digest = serialized_checksum(_sst->get_version(), metadata.data);
|
|
auto w = _sst->make_component_file_writer(component_type::Scylla, std::move(options), open_flags::wo | open_flags::create).get();
|
|
write(_sst->get_version(), w, metadata);
|
|
w.close();
|
|
});
|
|
}
|
|
public:
|
|
future<output_stream<char>> output(const file_open_options& foptions, const file_output_stream_options& stream_options) override {
|
|
assert(_type != component_type::TOC);
|
|
// TOC and scylla components are guaranteed not to depend on metadata. Ignore these (chicken, egg)
|
|
bool load_save_meta = _type != component_type::TemporaryTOC && _type != component_type::Scylla;
|
|
|
|
// otherwise, first load scylla metadata from disk as written so far.
|
|
if (load_save_meta) {
|
|
co_await load_metadata();
|
|
}
|
|
// now we can open the component file. any extensions applied should write info into metadata
|
|
auto f = co_await _sst->open_file(_type, open_flags::wo | open_flags::create, foptions);
|
|
|
|
// Save back to disk.
|
|
if (load_save_meta) {
|
|
co_await save_metadata();
|
|
}
|
|
|
|
co_return co_await make_file_output_stream(std::move(f), stream_options);
|
|
}
|
|
future<shared_sstable> close() override {
|
|
if (_last_component) {
|
|
// If we are the last component in a sequence, we can seal the table.
|
|
if (!_leave_unsealed) {
|
|
co_await _sst->_storage->seal(*_sst);
|
|
}
|
|
co_return std::move(_sst);
|
|
}
|
|
_sst = {};
|
|
co_return nullptr;
|
|
}
|
|
future<> abort() override {
|
|
if (!_sst) {
|
|
co_return;
|
|
}
|
|
auto filename = fmt::to_string(_sst->filename(_type));
|
|
// TODO: if we are the last component (or really always), should we remove all component files?
|
|
// For now, this remains the responsibility of calling code (see handle_tablet_migration etc)
|
|
co_await remove_file(filename);
|
|
}
|
|
};
|
|
|
|
// Creates a sink that writes the sstable component named by `component_filename`.
//
// The sstable's generation, version, format and component type are parsed from
// the file name. The real TOC is never written directly: a TemporaryTOC is
// written instead. Sealing the sstable (done by the sink's close() for the last
// component) renames it to the actual TOC.
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, sstable_stream_sink_cfg cfg) {
    auto desc = parse_path(component_filename, schema->ks_name(), schema->cf_name());
    auto sst = sstm.make_sstable(schema, s_opts, desc.generation, state, desc.version, desc.format);

    const auto target_type = desc.component == component_type::TOC
            ? component_type::TemporaryTOC
            : desc.component;
    return std::make_unique<sstable_stream_sink_impl>(std::move(sst), target_type, cfg);
}
|
|
|
|
// Parses a generation from its string form.
//
// Two forms are accepted:
//  - a plain decimal integer (the whole string must be consumed);
//  - a UUID-based generation "dddd_ssss_fffff" followed by a 13-character
//    suffix. All parts are lowercase base36. The first three parts are days,
//    seconds and decimicroseconds of the timestamp; the suffix is the UUID's
//    least significant bits.
//
// Throws std::invalid_argument if the string matches neither form.
generation_type
generation_type::from_string(const std::string& s) {
    const char* const begin = s.data();
    const char* const end = begin + s.size();
    int64_t int_value;
    const auto [ptr, ec] = std::from_chars(begin, end, int_value);
    if (ec == std::errc() && ptr == end) {
        return generation_type(int_value);
    }

    static const boost::regex pattern("([0-9a-z]{4})_([0-9a-z]{4})_([0-9a-z]{5})([0-9a-z]{13})");
    boost::smatch match;
    if (!boost::regex_match(s, match, pattern)) {
        throw std::invalid_argument(fmt::format("invalid UUID: {}", s));
    }

    auto decode_base36 = [] (const std::string& part) {
        std::size_t consumed{};
        auto value = std::stoull(part, &consumed, 36);
        if (consumed != part.size()) {
            throw std::invalid_argument(fmt::format("invalid part in UUID: {}", part));
        }
        return value;
    };

    utils::UUID_gen::decimicroseconds timestamp = {};
    timestamp += std::chrono::days{decode_base36(match[1])};
    timestamp += std::chrono::seconds{decode_base36(match[2])};
    timestamp += ::utils::UUID_gen::decimicroseconds{decode_base36(match[3])};
    int64_t lsb = decode_base36(match[4]);
    return generation_type{utils::UUID_gen::get_time_UUID_raw(timestamp, lsb)};
}
|
|
|
|
// Returns the component's full name: the storage prefix, a '/', then the
// component's base file name.
sstring component_name::format() const {
    auto prefix = sst._storage->prefix();
    return prefix + "/" + sst.component_basename(component);
}
|
|
|
|
// Wraps `sink` for component `c` of `sst` using this extension's wrap_file().
//
// wrap_file() is first probed with a no-op file. A null result is taken to mean
// the extension does not wrap this component, and `sink` is returned unchanged.
// Otherwise the probe file is closed, and a file adapter over `sink` is wrapped
// and turned back into a data_sink.
future<data_sink> file_io_extension::wrap_sink(const sstable& sst, component_type c, data_sink sink) {
    auto probe = co_await wrap_file(sst, c, create_noop_file(), open_flags::wo);
    if (!probe) {
        co_return sink;
    }
    co_await probe.close();

    auto wrapped = co_await wrap_file(sst, c, create_file_for_sink(std::move(sink)), open_flags::wo);
    co_return co_await make_file_data_sink(std::move(wrapped), file_output_stream_options{});
}
|
|
|
|
// Not implemented: wrapping a data_source is unsupported, and reaching this
// function trips SCYLLA_ASSERT.
// NOTE(review): there is no return statement after the assertion; this relies on
// SCYLLA_ASSERT not returning when its condition is false.
future<data_source> file_io_extension::wrap_source(const sstable& sst, component_type c, data_source) {
    SCYLLA_ASSERT(0 && "You are not supposed to get here, file_io_extension::wrap_source() is not implemented");
}
|
|
|
|
namespace trie {
|
|
|
|
// This implementation would belong better in sstables/trie/bti_partition_index_writer.cc,
|
|
// but we put it here to have access to the `parse(...)` helpers.
|
|
future<bti_partitions_db_footer> read_bti_partitions_db_footer(const schema& s, sstable_version_types v, const seastar::file& f, uint64_t file_size) {
|
|
file_random_access_reader reader(f, file_size, default_sstable_buffer_size);
|
|
if (file_size < 24) {
|
|
throw malformed_sstable_exception(fmt::format("Partitions.db file is too small: file_size={}", file_size));
|
|
}
|
|
co_await reader.seek(file_size - 24);
|
|
uint64_t keys_position;
|
|
uint64_t partition_count;
|
|
uint64_t trie_root;
|
|
co_await parse(s, v, reader, keys_position);
|
|
co_await parse(s, v, reader, partition_count);
|
|
co_await parse(s, v, reader, trie_root);
|
|
if (trie_root >= file_size) {
|
|
throw malformed_sstable_exception(fmt::format("Partitions.db malformed: trie_root={}, file_size={}", trie_root, file_size));
|
|
}
|
|
if (keys_position >= file_size) {
|
|
throw malformed_sstable_exception(fmt::format("Partitions.db malformed: keys_position={}, file_size={}", keys_position, file_size));
|
|
}
|
|
co_await reader.seek(keys_position);
|
|
disk_string<uint16_t> first_key;
|
|
disk_string<uint16_t> last_key;
|
|
co_await parse(s, v, reader, first_key);
|
|
co_await parse(s, v, reader, last_key);
|
|
co_return trie::bti_partitions_db_footer{
|
|
.first_key = sstables::key::from_bytes(std::move(first_key.value)),
|
|
.last_key = sstables::key::from_bytes(std::move(last_key.value)),
|
|
.partition_count = partition_count,
|
|
.trie_root_position = trie_root,
|
|
};
|
|
}
|
|
|
|
} // namespace trie
|
|
|
|
} // namespace sstables
|
|
|
|
namespace seastar {

// Disposal hook for lw_shared_ptr<sstables::sstable>. When the last reference is
// dropped, the sstable is not deleted here. Instead sstable::unused() is called,
// which hands the sstable back to its sstables_manager (deactivate or remove).
void
lw_shared_ptr_deleter<sstables::sstable>::dispose(sstables::sstable* s) {
    s->unused();
}


// Explicit instantiation of the lw_shared_ptr accessor for sstables::sstable.
// NOTE(review): presumably required so that lw_shared_ptr<sstable> can be used
// where sstable is an incomplete type (see seastar/core/shared_ptr_incomplete.hh)
// — confirm.
template
sstables::sstable*
seastar::internal::lw_shared_ptr_accessors<sstables::sstable, void>::to_value(seastar::lw_shared_ptr_counter_base*);

}
|