Compare commits
21 Commits
debug_form
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f1efcff31 | ||
|
|
7673a17365 | ||
|
|
ae05d62b97 | ||
|
|
732321e3b8 | ||
|
|
a2622e1919 | ||
|
|
270bf34846 | ||
|
|
168f694c5d | ||
|
|
b5579be915 | ||
|
|
ad60d765f9 | ||
|
|
68d2086fa5 | ||
|
|
403d43093f | ||
|
|
2b1b4d1dfc | ||
|
|
827563902c | ||
|
|
ccf194bd89 | ||
|
|
9b735bb4dc | ||
|
|
f29b87970a | ||
|
|
17a76b6264 | ||
|
|
ab45df1aa1 | ||
|
|
97f0f312e0 | ||
|
|
4df6a17d30 | ||
|
|
b3dbfaf27a |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.2.0-dev
|
||||
VERSION=2025.2.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
330
compress.cc
330
compress.cc
@@ -15,6 +15,8 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "utils/reusable_buffer.hh"
|
||||
#include "sstables/compress.hh"
|
||||
#include "sstables/exceptions.hh"
|
||||
@@ -27,7 +29,7 @@
|
||||
|
||||
// SHA256
|
||||
using dict_id = std::array<std::byte, 32>;
|
||||
class sstable_compressor_factory_impl;
|
||||
class dictionary_holder;
|
||||
|
||||
static seastar::logger compressor_factory_logger("sstable_compressor_factory");
|
||||
|
||||
@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
|
||||
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
|
||||
// or referenced by algorithm-specific dicts.
|
||||
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
dict_id _id;
|
||||
std::vector<std::byte> _dict;
|
||||
public:
|
||||
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
|
||||
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
|
||||
~raw_dict();
|
||||
const std::span<const std::byte> raw() const { return _dict; }
|
||||
dict_id id() const { return _id; }
|
||||
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
|
||||
// (which internally holds a pointer to the raw dictionary blob
|
||||
// and parsed entropy tables).
|
||||
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
size_t _used_memory = 0;
|
||||
zstd_callback_allocator _alloc;
|
||||
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
|
||||
public:
|
||||
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
~zstd_ddict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -100,14 +102,14 @@ public:
|
||||
// so the level of compression is decided at the time of construction
|
||||
// of this dict.
|
||||
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
int _level;
|
||||
size_t _used_memory = 0;
|
||||
zstd_callback_allocator _alloc;
|
||||
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
|
||||
public:
|
||||
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
|
||||
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
|
||||
~zstd_cdict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -119,11 +121,11 @@ public:
|
||||
// and a hash index over the substrings of the blob).
|
||||
//
|
||||
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
|
||||
public:
|
||||
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
~lz4_cdict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -164,6 +166,7 @@ public:
|
||||
size_t compress_max_size(size_t input_len) const override;
|
||||
std::map<sstring, sstring> options() const override;
|
||||
algorithm get_algorithm() const override;
|
||||
std::optional<unsigned> get_dict_owner_for_test() const override;
|
||||
};
|
||||
|
||||
class snappy_processor: public compressor {
|
||||
@@ -266,6 +269,7 @@ public:
|
||||
size_t compress_max_size(size_t input_len) const override;
|
||||
algorithm get_algorithm() const override;
|
||||
std::map<sstring, sstring> options() const override;
|
||||
std::optional<unsigned> get_dict_owner_for_test() const override;
|
||||
};
|
||||
|
||||
zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
|
||||
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
|
||||
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
|
||||
}
|
||||
|
||||
std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
|
||||
if (_cdict) {
|
||||
return _cdict.get_owner_shard();
|
||||
} else if (_ddict) {
|
||||
return _ddict.get_owner_shard();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
const std::string_view DICTIONARY_OPTION = ".dictionary.";
|
||||
|
||||
static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
|
||||
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<unsigned> compressor::get_dict_owner_for_test() const {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string compressor::name() const {
|
||||
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
|
||||
}
|
||||
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
|
||||
case algorithm::snappy: return "SnappyCompressor";
|
||||
case algorithm::zstd: return "ZstdCompressor";
|
||||
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
|
||||
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
|
||||
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
|
||||
}
|
||||
abort();
|
||||
}
|
||||
@@ -660,6 +678,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
|
||||
if (_cdict) {
|
||||
return _cdict.get_owner_shard();
|
||||
} else if (_ddict) {
|
||||
return _ddict.get_owner_shard();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
compressor_ptr make_lz4_sstable_compressor_for_tests() {
|
||||
return std::make_unique<lz4_processor>();
|
||||
}
|
||||
@@ -751,21 +779,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
|
||||
return snappy_max_compressed_length(input_len);
|
||||
}
|
||||
|
||||
// Constructs compressors and decompressors for SSTables,
|
||||
// making sure that the expensive identical parts (dictionaries) are shared
|
||||
// across nodes.
|
||||
//
|
||||
// Holds weak pointers to all live dictionaries
|
||||
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
|
||||
// and shared (lifetime-extending) pointers to the current writer ("recommended")
|
||||
// dict for each table (so that they can be shared with new SSTables without consulting
|
||||
// `system.dicts`).
|
||||
//
|
||||
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
|
||||
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
|
||||
// (Note: this shouldn't pose a performance problem because a dict is only requested once per an opening of an SSTable).
|
||||
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
|
||||
//
|
||||
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
|
||||
// is removed from the factory.
|
||||
//
|
||||
@@ -774,10 +793,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
|
||||
// Has a configurable memory budget for live dicts. If the budget is exceeded,
|
||||
// will return null dicts to new writers (to avoid making the memory usage even worse)
|
||||
// and print warnings.
|
||||
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
|
||||
class dictionary_holder : public weakly_referencable<dictionary_holder> {
|
||||
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
|
||||
shard_id _owner_shard;
|
||||
config _cfg;
|
||||
using config = default_sstable_compressor_factory::config;
|
||||
const config& _cfg;
|
||||
uint64_t _total_live_dict_memory = 0;
|
||||
metrics::metric_groups _metrics;
|
||||
struct zstd_cdict_id {
|
||||
@@ -789,7 +808,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
|
||||
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
|
||||
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
|
||||
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
|
||||
std::map<table_id, foreign_ptr<lw_shared_ptr<const raw_dict>>> _recommended;
|
||||
|
||||
size_t memory_budget() const {
|
||||
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
|
||||
@@ -806,8 +825,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
memory_budget()
|
||||
);
|
||||
}
|
||||
public:
|
||||
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (dict.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
auto id = get_sha256(dict);
|
||||
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
|
||||
return it->second->shared_from_this();
|
||||
@@ -819,7 +841,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
|
||||
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const zstd_ddict> ddict;
|
||||
// Fo reading, we must allocate a new dict, even if memory budget is exceeded. We have no other choice.
|
||||
// In any case, if the budget is exceeded after we print a rate-limited warning about it.
|
||||
@@ -835,15 +859,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(ddict));
|
||||
}
|
||||
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
|
||||
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
|
||||
auto raw = get_canonical_ptr(dict);
|
||||
return get_zstd_dict_for_reading(raw, level);
|
||||
});
|
||||
}
|
||||
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
|
||||
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!_cfg.enable_writing_dictionaries() || !raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const zstd_cdict> cdict;
|
||||
// If we can share an already-allocated dict, we do that regardless of memory budget.
|
||||
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
|
||||
@@ -859,19 +879,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(cdict));
|
||||
}
|
||||
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
|
||||
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
|
||||
if (!_cfg.enable_writing_dictionaries()) {
|
||||
return {};
|
||||
}
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it != _recommended.end()) {
|
||||
return get_zstd_dict_for_writing(rec_it->second, level);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
});
|
||||
}
|
||||
using lz4_dicts = std::pair<
|
||||
foreign_ptr<lw_shared_ptr<const raw_dict>>,
|
||||
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
|
||||
@@ -879,18 +886,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
|
||||
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
|
||||
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
lw_shared_ptr<const raw_dict> ddict;
|
||||
return make_foreign(std::move(raw));
|
||||
}
|
||||
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
|
||||
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
|
||||
auto raw = get_canonical_ptr(dict);
|
||||
return get_lz4_dict_for_reading(raw);
|
||||
});
|
||||
}
|
||||
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!_cfg.enable_writing_dictionaries() || !raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const lz4_cdict> cdict;
|
||||
// If we can share an already-allocated dict, we do that regardless of memory budget.
|
||||
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
|
||||
@@ -905,24 +906,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(cdict));
|
||||
}
|
||||
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
|
||||
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
|
||||
if (!_cfg.enable_writing_dictionaries()) {
|
||||
return {};
|
||||
}
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it != _recommended.end()) {
|
||||
return get_lz4_dict_for_writing(rec_it->second);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
sstable_compressor_factory_impl(config cfg)
|
||||
: _owner_shard(this_shard_id())
|
||||
, _cfg(std::move(cfg))
|
||||
dictionary_holder(const config& cfg)
|
||||
: _cfg(cfg)
|
||||
{
|
||||
if (_cfg.register_metrics) {
|
||||
namespace sm = seastar::metrics;
|
||||
@@ -931,8 +918,8 @@ public:
|
||||
});
|
||||
}
|
||||
}
|
||||
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
|
||||
~sstable_compressor_factory_impl() {
|
||||
dictionary_holder(dictionary_holder&&) = delete;
|
||||
~dictionary_holder() {
|
||||
// Note: `_recommended` might be the only thing keeping some dicts alive,
|
||||
// so clearing it will destroy them.
|
||||
//
|
||||
@@ -948,39 +935,36 @@ public:
|
||||
_recommended.clear();
|
||||
}
|
||||
void forget_raw_dict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_raw_dicts.erase(id);
|
||||
}
|
||||
void forget_zstd_cdict(dict_id id, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_zstd_cdicts.erase({id, level});
|
||||
}
|
||||
void forget_zstd_ddict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_zstd_ddicts.erase(id);
|
||||
}
|
||||
void forget_lz4_cdict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_lz4_cdicts.erase(id);
|
||||
}
|
||||
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
|
||||
return smp::submit_to(_owner_shard, [this, t, dict] {
|
||||
_recommended.erase(t);
|
||||
if (dict.size()) {
|
||||
auto canonical_ptr = get_canonical_ptr(dict);
|
||||
_recommended.emplace(t, canonical_ptr);
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
|
||||
t, dict.size(), fmt_hex(canonical_ptr->id()));
|
||||
} else {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
|
||||
}
|
||||
});
|
||||
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
|
||||
_recommended.erase(t);
|
||||
if (dict) {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
|
||||
t, dict->raw().size(), fmt_hex(dict->id()));
|
||||
_recommended.emplace(t, std::move(dict));
|
||||
} else {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
|
||||
}
|
||||
}
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it == _recommended.end()) {
|
||||
co_return nullptr;
|
||||
}
|
||||
co_return co_await rec_it->second.copy();
|
||||
}
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
|
||||
|
||||
void account_memory_delta(ssize_t n) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
|
||||
compressor_factory_logger.error(
|
||||
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
|
||||
@@ -990,19 +974,85 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
|
||||
: _cfg(std::move(cfg))
|
||||
, _holder(std::make_unique<dictionary_holder>(_cfg))
|
||||
{
|
||||
for (shard_id i = 0; i < smp::count; ++i) {
|
||||
auto numa_id = _cfg.numa_config[i];
|
||||
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
|
||||
_numa_groups[numa_id].push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
|
||||
const auto params = s->get_compressor_params();
|
||||
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
|
||||
}
|
||||
|
||||
std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
|
||||
auto sp = local_engine->smp().shard_to_numa_node_mapping();
|
||||
return std::vector<unsigned>(sp.begin(), sp.end());
|
||||
}
|
||||
|
||||
unsigned default_sstable_compressor_factory::local_numa_id() {
|
||||
return _cfg.numa_config[this_shard_id()];
|
||||
}
|
||||
|
||||
shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
|
||||
auto hash = read_unaligned<uint64_t>(sha.data());
|
||||
const auto& group = _numa_groups[numa_id];
|
||||
if (group.empty()) {
|
||||
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
|
||||
}
|
||||
return group[hash % group.size()];
|
||||
}
|
||||
|
||||
future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
|
||||
if (_leader_shard != this_shard_id()) {
|
||||
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
|
||||
}
|
||||
auto units = co_await get_units(_recommendation_setting_sem, 1);
|
||||
auto sha = get_sha256(dict);
|
||||
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
|
||||
const auto& group = _numa_groups[numa_id];
|
||||
if (group.empty()) {
|
||||
continue;
|
||||
}
|
||||
auto r = get_dict_owner(numa_id, sha);
|
||||
auto d = co_await container().invoke_on(r, [dict](self& local) {
|
||||
return make_foreign(local._holder->get_canonical_ptr(dict));
|
||||
});
|
||||
auto local_coordinator = group[0];
|
||||
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
|
||||
local._holder->set_recommended_dict(t, std::move(d));
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
|
||||
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
|
||||
}
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
|
||||
const auto local_coordinator = _numa_groups[local_numa_id()][0];
|
||||
return container().invoke_on(local_coordinator, [t](self& local) {
|
||||
return local._holder->get_recommended_dict(t);
|
||||
});
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
|
||||
using algorithm = compression_parameters::algorithm;
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
|
||||
switch (algo) {
|
||||
case algorithm::lz4:
|
||||
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
|
||||
case algorithm::lz4_with_dicts: {
|
||||
auto cdict = _cfg.enable_writing_dictionaries()
|
||||
? co_await get_lz4_dicts_for_writing(s->id())
|
||||
: nullptr;
|
||||
holder::foreign_lz4_cdict cdict;
|
||||
if (auto recommended = co_await get_recommended_dict(id)) {
|
||||
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
|
||||
return local._holder->get_lz4_dict_for_writing(recommended.release());
|
||||
});
|
||||
}
|
||||
if (cdict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
|
||||
}
|
||||
@@ -1015,9 +1065,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
|
||||
case algorithm::zstd:
|
||||
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
|
||||
case algorithm::zstd_with_dicts: {
|
||||
auto cdict = _cfg.enable_writing_dictionaries()
|
||||
? co_await get_zstd_dict_for_writing(s->id(), params.zstd_compression_level().value_or(ZSTD_defaultCLevel()))
|
||||
: nullptr;
|
||||
holder::foreign_zstd_cdict cdict;
|
||||
if (auto recommended = co_await get_recommended_dict(id)) {
|
||||
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
|
||||
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
|
||||
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
|
||||
});
|
||||
}
|
||||
if (cdict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
|
||||
}
|
||||
@@ -1029,17 +1083,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
|
||||
abort();
|
||||
}
|
||||
|
||||
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_reading(sstables::compression& c) {
|
||||
const auto params = compression_parameters(sstables::options_from_compression(c));
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
|
||||
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
|
||||
return make_compressor_for_writing_impl(params, id);
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
|
||||
using algorithm = compression_parameters::algorithm;
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
|
||||
switch (algo) {
|
||||
case algorithm::lz4:
|
||||
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
|
||||
case algorithm::lz4_with_dicts: {
|
||||
auto dict = dict_from_options(c);
|
||||
auto ddict = co_await get_lz4_dicts_for_reading(std::as_bytes(std::span(*dict)));
|
||||
auto dict_span = dict;
|
||||
auto sha = get_sha256(dict_span);
|
||||
auto dict_owner = get_dict_owner(local_numa_id(), sha);
|
||||
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
|
||||
auto d = local._holder->get_canonical_ptr(dict_span);
|
||||
return local._holder->get_lz4_dict_for_reading(std::move(d));
|
||||
});
|
||||
if (ddict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
|
||||
}
|
||||
@@ -1054,8 +1119,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
|
||||
}
|
||||
case algorithm::zstd_with_dicts: {
|
||||
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
|
||||
auto dict = dict_from_options(c);
|
||||
auto ddict = co_await get_zstd_dict_for_reading(std::as_bytes(std::span(*dict)), level);
|
||||
auto dict_span = dict;
|
||||
auto sha = get_sha256(dict_span);
|
||||
auto dict_owner = get_dict_owner(local_numa_id(), sha);
|
||||
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
|
||||
auto d = local._holder->get_canonical_ptr(dict_span);
|
||||
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
|
||||
});
|
||||
if (ddict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
|
||||
}
|
||||
@@ -1067,7 +1137,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
|
||||
abort();
|
||||
}
|
||||
|
||||
raw_dict::raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict)
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
|
||||
const auto params = compression_parameters(sstables::options_from_compression(c));
|
||||
auto dict = dict_from_options(c);
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
|
||||
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
|
||||
return make_compressor_for_reading_impl(params, dict);
|
||||
}
|
||||
|
||||
raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _id(key)
|
||||
, _dict(dict.begin(), dict.end())
|
||||
@@ -1082,7 +1164,7 @@ raw_dict::~raw_dict() {
|
||||
}
|
||||
}
|
||||
|
||||
zstd_cdict::zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level)
|
||||
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _level(level)
|
||||
@@ -1114,7 +1196,7 @@ zstd_cdict::~zstd_cdict() {
|
||||
}
|
||||
}
|
||||
|
||||
zstd_ddict::zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _alloc([this] (ssize_t n) {
|
||||
@@ -1143,7 +1225,7 @@ zstd_ddict::~zstd_ddict() {
|
||||
}
|
||||
}
|
||||
|
||||
lz4_cdict::lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _dict(LZ4_createStream(), LZ4_freeStream)
|
||||
@@ -1162,6 +1244,28 @@ lz4_cdict::~lz4_cdict() {
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg) {
|
||||
return std::make_unique<sstable_compressor_factory_impl>(std::move(cfg));
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
|
||||
SCYLLA_ASSERT(thread::running_in_thread());
|
||||
struct wrapper : sstable_compressor_factory {
|
||||
using impl = default_sstable_compressor_factory;
|
||||
sharded<impl> _impl;
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
|
||||
return _impl.local().make_compressor_for_writing(s);
|
||||
}
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
|
||||
return _impl.local().make_compressor_for_reading(c);
|
||||
}
|
||||
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
|
||||
return _impl.local().set_recommended_dict(t, d);
|
||||
};
|
||||
wrapper(wrapper&&) = delete;
|
||||
wrapper() {
|
||||
_impl.start().get();
|
||||
}
|
||||
~wrapper() {
|
||||
_impl.stop().get();
|
||||
}
|
||||
};
|
||||
return std::make_unique<wrapper>();
|
||||
}
|
||||
|
||||
|
||||
@@ -64,6 +64,8 @@ public:
|
||||
|
||||
virtual algorithm get_algorithm() const = 0;
|
||||
|
||||
virtual std::optional<unsigned> get_dict_owner_for_test() const;
|
||||
|
||||
using ptr_type = std::unique_ptr<compressor>;
|
||||
};
|
||||
|
||||
|
||||
@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
|
||||
'test/boost/secondary_index_test.cc',
|
||||
'test/boost/sessions_test.cc',
|
||||
'test/boost/sstable_compaction_test.cc',
|
||||
'test/boost/sstable_compressor_factory_test.cc',
|
||||
'test/boost/sstable_directory_test.cc',
|
||||
'test/boost/sstable_set_test.cc',
|
||||
'test/boost/statement_restrictions_test.cc',
|
||||
|
||||
15
init.cc
15
init.cc
@@ -13,6 +13,7 @@
|
||||
|
||||
#include <boost/algorithm/string/trim.hpp>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
|
||||
logging::logger startlog("init");
|
||||
|
||||
@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
|
||||
std::any service_set::find(const std::type_info& type) const {
|
||||
return _impl->find(type);
|
||||
}
|
||||
|
||||
// Placed here to avoid dependency on db::config in compress.cc,
|
||||
// where the rest of default_sstable_compressor_factory_config is.
|
||||
auto default_sstable_compressor_factory_config::from_db_config(
|
||||
const db::config& cfg,
|
||||
std::span<const unsigned> numa_config) -> self
|
||||
{
|
||||
return self {
|
||||
.register_metrics = true,
|
||||
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
|
||||
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
|
||||
.numa_config{numa_config.begin(), numa_config.end()},
|
||||
};
|
||||
}
|
||||
|
||||
14
main.cc
14
main.cc
@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
|
||||
langman.invoke_on_all(&lang::manager::start).get();
|
||||
|
||||
auto sstable_compressor_factory = make_sstable_compressor_factory(sstable_compressor_factory::config{
|
||||
.register_metrics = true,
|
||||
.enable_writing_dictionaries = cfg->sstable_compression_dictionaries_enable_writing,
|
||||
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg->sstable_compression_dictionaries_memory_budget_fraction,
|
||||
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
|
||||
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
|
||||
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
|
||||
std::cref(*cfg), std::cref(numa_groups))).get();
|
||||
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
|
||||
sstable_compressor_factory.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting database");
|
||||
|
||||
debug::the_database = &db;
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
|
||||
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(*sstable_compressor_factory),
|
||||
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
|
||||
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
auto sstables_prefix = std::string_view("sstables/");
|
||||
if (name.starts_with(sstables_prefix)) {
|
||||
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
|
||||
co_await sstable_compressor_factory->set_recommended_dict(table, std::move(dict.data));
|
||||
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
|
||||
} else if (name == dictionary_service::rpc_compression_dict_name) {
|
||||
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
|
||||
}
|
||||
|
||||
@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
|
||||
switch (rs.state) {
|
||||
case node_state::normal: {
|
||||
if (is_me(ip)) {
|
||||
if (is_me(id)) {
|
||||
co_return;
|
||||
}
|
||||
// In replace-with-same-ip scenario the replaced node IP will be the same
|
||||
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
|
||||
auto old_ip = it->second;
|
||||
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
|
||||
|
||||
co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
|
||||
if (prev_ip == endpoint) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (_address_map.find(id) != endpoint) {
|
||||
// Address map refused to update IP for the host_id,
|
||||
// this means prev_ip has higher generation than endpoint.
|
||||
// We can immediately remove endpoint from gossiper
|
||||
// since it represents an old IP (before an IP change)
|
||||
// for the given host_id. This is not strictly
|
||||
// necessary, but it reduces the noise circulated
|
||||
// in gossiper messages and allows for clearer
|
||||
// expectations of the gossiper state in tests.
|
||||
|
||||
co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
|
||||
// Do not update address.
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
|
||||
if (_ss.raft_topology_change_enabled()) {
|
||||
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
|
||||
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
|
||||
|
||||
slogger.info("Starting up server gossip");
|
||||
|
||||
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
|
||||
|
||||
co_await _gossiper.start_gossiping(new_generation, app_states);
|
||||
|
||||
utils::get_local_injector().inject("stop_after_starting_gossiping",
|
||||
|
||||
@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
|
||||
const compression_parameters& params,
|
||||
const utils::chunked_vector<temporary_buffer<char>>& validation_samples
|
||||
) {
|
||||
auto factory = make_sstable_compressor_factory();
|
||||
co_await factory->set_recommended_dict(initial_schema->id(), dict);
|
||||
co_return co_await try_one_compression_config(*factory, initial_schema, params, validation_samples);
|
||||
sharded<default_sstable_compressor_factory> factory;
|
||||
co_await factory.start();
|
||||
std::exception_ptr ex;
|
||||
float result;
|
||||
try {
|
||||
co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
|
||||
result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await factory.stop();
|
||||
if (ex) {
|
||||
co_return coroutine::exception(ex);
|
||||
}
|
||||
co_return result;
|
||||
}
|
||||
|
||||
@@ -10,20 +10,89 @@
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "compress.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <span>
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
} // namespace db
|
||||
|
||||
struct dictionary_holder;
|
||||
class raw_dict;
|
||||
|
||||
struct sstable_compressor_factory {
|
||||
virtual ~sstable_compressor_factory() {}
|
||||
virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
|
||||
virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
|
||||
virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
|
||||
struct config {
|
||||
bool register_metrics = false;
|
||||
utils::updateable_value<bool> enable_writing_dictionaries{true};
|
||||
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
|
||||
};
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg = {});
|
||||
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
|
||||
// because then the compiler gives weird complains about default member initializers in line
|
||||
// ```
|
||||
// default_sstable_compressor_factory(config = config{});
|
||||
// ```
|
||||
// apparently due to some compiler bug related default initializers.
|
||||
struct default_sstable_compressor_factory_config {
|
||||
using self = default_sstable_compressor_factory_config;
|
||||
static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
|
||||
bool register_metrics = false;
|
||||
utils::updateable_value<bool> enable_writing_dictionaries{true};
|
||||
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
|
||||
std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};
|
||||
|
||||
static default_sstable_compressor_factory_config from_db_config(
|
||||
const db::config&,
|
||||
std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
|
||||
};
|
||||
|
||||
// Constructs compressors and decompressors for SSTables,
|
||||
// making sure that the expensive identical parts (dictionaries) are shared
|
||||
// between all shards within the same NUMA group.
|
||||
//
|
||||
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
|
||||
// decided by a content hash of the dictionary.
|
||||
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
|
||||
// to that dict.
|
||||
//
|
||||
// (Note: this centralization shouldn't pose a performance problem because a dict is only requested once
|
||||
// per an opening of an SSTable).
|
||||
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
|
||||
using holder = dictionary_holder;
|
||||
public:
|
||||
using self = default_sstable_compressor_factory;
|
||||
using config = default_sstable_compressor_factory_config;
|
||||
private:
|
||||
config _cfg;
|
||||
// Maps NUMA node ID to the array of shards on that node.
|
||||
std::vector<std::vector<shard_id>> _numa_groups;
|
||||
// Holds dictionaries owned by this shard.
|
||||
std::unique_ptr<dictionary_holder> _holder;
|
||||
// All recommended dictionary updates are serialized by a single "leader shard".
|
||||
// We do this to avoid dealing with concurrent updates altogether.
|
||||
semaphore _recommendation_setting_sem{1};
|
||||
constexpr static shard_id _leader_shard = 0;
|
||||
|
||||
private:
|
||||
using sha256_type = std::array<std::byte, 32>;
|
||||
unsigned local_numa_id();
|
||||
shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
|
||||
future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
|
||||
future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
|
||||
future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
|
||||
public:
|
||||
default_sstable_compressor_factory(config = config{});
|
||||
~default_sstable_compressor_factory();
|
||||
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
|
||||
future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
|
||||
future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
|
||||
future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();
|
||||
|
||||
@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
|
||||
// prepare tasks
|
||||
set_state(stream_session_state::PREPARING);
|
||||
auto& db = manager().db();
|
||||
for (auto& request : requests) {
|
||||
// always flush on stream request
|
||||
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
|
||||
const auto& ks = request.keyspace;
|
||||
// Make sure cf requested by peer node exists
|
||||
for (auto& cf : request.column_families) {
|
||||
try {
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
for (auto& summary : summaries) {
|
||||
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
|
||||
auto cf_id = summary.cf_id;
|
||||
// Make sure cf the peer node will send to us exists
|
||||
try {
|
||||
db.find_column_family(cf_id);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
}
|
||||
|
||||
|
||||
@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
|
||||
secondary_index_test.cc
|
||||
sessions_test.cc
|
||||
sstable_compaction_test.cc
|
||||
sstable_compressor_factory_test.cc
|
||||
sstable_directory_test.cc
|
||||
sstable_set_test.cc
|
||||
statement_restrictions_test.cc
|
||||
|
||||
@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
return sstables::test_env::do_with([] (sstables::test_env& env) {
|
||||
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("v", bytes_type)
|
||||
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
|
||||
return with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
return seastar::async([&env, s, &cf] {
|
||||
// populate
|
||||
auto new_key = [&] {
|
||||
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
|
||||
flushed.get();
|
||||
});
|
||||
}).then([cf_stats] {});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
|
||||
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
|
||||
}
|
||||
|
||||
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
|
||||
const schema& s = *expected.schema();
|
||||
auto expected_cont = expected.entry().squashed_continuity(s);
|
||||
auto actual_cont = actual.get_continuity(s);
|
||||
bool actual_static_cont = actual.static_row_continuous();
|
||||
bool expected_static_cont = expected.squashed().static_row_continuous();
|
||||
if (actual_static_cont != expected_static_cont) {
|
||||
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
if (!expected_cont.equals(s, actual_cont)) {
|
||||
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
// Tests that reading many versions using a cursor gives the logical mutation back.
|
||||
return seastar::async([] {
|
||||
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
auto snap = e.read();
|
||||
auto actual = read_using_cursor(*snap);
|
||||
|
||||
assert_that(s, actual).has_same_continuity(expected);
|
||||
// Checks that the squashed continuity of `e` is equal to continuity of `actual`.
|
||||
// Note: squashed continuity of an entry is slightly different than the continuity
|
||||
// of a squashed entry.
|
||||
//
|
||||
// Squashed continuity is the union of continuities of all versions in the entry,
|
||||
// and in particular it includes empty dummy rows resulting in the logical merge
|
||||
// of version.
|
||||
// The process of actually squashing an entry is allowed to
|
||||
// remove those empty dummies, so the squashed entry can have slightly
|
||||
// smaller continuity.
|
||||
//
|
||||
// Since a cursor isn't allowed to remove dummy rows, the strongest test
|
||||
// we can do here is to compare the continuity of the cursor-read mutation
|
||||
// with the squashed continuity of the entry.
|
||||
assert_has_same_squashed_continuity(actual, e);
|
||||
assert_that(s, actual).is_equal_to_compacted(expected);
|
||||
|
||||
// Reversed iteration
|
||||
|
||||
@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
|
||||
auto ts_in_ms = std::chrono::milliseconds(ts);
|
||||
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
|
||||
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
|
||||
BOOST_REQUIRE(ret.second == expected);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2319,6 +2318,7 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
uint32_t _seed;
|
||||
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
|
||||
@@ -2336,7 +2336,7 @@ public:
|
||||
compress))
|
||||
, _random_schema(_seed, *_random_schema_spec)
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
testlog.info("random_schema: {}", _random_schema.cql());
|
||||
}
|
||||
|
||||
@@ -2402,12 +2402,14 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
|
||||
public:
|
||||
scrub_test_framework()
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
}
|
||||
|
||||
~scrub_test_framework() {
|
||||
|
||||
133
test/boost/sstable_compressor_factory_test.cc
Normal file
133
test/boost/sstable_compressor_factory_test.cc
Normal file
@@ -0,0 +1,133 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
|
||||
// 1. Create a random message.
|
||||
// 2. Set this random message as the recommended dict.
|
||||
// 3. On all shards, create compressors.
|
||||
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
|
||||
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
|
||||
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
|
||||
// 7. Repeat this a few times for both lz4 and zstd.
|
||||
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
|
||||
|
||||
// Create a compressor factory.
|
||||
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
|
||||
auto config = default_sstable_compressor_factory::config{
|
||||
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
|
||||
};
|
||||
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
|
||||
sstable_compressor_factory.start(std::cref(config)).get();
|
||||
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
|
||||
|
||||
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
|
||||
auto table = table_id::create_random_id();
|
||||
|
||||
// Retry a few times just to check that it works more than once.
|
||||
for (int retry = 0; retry < 3; ++retry) {
|
||||
// Generate a random (and hence uhcompressible without a dict) message.
|
||||
auto message = tests::random::get_sstring(4096);
|
||||
auto dict_view = std::as_bytes(std::span(message));
|
||||
// Set the message as the dict to make the message perfectly compressible.
|
||||
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
|
||||
|
||||
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
|
||||
std::vector<unsigned> compressor_numa_nodes(smp::count);
|
||||
std::vector<unsigned> decompressor_numa_nodes(smp::count);
|
||||
|
||||
// Try for both algorithms, just in case there are some differences in how dictionary
|
||||
// distribution over shards is implemented between them.
|
||||
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
|
||||
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
|
||||
// Validate that the dictionaries work as intended,
|
||||
// and check that their owner is as expected.
|
||||
|
||||
auto params = compression_parameters(algo);
|
||||
|
||||
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
|
||||
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
|
||||
|
||||
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
|
||||
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
|
||||
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
|
||||
|
||||
// Check that the dictionary used by this shard lies on the same NUMA node.
|
||||
// This is important to avoid cross-node memory accesses on the hot path.
|
||||
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
|
||||
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
|
||||
|
||||
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
|
||||
auto output_max_size = compressor->compress_max_size(message.size());
|
||||
auto compressed = std::vector<char>(output_max_size);
|
||||
auto compressed_size = compressor->compress(
|
||||
reinterpret_cast<const char*>(message.data()), message.size(),
|
||||
reinterpret_cast<char*>(compressed.data()), compressed.size());
|
||||
BOOST_REQUIRE_GE(compressed_size, 0);
|
||||
compressed.resize(compressed_size);
|
||||
|
||||
// Validate that the recommeded dict was actually used.
|
||||
BOOST_CHECK(compressed.size() < message.size() / 10);
|
||||
|
||||
auto decompressed = std::vector<char>(message.size());
|
||||
auto decompressed_size = decompressor->uncompress(
|
||||
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
|
||||
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
|
||||
BOOST_REQUIRE_GE(decompressed_size, 0);
|
||||
decompressed.resize(decompressed_size);
|
||||
|
||||
// Validate that the roundtrip through compressor and decompressor
|
||||
// resulted in the original message.
|
||||
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
|
||||
})).get();
|
||||
}
|
||||
|
||||
// Check that the number of owners (and hence, copies) is equal to the number
|
||||
// of NUMA nodes.
|
||||
// This isn't that important, but we don't want to duplicate dictionaries
|
||||
// within a NUMA node unnecessarily.
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
|
||||
{
|
||||
std::vector<unsigned> one_numa_node(smp::count);
|
||||
test_one_numa_topology(one_numa_node);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> two_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
|
||||
two_numa_nodes[i] = i % 2;
|
||||
}
|
||||
test_one_numa_topology(two_numa_nodes);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> n_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
|
||||
n_numa_nodes[i] = i;
|
||||
}
|
||||
test_one_numa_topology(n_numa_nodes);
|
||||
}
|
||||
}
|
||||
@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto builder = schema_builder("test", "summary_test")
|
||||
.with_column("a", int32_type, column_kind::partition_key);
|
||||
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
|
||||
BOOST_REQUIRE(list.size() == 130);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
// d int,
|
||||
// e blob
|
||||
//);
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
|
||||
auto builder = schema_builder("test", "test_multi_schema")
|
||||
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
BOOST_REQUIRE(!m);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
BOOST_REQUIRE(insert(2, 2) == true);
|
||||
BOOST_REQUIRE(insert(5, 5) == true);
|
||||
BOOST_REQUIRE(run.all().size() == 5);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
|
||||
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
// please note, the SSTables used by this test are stored under
|
||||
|
||||
@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
|
||||
# IP-s before they are send back to servers[1] and servers[2],
|
||||
# and the mentioned above code is not exercised by this test.
|
||||
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
|
||||
# sleep_before_start_gossiping injections are needed to reproduce #22777
|
||||
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_start(servers[1].server_id)
|
||||
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
|
||||
if build_mode != 'release':
|
||||
|
||||
@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
|
||||
host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
|
||||
|
||||
def execute_with_tracing(cql, statement, *args, **kwargs):
|
||||
kwargs['trace'] = True
|
||||
query_result = cql.execute(statement, *args, **kwargs)
|
||||
|
||||
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
|
||||
page_traces = []
|
||||
for trace in tracing:
|
||||
trace_events = []
|
||||
for event in trace.events:
|
||||
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
|
||||
page_traces.append("\n".join(trace_events))
|
||||
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
|
||||
cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
|
||||
" WITH speculative_retry = 'NONE'"
|
||||
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
" AND compaction = {'class': 'NullCompactionStrategy'}")
|
||||
|
||||
for write_statement, delete_statement in statement_pairs:
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
|
||||
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
|
||||
await manager.api.disable_injection(node3.ip_addr, "database_apply")
|
||||
|
||||
def check_data(host, data):
|
||||
|
||||
@@ -15,9 +15,10 @@ from cassandra.cluster import ConsistencyLevel, Session # type: ignore
|
||||
from cassandra.query import SimpleStatement # type: ignore
|
||||
from cassandra.pool import Host # type: ignore
|
||||
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import new_test_keyspace
|
||||
|
||||
|
||||
@@ -309,13 +310,13 @@ async def test_incremental_read_repair(data_class: DataClass, manager: ManagerCl
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_read_repair_with_trace_logging(request, manager):
|
||||
logger.info("Creating a new cluster")
|
||||
cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace"]
|
||||
cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace:debug_error_injection=trace"]
|
||||
config = {"read_request_timeout_in_ms": 60000}
|
||||
|
||||
for i in range(2):
|
||||
await manager.server_add(cmdline=cmdline, config=config)
|
||||
[node1, node2] = await manager.servers_add(2, cmdline=cmdline, config=config)
|
||||
|
||||
cql = manager.get_cql()
|
||||
srvs = await manager.running_servers()
|
||||
@@ -326,13 +327,15 @@ async def test_read_repair_with_trace_logging(request, manager):
|
||||
|
||||
await cql.run_async(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 0)")
|
||||
|
||||
await manager.server_stop(srvs[0].server_id)
|
||||
prepared = cql.prepare(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)")
|
||||
prepared.consistency_level = ConsistencyLevel.ONE
|
||||
await cql.run_async(prepared)
|
||||
await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=True)
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)", consistency_level = ConsistencyLevel.ONE))
|
||||
|
||||
await manager.server_start(srvs[0].server_id)
|
||||
tracing = execute_with_tracing(cql, SimpleStatement(f"SELECT * FROM {ks}.t WHERE pk = 0", consistency_level = ConsistencyLevel.ALL), log = True)
|
||||
|
||||
prepared = cql.prepare(f"SELECT * FROM {ks}.t WHERE pk = 0")
|
||||
prepared.consistency_level = ConsistencyLevel.ALL
|
||||
await cql.run_async(prepared)
|
||||
assert len(tracing) == 1 # 1 page
|
||||
|
||||
found_read_repair = False
|
||||
for event in tracing[0]:
|
||||
found_read_repair |= "digest mismatch, starting read repair" == event.description
|
||||
|
||||
assert found_read_repair
|
||||
|
||||
@@ -346,13 +346,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
LZ4WithDictsCompressor and ZstdWithDictsCompressor
|
||||
to the Cassandra-compatible LZ4Compressor and ZstdCompressor.
|
||||
"""
|
||||
servers = await manager.servers_add(1)
|
||||
servers = await manager.servers_add(1, [
|
||||
*common_debug_cli_options,
|
||||
])
|
||||
|
||||
# Create keyspace and table
|
||||
logger.info("Creating tables")
|
||||
cql = manager.get_cql()
|
||||
|
||||
algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
|
||||
dict_algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
|
||||
nondict_algorithms = ['Snappy', 'LZ4', 'Deflate', 'Zstd']
|
||||
algorithms = dict_algorithms + nondict_algorithms
|
||||
no_compression = 'NoCompression'
|
||||
all_tables = dict_algorithms + nondict_algorithms + [no_compression]
|
||||
|
||||
await cql.run_async("""
|
||||
CREATE KEYSPACE test
|
||||
@@ -363,14 +369,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
CREATE TABLE test."{algo}" (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{'sstable_compression': '{algo}Compressor'}};
|
||||
''')
|
||||
for algo in algorithms
|
||||
])
|
||||
for algo in algorithms],
|
||||
cql.run_async(f'''
|
||||
CREATE TABLE test."{no_compression}" (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{}}
|
||||
''')
|
||||
)
|
||||
|
||||
|
||||
# Populate data with
|
||||
blob = random.randbytes(16*1024);
|
||||
logger.info("Populating table")
|
||||
n_blobs = 100
|
||||
for algo in algorithms:
|
||||
for algo in all_tables:
|
||||
insert = cql.prepare(f'''INSERT INTO test."{algo}" (pk, c) VALUES (?, ?);''')
|
||||
insert.consistency_level = ConsistencyLevel.ALL;
|
||||
for pks in itertools.batched(range(n_blobs), n=100):
|
||||
@@ -381,7 +392,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
|
||||
async def validate_select():
|
||||
cql = manager.get_cql()
|
||||
for algo in algorithms:
|
||||
for algo in all_tables:
|
||||
select = cql.prepare(f'''SELECT c FROM test."{algo}" WHERE pk = ? BYPASS CACHE;''')
|
||||
results = await cql.run_async(select, [42])
|
||||
assert results[0][0] == blob
|
||||
@@ -424,7 +435,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
names = set()
|
||||
for table_info in sstable_info:
|
||||
for sstable in table_info["sstables"]:
|
||||
for prop in sstable["extended_properties"]:
|
||||
for prop in sstable.get("extended_properties", []):
|
||||
if prop["group"] == "compression_parameters":
|
||||
for attr in prop["attributes"]:
|
||||
if attr["key"] == "sstable_compression":
|
||||
@@ -433,18 +444,24 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
|
||||
await asyncio.gather(*[
|
||||
manager.api.retrain_dict(servers[0].ip_addr, "test", algo)
|
||||
for algo in algorithms
|
||||
for algo in all_tables
|
||||
])
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
|
||||
|
||||
name_prefix = "org.apache.cassandra.io.compress."
|
||||
|
||||
for algo in algorithms:
|
||||
for algo in dict_algorithms:
|
||||
assert (await get_compressor_names(algo)) == {f"{algo}Compressor"}
|
||||
for algo in nondict_algorithms:
|
||||
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
|
||||
assert (await get_compressor_names(no_compression)) == set()
|
||||
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_enable_writing', "false")
|
||||
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
|
||||
|
||||
name_prefix = "org.apache.cassandra.io.compress."
|
||||
assert (await get_compressor_names("LZ4WithDicts")) == {name_prefix + "LZ4Compressor"}
|
||||
assert (await get_compressor_names("ZstdWithDicts")) == {name_prefix + "ZstdCompressor"}
|
||||
for algo in nondict_algorithms:
|
||||
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
|
||||
assert (await get_compressor_names(no_compression)) == set()
|
||||
|
||||
@@ -128,7 +128,7 @@ public:
|
||||
static constexpr std::string_view ks_name = "ks";
|
||||
static std::atomic<bool> active;
|
||||
private:
|
||||
std::unique_ptr<sstable_compressor_factory> _scf;
|
||||
sharded<default_sstable_compressor_factory> _scf;
|
||||
sharded<replica::database> _db;
|
||||
sharded<gms::feature_service> _feature_service;
|
||||
sharded<sstables::storage_manager> _sstm;
|
||||
@@ -657,10 +657,14 @@ private:
|
||||
auto stop_lang_manager = defer_verbose_shutdown("lang manager", [this] { _lang_manager.stop().get(); });
|
||||
_lang_manager.invoke_on_all(&lang::manager::start).get();
|
||||
|
||||
_scf = make_sstable_compressor_factory();
|
||||
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
|
||||
_scf.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config, std::cref(*cfg), std::cref(numa_groups))).get();
|
||||
auto stop_scf = defer_verbose_shutdown("sstable_compressor_factory", [this] {
|
||||
_scf.stop().get();
|
||||
});
|
||||
|
||||
_db_config = &*cfg;
|
||||
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(*_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
|
||||
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
|
||||
auto stop_db = defer_verbose_shutdown("database", [this] {
|
||||
_db.stop().get();
|
||||
});
|
||||
|
||||
@@ -109,7 +109,7 @@ public:
|
||||
|
||||
void maybe_start_compaction_manager(bool enable = true);
|
||||
|
||||
explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
|
||||
explicit test_env(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
|
||||
~test_env();
|
||||
test_env(test_env&&) noexcept;
|
||||
|
||||
@@ -176,15 +176,6 @@ public:
|
||||
|
||||
replica::table::config make_table_config();
|
||||
|
||||
template <typename Func>
|
||||
static inline auto do_with(Func&& func, test_env_config cfg = {}) {
|
||||
return seastar::do_with(test_env(std::move(cfg)), [func = std::move(func)] (test_env& env) mutable {
|
||||
return futurize_invoke(func, env).finally([&env] {
|
||||
return env.stop();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg = {});
|
||||
|
||||
static future<> do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func);
|
||||
@@ -192,7 +183,8 @@ public:
|
||||
template <typename T>
|
||||
static future<T> do_with_async_returning(noncopyable_function<T (test_env&)> func) {
|
||||
return seastar::async([func = std::move(func)] {
|
||||
test_env env;
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
test_env env({}, *scf);
|
||||
auto stop = defer([&] { env.stop().get(); });
|
||||
return func(env);
|
||||
});
|
||||
|
||||
@@ -201,7 +201,7 @@ struct test_env::impl {
|
||||
::cache_tracker cache_tracker;
|
||||
gms::feature_service feature_service;
|
||||
db::nop_large_data_handler nop_ld_handler;
|
||||
std::unique_ptr<sstable_compressor_factory> scf;
|
||||
sstable_compressor_factory& scf;
|
||||
test_env_sstables_manager mgr;
|
||||
std::unique_ptr<test_env_compaction_manager> cmgr;
|
||||
reader_concurrency_semaphore semaphore;
|
||||
@@ -210,7 +210,7 @@ struct test_env::impl {
|
||||
data_dictionary::storage_options storage;
|
||||
abort_source abort;
|
||||
|
||||
impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir);
|
||||
impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
|
||||
impl(impl&&) = delete;
|
||||
impl(const impl&) = delete;
|
||||
|
||||
@@ -219,16 +219,16 @@ struct test_env::impl {
|
||||
}
|
||||
};
|
||||
|
||||
test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir)
|
||||
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
|
||||
: local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
|
||||
, dir(tdir == nullptr ? local_dir.value() : *tdir)
|
||||
, db_config(make_db_config(dir.path().native(), cfg.storage))
|
||||
, dir_sem(1)
|
||||
, feature_service(gms::feature_config_from_db_config(*db_config))
|
||||
, scf(make_sstable_compressor_factory())
|
||||
, scf(scfarg)
|
||||
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
|
||||
feature_service, cache_tracker, cfg.available_memory, dir_sem,
|
||||
[host_id = locator::host_id::create_random_id()]{ return host_id; }, *scf, abort, current_scheduling_group(), sstm)
|
||||
[host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm)
|
||||
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
|
||||
, use_uuid(cfg.use_uuid)
|
||||
, storage(std::move(cfg.storage))
|
||||
@@ -242,8 +242,8 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdi
|
||||
}
|
||||
}
|
||||
|
||||
test_env::test_env(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tmp)
|
||||
: _impl(std::make_unique<impl>(std::move(cfg), sstm, tmp))
|
||||
test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
|
||||
: _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -325,7 +325,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
|
||||
sharded<sstables::storage_manager> sstm;
|
||||
sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
|
||||
auto stop_sstm = defer([&] { sstm.stop().get(); });
|
||||
test_env env(std::move(cfg), &sstm.local());
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
test_env env(std::move(cfg), *scf, &sstm.local());
|
||||
auto close_env = defer([&] { env.stop().get(); });
|
||||
env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
|
||||
auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
|
||||
@@ -334,7 +335,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
|
||||
}
|
||||
|
||||
return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
|
||||
test_env env(std::move(cfg));
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
test_env env(std::move(cfg), *scf);
|
||||
auto close_env = defer([&] { env.stop().get(); });
|
||||
func(env);
|
||||
});
|
||||
@@ -476,7 +478,8 @@ test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)>
|
||||
return seastar::async([func = std::move(func)] {
|
||||
tmpdir tdir;
|
||||
sharded<test_env> env;
|
||||
env.start(test_env_config{}, nullptr, &tdir).get();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
env.start(test_env_config{}, std::ref(*scf), nullptr, &tdir).get();
|
||||
auto stop = defer([&] { env.stop().get(); });
|
||||
func(env);
|
||||
});
|
||||
|
||||
@@ -144,7 +144,8 @@ int scylla_sstable_main(int argc, char** argv) {
|
||||
}
|
||||
cfg.compaction_strategy = sstables::compaction_strategy::type(app.configuration()["compaction-strategy"].as<sstring>());
|
||||
cfg.timestamp_range = app.configuration()["timestamp-range"].as<api::timestamp_type>();
|
||||
test.start(std::move(cfg)).get();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
test.start(std::move(cfg), std::ref(*scf)).get();
|
||||
auto stop_test = deferred_stop(test);
|
||||
|
||||
switch (mode) {
|
||||
|
||||
@@ -195,7 +195,9 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
perf_sstable_test_env(conf cfg) : _cfg(std::move(cfg))
|
||||
perf_sstable_test_env(conf cfg, sstable_compressor_factory& scf)
|
||||
: _env({}, scf)
|
||||
, _cfg(std::move(cfg))
|
||||
, s(create_schema(cfg.compaction_strategy))
|
||||
, _distribution('@', '~')
|
||||
, _mt(make_lw_shared<replica::memtable>(s))
|
||||
|
||||
@@ -24,6 +24,7 @@ from typing import Optional, TypeVar, Any
|
||||
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.query import Statement # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra import DriverException, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
|
||||
|
||||
from test import BUILD_DIR, TOP_SRC_DIR
|
||||
@@ -315,3 +316,29 @@ async def gather_safely(*awaitables: Awaitable):
|
||||
|
||||
def get_xdist_worker_id() -> str | None:
|
||||
return os.environ.get("PYTEST_XDIST_WORKER")
|
||||
|
||||
|
||||
def execute_with_tracing(cql : Session, statement : str | Statement, log : bool = False, *cql_execute_extra_args, **cql_execute_extra_kwargs):
|
||||
""" Execute statement via cql session and log the tracing output. """
|
||||
|
||||
cql_execute_extra_kwargs['trace'] = True
|
||||
query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs)
|
||||
|
||||
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
|
||||
|
||||
ret = []
|
||||
page_traces = []
|
||||
for trace in tracing:
|
||||
ret.append(trace.events)
|
||||
if not log:
|
||||
continue
|
||||
|
||||
trace_events = []
|
||||
for event in trace.events:
|
||||
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
|
||||
page_traces.append("\n".join(trace_events))
|
||||
|
||||
if log:
|
||||
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
|
||||
|
||||
return ret
|
||||
|
||||
@@ -49,7 +49,7 @@ tools::tablets_t do_load_system_tablets(const db::config& dbcfg,
|
||||
std::string_view table_name,
|
||||
reader_permit permit) {
|
||||
sharded<sstable_manager_service> sst_man;
|
||||
auto scf = make_sstable_compressor_factory();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
|
||||
auto stop_sst_man_service = deferred_stop(sst_man);
|
||||
|
||||
|
||||
@@ -396,7 +396,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
|
||||
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_semaphore = deferred_stop(rcs_sem);
|
||||
|
||||
auto scf = make_sstable_compressor_factory();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<sstable_manager_service> sst_man;
|
||||
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
|
||||
auto stop_sst_man_service = deferred_stop(sst_man);
|
||||
@@ -500,7 +500,7 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem:
|
||||
cache_tracker tracker;
|
||||
sstables::directory_semaphore dir_sem(1);
|
||||
abort_source abort;
|
||||
auto scf = make_sstable_compressor_factory();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker,
|
||||
memory::stats().total_memory(), dir_sem,
|
||||
[host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort);
|
||||
|
||||
@@ -3559,7 +3559,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
|
||||
}
|
||||
|
||||
gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
|
||||
auto scf = make_sstable_compressor_factory();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
cache_tracker tracker;
|
||||
sstables::directory_semaphore dir_sem(1);
|
||||
abort_source abort;
|
||||
|
||||
Reference in New Issue
Block a user