Mirror of https://github.com/scylladb/scylladb.git (synced 2026-04-27 20:05:10 +00:00)
Compare commits: copilot/fi... ... scylla-202... (61 commits)
Commit SHA1s:
19513fa47e, dec10d348e, ffea5e67c1, bcbbc40026, 70d9352cec, e215350c61, 59bf300e83, 6d733051de, 24c134992b, 9247c9472a,
ab8d50b5e7, 7986ef73da, 847504ad25, 854587c10c, 9058d5658b, dd9ec03323, eef6a95e26, 9f2a13c8c2, 4792a27396, f26c2b22dc,
163b65cec4, fcde30d2b0, 26bd28dac9, 6f1efcff31, 204f9e2cc8, 0c6a449a30, 7673a17365, ae05d62b97, 5c5911d874, 6a2e52d250,
f98c83b92f, f5cf4a3893, 12f0136b26, 4e45ceda21, 2c8b5143ba, 474de0f048, 5ac07a6c72, f88d8edcaf, 05c70b0820, 732321e3b8,
a2622e1919, 270bf34846, 168f694c5d, b5579be915, ad60d765f9, 68d2086fa5, 403d43093f, 2b1b4d1dfc, a5b513dde7, 2c431c1ea2,
827563902c, ccf194bd89, 9b735bb4dc, f29b87970a, 82ca17e70d, ddf9d047db, 17a76b6264, ab45df1aa1, 97f0f312e0, 4df6a17d30,
b3dbfaf27a
@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2025.2.0-dev
VERSION=2025.2.0-rc2

if test -f version
then
compress.cc (330 changed lines)
@@ -15,6 +15,8 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/reactor.hh>
#include "utils/reusable_buffer.hh"
#include "sstables/compress.hh"
#include "sstables/exceptions.hh"
@@ -27,7 +29,7 @@

// SHA256
using dict_id = std::array<std::byte, 32>;
class sstable_compressor_factory_impl;
class dictionary_holder;

static seastar::logger compressor_factory_logger("sstable_compressor_factory");

@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
// or referenced by algorithm-specific dicts.
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
dict_id _id;
std::vector<std::byte> _dict;
public:
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
~raw_dict();
const std::span<const std::byte> raw() const { return _dict; }
dict_id id() const { return _id; }
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
// (which internally holds a pointer to the raw dictionary blob
// and parsed entropy tables).
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
public:
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~zstd_ddict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -100,14 +102,14 @@ public:
// so the level of compression is decided at the time of construction
// of this dict.
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
int _level;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
public:
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
~zstd_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -119,11 +121,11 @@ public:
// and a hash index over the substrings of the blob).
//
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
public:
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~lz4_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -164,6 +166,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
std::map<sstring, sstring> options() const override;
algorithm get_algorithm() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};

class snappy_processor: public compressor {
@@ -266,6 +269,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
algorithm get_algorithm() const override;
std::map<sstring, sstring> options() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};

zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
}

std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}

const std::string_view DICTIONARY_OPTION = ".dictionary.";

static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
return {};
}

std::optional<unsigned> compressor::get_dict_owner_for_test() const {
return std::nullopt;
}

std::string compressor::name() const {
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
}
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
case algorithm::snappy: return "SnappyCompressor";
case algorithm::zstd: return "ZstdCompressor";
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
}
abort();
}
@@ -660,6 +678,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
}
}

std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}

compressor_ptr make_lz4_sstable_compressor_for_tests() {
return std::make_unique<lz4_processor>();
}
@@ -751,21 +779,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
return snappy_max_compressed_length(input_len);
}

// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// across nodes.
//
// Holds weak pointers to all live dictionaries
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
// and shared (lifetime-extending) pointers to the current writer ("recommended")
// dict for each table (so that they can be shared with new SSTables without consulting
// `system.dicts`).
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
// (Note: this shouldn't pose a performance problem because a dict is only requested once per an opening of an SSTable).
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
//
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
// is removed from the factory.
//
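The comment above describes the sharing scheme. As a rough, hypothetical illustration only (not ScyllaDB code), a minimal standalone sketch of the "canonical pointer" idea — one shared copy per dictionary content, tracked weakly so the registry never extends a dictionary's lifetime — could look like the following, with std::shared_ptr standing in for Seastar's lw_shared_ptr/weak_ptr, an FNV hash standing in for SHA-256, and the names dict_registry, dict_blob and hash_of invented for the example:

#include <cstddef>
#include <map>
#include <memory>
#include <span>
#include <vector>

using dict_blob = std::vector<std::byte>;

class dict_registry {
    // Non-owning entries: the registry alone never keeps a dict alive.
    std::map<std::size_t, std::weak_ptr<const dict_blob>> _dicts;

    static std::size_t hash_of(std::span<const std::byte> d) {
        // FNV-1a, a stand-in for the SHA-256 id used by the real factory
        // (collisions are ignored here for brevity).
        std::size_t h = 1469598103934665603ull;
        for (auto b : d) {
            h = (h ^ static_cast<std::size_t>(b)) * 1099511628211ull;
        }
        return h;
    }

public:
    // Hands out a pointer to the one canonical copy, creating it on first use.
    std::shared_ptr<const dict_blob> get_canonical_ptr(std::span<const std::byte> d) {
        auto id = hash_of(d);
        if (auto it = _dicts.find(id); it != _dicts.end()) {
            if (auto existing = it->second.lock()) {
                return existing; // an identical dict is already live: share it
            }
        }
        auto fresh = std::make_shared<const dict_blob>(d.begin(), d.end());
        _dicts[id] = fresh; // remember it weakly for future sharers
        return fresh;
    }
};

The real code additionally pins the per-table "recommended" dict, erases dead entries eagerly (the forget_* helpers below), and routes all lookups through the dict's owner shard instead of taking a lock.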
@@ -774,10 +793,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
// Has a configurable memory budget for live dicts. If the budget is exceeded,
// will return null dicts to new writers (to avoid making the memory usage even worse)
// and print warnings.
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
class dictionary_holder : public weakly_referencable<dictionary_holder> {
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
shard_id _owner_shard;
config _cfg;
using config = default_sstable_compressor_factory::config;
const config& _cfg;
uint64_t _total_live_dict_memory = 0;
metrics::metric_groups _metrics;
struct zstd_cdict_id {
@@ -789,7 +808,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
std::map<table_id, foreign_ptr<lw_shared_ptr<const raw_dict>>> _recommended;

size_t memory_budget() const {
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
@@ -806,8 +825,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
memory_budget()
);
}
public:
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (dict.empty()) {
return nullptr;
}
auto id = get_sha256(dict);
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
return it->second->shared_from_this();
@@ -819,7 +841,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!raw) {
return nullptr;
}
lw_shared_ptr<const zstd_ddict> ddict;
// Fo reading, we must allocate a new dict, even if memory budget is exceeded. We have no other choice.
// In any case, if the budget is exceeded after we print a rate-limited warning about it.
@@ -835,15 +859,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(ddict));
}
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
auto raw = get_canonical_ptr(dict);
return get_zstd_dict_for_reading(raw, level);
});
}
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const zstd_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -859,19 +879,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_zstd_dict_for_writing(rec_it->second, level);
} else {
return {};
}
});
}
using lz4_dicts = std::pair<
foreign_ptr<lw_shared_ptr<const raw_dict>>,
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
@@ -879,18 +886,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
lw_shared_ptr<const raw_dict> ddict;
return make_foreign(std::move(raw));
}
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
auto raw = get_canonical_ptr(dict);
return get_lz4_dict_for_reading(raw);
});
}
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const lz4_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -905,24 +906,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_lz4_dict_for_writing(rec_it->second);
} else {
return {};
}
});
}

public:
sstable_compressor_factory_impl(config cfg)
: _owner_shard(this_shard_id())
, _cfg(std::move(cfg))
dictionary_holder(const config& cfg)
: _cfg(cfg)
{
if (_cfg.register_metrics) {
namespace sm = seastar::metrics;
@@ -931,8 +918,8 @@ public:
});
}
}
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
~sstable_compressor_factory_impl() {
dictionary_holder(dictionary_holder&&) = delete;
~dictionary_holder() {
// Note: `_recommended` might be the only thing keeping some dicts alive,
// so clearing it will destroy them.
//
@@ -948,39 +935,36 @@ public:
_recommended.clear();
}
void forget_raw_dict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_raw_dicts.erase(id);
}
void forget_zstd_cdict(dict_id id, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_cdicts.erase({id, level});
}
void forget_zstd_ddict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_ddicts.erase(id);
}
void forget_lz4_cdict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_lz4_cdicts.erase(id);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
return smp::submit_to(_owner_shard, [this, t, dict] {
_recommended.erase(t);
if (dict.size()) {
auto canonical_ptr = get_canonical_ptr(dict);
_recommended.emplace(t, canonical_ptr);
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict.size(), fmt_hex(canonical_ptr->id()));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
});
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
_recommended.erase(t);
if (dict) {
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict->raw().size(), fmt_hex(dict->id()));
_recommended.emplace(t, std::move(dict));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
auto rec_it = _recommended.find(t);
if (rec_it == _recommended.end()) {
co_return nullptr;
}
co_return co_await rec_it->second.copy();
}
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;

void account_memory_delta(ssize_t n) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
compressor_factory_logger.error(
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
@@ -990,19 +974,85 @@ public:
}
};

default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
: _cfg(std::move(cfg))
, _holder(std::make_unique<dictionary_holder>(_cfg))
{
for (shard_id i = 0; i < smp::count; ++i) {
auto numa_id = _cfg.numa_config[i];
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
_numa_groups[numa_id].push_back(i);
}
}

future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
const auto params = s->get_compressor_params();
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
}

std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
auto sp = local_engine->smp().shard_to_numa_node_mapping();
return std::vector<unsigned>(sp.begin(), sp.end());
}

unsigned default_sstable_compressor_factory::local_numa_id() {
return _cfg.numa_config[this_shard_id()];
}

shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
auto hash = read_unaligned<uint64_t>(sha.data());
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
}
return group[hash % group.size()];
}

future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
if (_leader_shard != this_shard_id()) {
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
}
auto units = co_await get_units(_recommendation_setting_sem, 1);
auto sha = get_sha256(dict);
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
continue;
}
auto r = get_dict_owner(numa_id, sha);
auto d = co_await container().invoke_on(r, [dict](self& local) {
return make_foreign(local._holder->get_canonical_ptr(dict));
});
auto local_coordinator = group[0];
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
local._holder->set_recommended_dict(t, std::move(d));
}));
}
}

future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
}

future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
const auto local_coordinator = _numa_groups[local_numa_id()][0];
return container().invoke_on(local_coordinator, [t](self& local) {
return local._holder->get_recommended_dict(t);
});
}

future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_lz4_dicts_for_writing(s->id())
: nullptr;
holder::foreign_lz4_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_lz4_dict_for_writing(recommended.release());
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1015,9 +1065,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
case algorithm::zstd:
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
case algorithm::zstd_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_zstd_dict_for_writing(s->id(), params.zstd_compression_level().value_or(ZSTD_defaultCLevel()))
: nullptr;
holder::foreign_zstd_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1029,17 +1083,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
abort();
}

future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
}

future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
return make_compressor_for_writing_impl(params, id);
}

future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto dict = dict_from_options(c);
auto ddict = co_await get_lz4_dicts_for_reading(std::as_bytes(std::span(*dict)));
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_lz4_dict_for_reading(std::move(d));
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1054,8 +1119,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
}
case algorithm::zstd_with_dicts: {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
auto dict = dict_from_options(c);
auto ddict = co_await get_zstd_dict_for_reading(std::as_bytes(std::span(*dict)), level);
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1067,7 +1137,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
abort();
}

raw_dict::raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict)
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
auto dict = dict_from_options(c);
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
}

future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
return make_compressor_for_reading_impl(params, dict);
}

raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
: _owner(owner.weak_from_this())
, _id(key)
, _dict(dict.begin(), dict.end())
@@ -1082,7 +1164,7 @@ raw_dict::~raw_dict() {
}
}

zstd_cdict::zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level)
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
: _owner(owner.weak_from_this())
, _raw(raw)
, _level(level)
@@ -1114,7 +1196,7 @@ zstd_cdict::~zstd_cdict() {
}
}

zstd_ddict::zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _alloc([this] (ssize_t n) {
@@ -1143,7 +1225,7 @@ zstd_ddict::~zstd_ddict() {
}
}

lz4_cdict::lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _dict(LZ4_createStream(), LZ4_freeStream)
@@ -1162,6 +1244,28 @@ lz4_cdict::~lz4_cdict() {
}
}

std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg) {
return std::make_unique<sstable_compressor_factory_impl>(std::move(cfg));
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
SCYLLA_ASSERT(thread::running_in_thread());
struct wrapper : sstable_compressor_factory {
using impl = default_sstable_compressor_factory;
sharded<impl> _impl;
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
return _impl.local().make_compressor_for_writing(s);
}
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
return _impl.local().make_compressor_for_reading(c);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
return _impl.local().set_recommended_dict(t, d);
};
wrapper(wrapper&&) = delete;
wrapper() {
_impl.start().get();
}
~wrapper() {
_impl.stop().get();
}
};
return std::make_unique<wrapper>();
}
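The new default_sstable_compressor_factory above spreads dictionary ownership across shards, one owner per dictionary per NUMA group (see get_dict_owner). As a hypothetical, self-contained sketch of that selection logic — illustrative names (make_numa_groups, dict_owner), plain std containers, and a caller-supplied 64-bit hash standing in for the dict's SHA-256:

#include <cstdint>
#include <vector>

using shard_id = unsigned;

// Invert a shard -> NUMA-node mapping (numa_of[s] is the node of shard s)
// into one group of shards per NUMA node.
std::vector<std::vector<shard_id>> make_numa_groups(const std::vector<unsigned>& numa_of) {
    std::vector<std::vector<shard_id>> groups;
    for (shard_id s = 0; s < numa_of.size(); ++s) {
        auto node = numa_of[s];
        if (groups.size() <= node) {
            groups.resize(node + 1);
        }
        groups[node].push_back(s);
    }
    return groups;
}

// Every shard in a group computes the same owner for a given dict,
// so each dictionary is materialized at most once per NUMA node.
shard_id dict_owner(const std::vector<std::vector<shard_id>>& groups,
                    unsigned local_numa_node, uint64_t dict_hash) {
    const auto& group = groups[local_numa_node];
    return group[dict_hash % group.size()];
}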
@@ -64,6 +64,8 @@ public:

virtual algorithm get_algorithm() const = 0;

virtual std::optional<unsigned> get_dict_owner_for_test() const;

using ptr_type = std::unique_ptr<compressor>;
};
@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/statement_restrictions_test.cc',
@@ -113,10 +113,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
"and will not support CDC, LWT and counters features. "
"To use CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} "
"to the CREATE KEYSPACE statement.");
"and will not support Materialized Views, Secondary Indexes, CDC, LWT and counters features. "
"To use Materialized Views, Secondary Indexes, CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
docs/_static/data/os-support.json (vendored, 2 changed lines)
@@ -7,7 +7,7 @@
},
"ScyllaDB Versions": [
{
"version": "Enterprise 2025.1",
"version": "ScyllaDB 2025.1",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Debian": ["11"],
@@ -2,6 +2,11 @@
#old path: new path

# Remove reduntant pages
/stable/getting-started/tutorials: https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html
/stable/contribute: https://github.com/scylladb/scylladb/blob/master/CONTRIBUTING.md

# Remove an oudated article
/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html
@@ -1,31 +0,0 @@
Contribute to ScyllaDB
=======================

Thank you for your interest in making ScyllaDB better!
We appreciate your help and look forward to welcoming you to the ScyllaDB Community.
There are two ways you can contribute:

* Send a patch to the ScyllaDB source code
* Write documentation for ScyllaDB Docs

Contribute to ScyllaDB's Source Code
------------------------------------
ScyllaDB developers use patches and email to share and discuss changes.
Setting up can take a little time, but once you have done it the first time, it’s easy.

The basic steps are:

* Join the ScyllaDB community
* Create a Git branch to work on
* Commit your work with clear commit messages and sign-offs.
* Send a PR or use ``git format-patch`` and ``git send-email`` to send to the list

The entire process is `documented here <https://github.com/scylladb/scylla/blob/master/CONTRIBUTING.md>`_.

Contribute to ScyllaDB Docs
---------------------------

Each ScyllaDB project has accompanying documentation. For information about contributing documentation to a specific ScyllaDB project, refer to the README file for the individual project.
For general information or to contribute to the ScyllaDB Sphinx theme, read the `Contributor's Guide <https://sphinx-theme.scylladb.com/stable/contribute/>`_.
@@ -11,7 +11,6 @@ Getting Started
requirements
Migrate to ScyllaDB </using-scylla/migrate-scylla>
Integration Solutions </using-scylla/integrations/index>
tutorials

.. panel-box::
:title: ScyllaDB Requirements
@@ -1,21 +0,0 @@
============
Tutorials
============

The tutorials will show you how to use ScyllaDB as a data source for an application.

ScyllaDB Tutorial
===================

`Build an IoT App with sensor simulator and a REST API <https://iot.scylladb.com/stable/>`_

ScyllaDB Cloud Tutorial
=======================

`Implement CRUD operations with a TODO App <https://github.com/scylladb/scylla-cloud-getting-started/>`_

ScyllaDB Cloud Feature Store Tutorial
=====================================

`Build a machine learning (ML) feature store with ScyllaDB <https://feature-store.scylladb.com/stable/>`_
@@ -73,6 +73,5 @@ In addition, you can read our `blog <https://www.scylladb.com/blog/>`_ and atten
kb/index
reference/index
faq
Contribute to ScyllaDB <contribute>
2024.2 and earlier documentation <https://enterprise.docs.scylladb.com/branch-2024.2/>
init.cc (15 changed lines)
@@ -13,6 +13,7 @@

#include <boost/algorithm/string/trim.hpp>
#include <seastar/core/coroutine.hh>
#include "sstables/sstable_compressor_factory.hh"

logging::logger startlog("init");

@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
std::any service_set::find(const std::type_info& type) const {
return _impl->find(type);
}

// Placed here to avoid dependency on db::config in compress.cc,
// where the rest of default_sstable_compressor_factory_config is.
auto default_sstable_compressor_factory_config::from_db_config(
const db::config& cfg,
std::span<const unsigned> numa_config) -> self
{
return self {
.register_metrics = true,
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
.numa_config{numa_config.begin(), numa_config.end()},
};
}
@@ -112,19 +112,19 @@ void production_snitch_base::parse_property_file(std::string contents) {

[[noreturn]]
void production_snitch_base::throw_double_declaration(const sstring& key) const {
logger().error("double \"{}\" declaration in {}", key, _prop_file_name);
logger().warn("double \"{}\" declaration in {}", key, _prop_file_name);
throw bad_property_file_error();
}

[[noreturn]]
void production_snitch_base::throw_bad_format(const sstring& line) const {
logger().error("Bad format in properties file {}: {}", _prop_file_name, line);
logger().warn("Bad format in properties file {}: {}", _prop_file_name, line);
throw bad_property_file_error();
}

[[noreturn]]
void production_snitch_base::throw_incomplete_file() const {
logger().error("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
logger().warn("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
throw bad_property_file_error();
}
main.cc (50 changed lines)
@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
langman.invoke_on_all(&lang::manager::start).get();

auto sstable_compressor_factory = make_sstable_compressor_factory(sstable_compressor_factory::config{
.register_metrics = true,
.enable_writing_dictionaries = cfg->sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg->sstable_compression_dictionaries_memory_budget_fraction,
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
sstable_compressor_factory.stop().get();
});

checkpoint(stop_signal, "starting database");

debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(*sstable_compressor_factory),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
auto sstables_prefix = std::string_view("sstables/");
if (name.starts_with(sstables_prefix)) {
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
co_await sstable_compressor_factory->set_recommended_dict(table, std::move(dict.data));
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
} else if (name == dictionary_service::rpc_compression_dict_name) {
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
}
@@ -1755,6 +1757,24 @@ sharded<locator::shared_token_metadata> token_metadata;

utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); });

debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});

checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();

api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});

checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1921,24 +1941,6 @@ sharded<locator::shared_token_metadata> token_metadata;
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});

debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});

checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();

api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});

checkpoint(stop_signal, "starting hinted handoff manager");
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_rebalanced().get();
@@ -397,7 +397,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _view_update_read_concurrency_semaphores_group(
max_memory_concurrent_view_update_reads(),
utils::updateable_value<int>(max_count_concurrent_view_update_reads),
max_inactive_view_update_queue_length(),
std::numeric_limits<size_t>::max(),
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,
@@ -1488,7 +1488,6 @@ private:
size_t max_memory_concurrent_view_update_reads() { return _dbcfg.available_memory * 0.01; }
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 1000; }
size_t max_inactive_view_update_queue_length() { return _dbcfg.available_memory * 0.01 / 1000; }
// They're rather heavyweight, so limit more
static constexpr size_t max_count_streaming_concurrent_reads{10};
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
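For a sense of scale under the 1 kB-per-queued-read assumption above: a shard with 4 GiB of available memory would get roughly 86,000 entries from the 0.02 / 1000 general limit and roughly 43,000 from the 0.01 / 1000 view-update limit (hypothetical figures, just the formulas evaluated at an example memory size).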
@@ -1892,6 +1892,8 @@ table::sstable_list_builder::build_new_list(const sstables::sstable_set& current
const std::vector<sstables::shared_sstable>& old_sstables) {
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());

co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));

// add sstables from the current list into the new list except the ones that are in the old list
std::vector<sstables::shared_sstable> removed_sstables;
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
@@ -2178,14 +2180,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());

struct compaction_group_sstable_set_updater {
struct compaction_group_strategy_updater {
table& t;
compaction_group& cg;
compaction_backlog_tracker new_bt;
compaction::compaction_strategy_state new_cs_state;
lw_shared_ptr<sstables::sstable_set> new_sstables;

compaction_group_sstable_set_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
compaction_group_strategy_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
: t(t)
, cg(cg)
, new_bt(new_cs.make_backlog_tracker())
@@ -2196,26 +2197,26 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);

new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(cg.as_table_state()));
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables->insert(s);
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables_for_backlog_tracker.push_back(s);
});
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
}

void execute() noexcept {
// Update strategy state and backlog tracker according to new strategy. SSTable set update
// is delayed until new compaction, which is triggered on strategy change. SSTable set
// cannot be updated here since it must happen under the set update lock.
t._compaction_manager.register_backlog_tracker(cg.as_table_state(), std::move(new_bt));
cg.set_main_sstables(std::move(new_sstables));
cg.set_compaction_strategy_state(std::move(new_cs_state));
}
};
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;

for_each_compaction_group([&] (compaction_group& cg) {
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
compaction_group_strategy_updater updater(*this, cg, new_cs);
updater.prepare(new_cs);
cg_sstable_set_updaters.push_back(std::move(updater));
});
@@ -2224,7 +2225,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
for (auto& updater : cg_sstable_set_updaters) {
updater.execute();
}
refresh_compound_sstable_set();
}

size_t table::sstables_count() const {
@@ -769,6 +769,12 @@ utils::input_stream as_input_stream(const bytes_ostream& b) {
return utils::input_stream::fragmented(b.fragments().begin(), b.size());
}

template<FragmentedView View>
inline
auto as_input_stream(View v) {
return fragmented_memory_input_stream(fragment_range(v).begin(), v.size_bytes());
}

template<typename Output, typename ...T>
void serialize(Output& out, const boost::variant<T...>& v) {}
@@ -89,7 +89,6 @@ future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}.value()));
}

future<raft::log_entries> raft_sys_table_storage::load_log() {
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id}, cql3::query_processor::cache_internal::yes);
@@ -103,7 +102,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
}
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
auto raw_data = row.get_blob("data");
auto raw_data = row.get_view("data");
auto in = ser::as_input_stream(raw_data);
using data_variant_type = decltype(raft::log_entry::data);
data_variant_type data = ser::deserialize(in, std::type_identity<data_variant_type>());
@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet

switch (rs.state) {
case node_state::normal: {
if (is_me(ip)) {
if (is_me(id)) {
co_return;
}
// In replace-with-same-ip scenario the replaced node IP will be the same
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet

auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));

co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
}
}
break;
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
if (prev_ip == endpoint) {
co_return;
}

if (_address_map.find(id) != endpoint) {
// Address map refused to update IP for the host_id,
// this means prev_ip has higher generation than endpoint.
// We can immediately remove endpoint from gossiper
// since it represents an old IP (before an IP change)
// for the given host_id. This is not strictly
// necessary, but it reduces the noise circulated
// in gossiper messages and allows for clearer
// expectations of the gossiper state in tests.

co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
// Do not update address.
co_return;
}

// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
if (_ss.raft_topology_change_enabled()) {
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,

slogger.info("Starting up server gossip");

co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});

co_await _gossiper.start_gossiping(new_generation, app_states);

utils::get_local_injector().inject("stop_after_starting_gossiping",
@@ -6010,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}

inet_address storage_service::host2ip(locator::host_id host) const {
auto ip = _address_map.find(host);
if (!ip) {
throw std::runtime_error(::format("Cannot map host {} to ip", host));
}
return *ip;
}

// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
@@ -7281,11 +7264,7 @@ void storage_service::init_messaging_service() {
[this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
streaming::stream_files_response resp;
resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future<gms::inet_address> {
return ss.container().invoke_on(0, [host] (storage_service& ss) {
return ss.host2ip(host);
});
});
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
co_return res.stream_bytes;
},
size_t(0),
@@ -210,7 +210,6 @@ private:
// when both of which sit on the same node. So all the movement is local.
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
future<> cleanup_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id) const;
// Handler for table load stats RPC.
future<locator::load_stats> load_stats_for_tablet_based_tables();
future<> process_tablet_split_candidate(table_id) noexcept;
@@ -1534,7 +1534,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case locator::tablet_transition_stage::repair: {
if (action_failed(tablet_state.repair)) {
bool fail_repair = utils::get_local_injector().enter("handle_tablet_migration_repair_fail");
if (fail_repair || action_failed(tablet_state.repair)) {
if (do_barrier()) {
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
@@ -2021,6 +2022,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::join_group0, "
"global_token_metadata_barrier failed, error {}",
@@ -2157,6 +2160,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
@@ -2236,6 +2241,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception());
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
@@ -2251,6 +2258,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_old, "
"global_token_metadata_barrier failed, error {}",
@@ -2297,6 +2306,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is {}): {}", state, std::current_exception());
@@ -2327,6 +2338,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_new, "
"global_token_metadata_barrier failed, error {}",
@@ -2466,6 +2479,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
@@ -2481,6 +2496,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(co_await start_operation(), node.id);
}

// Make decommissioning node a non voter before reporting operation completion below.
// Otherwise the decommissioned node may see the completion and exit before it is removed from
// the config at which point the removal from the config will hang if the cluster had only two
// nodes before the decommission.
co_await _voter_handler.on_node_removed(node.id, _as);

topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);

rtbuilder.done();
@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
    const compression_parameters& params,
    const utils::chunked_vector<temporary_buffer<char>>& validation_samples
) {
    auto factory = make_sstable_compressor_factory();
    co_await factory->set_recommended_dict(initial_schema->id(), dict);
    co_return co_await try_one_compression_config(*factory, initial_schema, params, validation_samples);
    sharded<default_sstable_compressor_factory> factory;
    co_await factory.start();
    std::exception_ptr ex;
    float result;
    try {
        co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
        result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
    } catch (...) {
        ex = std::current_exception();
    }
    co_await factory.stop();
    if (ex) {
        co_return coroutine::exception(ex);
    }
    co_return result;
}

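The new body follows the manual lifecycle pattern for a locally constructed sharded service: start it, run the work inside try/catch, always stop it, and only rethrow afterwards. Below is a minimal sketch of that pattern with a trivial, hypothetical `counter_service` (an assumption for illustration, not part of the source).

// A minimal sketch (assumed names, not ScyllaDB code) of starting a sharded
// service, doing work, and guaranteeing stop() runs even if the work throws.
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <exception>

struct counter_service {
    int value = 0;
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

seastar::future<int> run_with_sharded_counter() {
    seastar::sharded<counter_service> svc;
    co_await svc.start();
    std::exception_ptr ex;
    int result = 0;
    try {
        // Per-shard work goes here; a failure must not skip svc.stop() below.
        result = ++svc.local().value;
    } catch (...) {
        ex = std::current_exception();
    }
    co_await svc.stop();   // always stop the sharded service before returning
    if (ex) {
        co_return seastar::coroutine::exception(std::move(ex));
    }
    co_return result;
}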
@@ -10,20 +10,89 @@

#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "compress.hh"
#include "schema/schema_fwd.hh"
#include "utils/updateable_value.hh"
#include <span>

namespace db {
class config;
} // namespace db

struct dictionary_holder;
class raw_dict;

struct sstable_compressor_factory {
    virtual ~sstable_compressor_factory() {}
    virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
    virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
    virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
    struct config {
        bool register_metrics = false;
        utils::updateable_value<bool> enable_writing_dictionaries{true};
        utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
    };
};

std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg = {});
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
// because then the compiler gives weird complaints about default member initializers on the line
// ```
// default_sstable_compressor_factory(config = config{});
// ```
// apparently due to some compiler bug related to default member initializers.
struct default_sstable_compressor_factory_config {
    using self = default_sstable_compressor_factory_config;
    static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
    bool register_metrics = false;
    utils::updateable_value<bool> enable_writing_dictionaries{true};
    utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
    std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};

    static default_sstable_compressor_factory_config from_db_config(
        const db::config&,
        std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
};

// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// between all shards within the same NUMA group.
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// decided by a content hash of the dictionary.
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
// to that dict.
//
// (Note: this centralization shouldn't pose a performance problem because a dict is only requested once
// per opening of an SSTable).
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
    using holder = dictionary_holder;
public:
    using self = default_sstable_compressor_factory;
    using config = default_sstable_compressor_factory_config;
private:
    config _cfg;
    // Maps NUMA node ID to the array of shards on that node.
    std::vector<std::vector<shard_id>> _numa_groups;
    // Holds dictionaries owned by this shard.
    std::unique_ptr<dictionary_holder> _holder;
    // All recommended dictionary updates are serialized by a single "leader shard".
    // We do this to avoid dealing with concurrent updates altogether.
    semaphore _recommendation_setting_sem{1};
    constexpr static shard_id _leader_shard = 0;

private:
    using sha256_type = std::array<std::byte, 32>;
    unsigned local_numa_id();
    shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
    future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
    future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
    future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
    future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
public:
    default_sstable_compressor_factory(config = config{});
    ~default_sstable_compressor_factory();

    future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
    future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
    future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
    future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
    future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
};

std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();

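The owner-shard scheme described in the comment above (an owner derived from a content hash, one dictionary copy per NUMA node) can be pictured with the following sketch. The function name, the `numa_groups` argument, and the hash-folding are illustrative assumptions, not the factory's actual get_dict_owner implementation.

// A minimal sketch (assumed names) of deriving a dictionary's owner shard
// from its SHA-256 within the local NUMA group.
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

using shard_id = unsigned;
using sha256_type = std::array<std::byte, 32>;

shard_id pick_dict_owner(const std::vector<std::vector<shard_id>>& numa_groups,
                         unsigned local_numa_id,
                         const sha256_type& sha) {
    // Fold the first bytes of the hash into an integer and use it to select one
    // shard from the local NUMA node's shard list. Every shard on that node picks
    // the same owner for the same dictionary, so the node keeps a single copy.
    uint64_t h = 0;
    for (size_t i = 0; i < sizeof(h); ++i) {
        h = (h << 8) | std::to_integer<uint64_t>(sha[i]);
    }
    const auto& shards = numa_groups[local_numa_id];
    return shards[h % shards.size()];
}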
@@ -945,7 +945,21 @@ filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, replica:

std::vector<frozen_sstable_run>
sstable_set_impl::all_sstable_runs() const {
    throw_with_backtrace<std::bad_function_call>();
    auto all_sstables = all();
    std::unordered_map<sstables::run_id, sstable_run> runs_m;
    std::vector<frozen_sstable_run> all_runs;

    for (auto&& sst : *all_sstables) {
        // When a run cannot accept an sstable due to overlapping, treat the rejected sstable
        // as a single-fragment run.
        if (!runs_m[sst->run_identifier()].insert(sst)) {
            all_runs.push_back(make_lw_shared<const sstable_run>(sst));
        }
    }
    for (auto&& r : runs_m | std::views::values) {
        all_runs.push_back(make_lw_shared<const sstable_run>(std::move(r)));
    }
    return all_runs;
}

mutation_reader
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#include "message/messaging_service.hh"
|
||||
#include "streaming/stream_blob.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "utils/pretty_printers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "locator/host_id.hh"
|
||||
@@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i
|
||||
|
||||
future<> stream_blob_handler(replica::database& db,
|
||||
netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source,
|
||||
@@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db,
|
||||
|
||||
|
||||
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
@@ -374,7 +373,7 @@ namespace streaming {
|
||||
// Send files in the files list to the nodes in targets list over network
|
||||
// Returns number of bytes sent over network
|
||||
future<size_t>
|
||||
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
|
||||
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
|
||||
size_t ops_total_size = 0;
|
||||
if (targets.empty()) {
|
||||
co_return ops_total_size;
|
||||
@@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
ops_id, sources.size(), sources, targets);
|
||||
|
||||
struct sink_and_source {
|
||||
gms::inet_address node;
|
||||
locator::host_id node;
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink;
|
||||
rpc::source<streaming::stream_blob_cmd_data> source;
|
||||
bool sink_closed = false;
|
||||
@@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
for (auto& x : targets) {
|
||||
const auto& node = x.node;
|
||||
meta.dst_shard_id = x.shard;
|
||||
auto ip = co_await host2ip(node);
|
||||
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
|
||||
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
|
||||
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
|
||||
ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
|
||||
ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
|
||||
}
|
||||
|
||||
// This fiber sends data to peer node
|
||||
@@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
}
|
||||
|
||||
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
|
||||
stream_files_response resp;
|
||||
auto& table = db.find_column_family(req.table);
|
||||
auto sstables = co_await table.take_storage_snapshot(req.range);
|
||||
@@ -653,7 +651,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
|
||||
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
|
||||
req.ops_id, sstables.size(), files.size(), files, req.range);
|
||||
auto ops_start_time = std::chrono::steady_clock::now();
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
|
||||
resp.stream_bytes = stream_bytes;
|
||||
auto duration = std::chrono::steady_clock::now() - ops_start_time;
|
||||
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",
|
||||
|
||||
@@ -116,13 +116,13 @@ struct stream_blob_info {
|
||||
};
|
||||
|
||||
// The handler for the STREAM_BLOB verb.
|
||||
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
|
||||
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
|
||||
|
||||
// Exposed mainly for testing
|
||||
|
||||
future<> stream_blob_handler(replica::database& db,
|
||||
netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source,
|
||||
@@ -163,11 +163,9 @@ public:
|
||||
size_t stream_bytes = 0;
|
||||
};
|
||||
|
||||
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
|
||||
|
||||
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
|
||||
// stream sstables files specified by the stream_files_request req.
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip);
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
|
||||
|
||||
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
|
||||
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
|
||||
@@ -178,7 +176,6 @@ future<size_t> tablet_stream_files(netw::messaging_service& ms,
|
||||
std::vector<node_and_shard> targets,
|
||||
table_id table,
|
||||
file_stream_id ops_id,
|
||||
host2ip_t host2ip,
|
||||
service::frozen_topology_guard topo_guard,
|
||||
bool may_inject_errors = false
|
||||
);
|
||||
|
||||
@@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
}
|
||||
});
|
||||
ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
auto from = netw::messaging_service::get_source(cinfo).addr;
|
||||
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
||||
auto sink = _ms.local().make_sink_for_stream_blob(source);
|
||||
(void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) {
|
||||
sslog.warn("Failed to run stream blob handler: {}", eptr);
|
||||
@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
|
||||
// prepare tasks
|
||||
set_state(stream_session_state::PREPARING);
|
||||
auto& db = manager().db();
|
||||
for (auto& request : requests) {
|
||||
// always flush on stream request
|
||||
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
|
||||
const auto& ks = request.keyspace;
|
||||
// Make sure cf requested by peer node exists
|
||||
for (auto& cf : request.column_families) {
|
||||
try {
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
for (auto& summary : summaries) {
|
||||
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
|
||||
auto cf_id = summary.cf_id;
|
||||
// Make sure cf the peer node will send to us exists
|
||||
try {
|
||||
db.find_column_family(cf_id);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
}
|
||||
|
||||
|
||||
@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
|
||||
secondary_index_test.cc
|
||||
sessions_test.cc
|
||||
sstable_compaction_test.cc
|
||||
sstable_compressor_factory_test.cc
|
||||
sstable_directory_test.cc
|
||||
sstable_set_test.cc
|
||||
statement_restrictions_test.cc
|
||||
|
||||
@@ -596,9 +596,10 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
|
||||
cf_name))
|
||||
.get();
|
||||
f1.get();
|
||||
e.get_system_keyspace().local().load_built_views().get();
|
||||
|
||||
auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index");
|
||||
e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
|
||||
f2.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
|
||||
if (!verb_register) {
|
||||
co_await smp::invoke_on_all([&] {
|
||||
return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
auto from = netw::messaging_service::get_source(cinfo).addr;
|
||||
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
||||
auto sink = global_ms.local().make_sink_for_stream_blob(source);
|
||||
(void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
|
||||
auto path = meta.filename + suffix;
|
||||
@@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
|
||||
co_return make_file_input_stream(std::move(file), foptions);
|
||||
};
|
||||
}
|
||||
auto host2ip = [&global_db] (locator::host_id id) -> future<gms::inet_address> {
|
||||
co_return global_db.local().get_token_metadata().get_topology().my_address();
|
||||
};
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error);
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
|
||||
co_await mark_tablet_stream_done(ops_id);
|
||||
testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
|
||||
ret = true;
|
||||
|
||||
@@ -812,27 +812,20 @@ SEASTAR_THREAD_TEST_CASE(background_reclaim) {
|
||||
logalloc::shard_tracker().stop().get();
|
||||
});
|
||||
|
||||
sleep(500ms).get(); // sleep a little, to give the reclaimer a head start
|
||||
|
||||
std::vector<managed_bytes> std_allocs;
|
||||
size_t std_alloc_size = 1000000; // note that managed_bytes fragments these, even in std
|
||||
for (int i = 0; i < 50; ++i) {
|
||||
// Background reclaim is supposed to eventually ensure a certain amount of free memory.
|
||||
while (memory::free_memory() < background_reclaim_free_memory_threshold) {
|
||||
thread::maybe_yield();
|
||||
}
|
||||
|
||||
auto compacted_pre = logalloc::shard_tracker().statistics().memory_compacted;
|
||||
fmt::print("compacted {} items {} (pre)\n", compacted_pre, evictable_allocs.size());
|
||||
std_allocs.emplace_back(managed_bytes::initialized_later(), std_alloc_size);
|
||||
auto compacted_post = logalloc::shard_tracker().statistics().memory_compacted;
|
||||
fmt::print("compacted {} items {} (post)\n", compacted_post, evictable_allocs.size());
|
||||
BOOST_REQUIRE_EQUAL(compacted_pre, compacted_post);
|
||||
|
||||
// Pretend to do some work. Sleeping would be too easy, as the background reclaim group would use
|
||||
// all that time.
|
||||
//
|
||||
// Use thread_cputime_clock to prevent overcommitted test machines from stealing CPU time
|
||||
// and causing test failures.
|
||||
auto deadline = thread_cputime_clock::now() + 100ms;
|
||||
while (thread_cputime_clock::now() < deadline) {
|
||||
thread::maybe_yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
return sstables::test_env::do_with([] (sstables::test_env& env) {
|
||||
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("v", bytes_type)
|
||||
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
|
||||
return with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
return seastar::async([&env, s, &cf] {
|
||||
// populate
|
||||
auto new_key = [&] {
|
||||
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
|
||||
flushed.get();
|
||||
});
|
||||
}).then([cf_stats] {});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
|
||||
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
|
||||
}
|
||||
|
||||
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
|
||||
const schema& s = *expected.schema();
|
||||
auto expected_cont = expected.entry().squashed_continuity(s);
|
||||
auto actual_cont = actual.get_continuity(s);
|
||||
bool actual_static_cont = actual.static_row_continuous();
|
||||
bool expected_static_cont = expected.squashed().static_row_continuous();
|
||||
if (actual_static_cont != expected_static_cont) {
|
||||
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
if (!expected_cont.equals(s, actual_cont)) {
|
||||
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
// Tests that reading many versions using a cursor gives the logical mutation back.
|
||||
return seastar::async([] {
|
||||
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
auto snap = e.read();
|
||||
auto actual = read_using_cursor(*snap);
|
||||
|
||||
assert_that(s, actual).has_same_continuity(expected);
|
||||
// Checks that the squashed continuity of `e` is equal to continuity of `actual`.
|
||||
// Note: squashed continuity of an entry is slightly different than the continuity
|
||||
// of a squashed entry.
|
||||
//
|
||||
// Squashed continuity is the union of continuities of all versions in the entry,
|
||||
// and in particular it includes empty dummy rows resulting in the logical merge
|
||||
// of version.
|
||||
// The process of actually squashing an entry is allowed to
|
||||
// remove those empty dummies, so the squashed entry can have slightly
|
||||
// smaller continuity.
|
||||
//
|
||||
// Since a cursor isn't allowed to remove dummy rows, the strongest test
|
||||
// we can do here is to compare the continuity of the cursor-read mutation
|
||||
// with the squashed continuity of the entry.
|
||||
assert_has_same_squashed_continuity(actual, e);
|
||||
assert_that(s, actual).is_equal_to_compacted(expected);
|
||||
|
||||
// Reversed iteration
|
||||
|
||||
@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
|
||||
auto ts_in_ms = std::chrono::milliseconds(ts);
|
||||
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
|
||||
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
|
||||
BOOST_REQUIRE(ret.second == expected);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2319,6 +2318,7 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
uint32_t _seed;
|
||||
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
|
||||
@@ -2336,7 +2336,7 @@ public:
|
||||
compress))
|
||||
, _random_schema(_seed, *_random_schema_spec)
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
testlog.info("random_schema: {}", _random_schema.cql());
|
||||
}
|
||||
|
||||
@@ -2402,12 +2402,14 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
|
||||
public:
|
||||
scrub_test_framework()
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
}
|
||||
|
||||
~scrub_test_framework() {
|
||||
|
||||
133
test/boost/sstable_compressor_factory_test.cc
Normal file
@@ -0,0 +1,133 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
|
||||
// 1. Create a random message.
|
||||
// 2. Set this random message as the recommended dict.
|
||||
// 3. On all shards, create compressors.
|
||||
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
|
||||
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
|
||||
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
|
||||
// 7. Repeat this a few times for both lz4 and zstd.
|
||||
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
|
||||
|
||||
// Create a compressor factory.
|
||||
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
|
||||
auto config = default_sstable_compressor_factory::config{
|
||||
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
|
||||
};
|
||||
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
|
||||
sstable_compressor_factory.start(std::cref(config)).get();
|
||||
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
|
||||
|
||||
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
|
||||
auto table = table_id::create_random_id();
|
||||
|
||||
// Retry a few times just to check that it works more than once.
|
||||
for (int retry = 0; retry < 3; ++retry) {
|
||||
// Generate a random (and hence incompressible without a dict) message.
|
||||
auto message = tests::random::get_sstring(4096);
|
||||
auto dict_view = std::as_bytes(std::span(message));
|
||||
// Set the message as the dict to make the message perfectly compressible.
|
||||
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
|
||||
|
||||
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
|
||||
std::vector<unsigned> compressor_numa_nodes(smp::count);
|
||||
std::vector<unsigned> decompressor_numa_nodes(smp::count);
|
||||
|
||||
// Try for both algorithms, just in case there are some differences in how dictionary
|
||||
// distribution over shards is implemented between them.
|
||||
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
|
||||
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
|
||||
// Validate that the dictionaries work as intended,
|
||||
// and check that their owner is as expected.
|
||||
|
||||
auto params = compression_parameters(algo);
|
||||
|
||||
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
|
||||
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
|
||||
|
||||
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
|
||||
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
|
||||
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
|
||||
|
||||
// Check that the dictionary used by this shard lies on the same NUMA node.
|
||||
// This is important to avoid cross-node memory accesses on the hot path.
|
||||
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
|
||||
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
|
||||
|
||||
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
|
||||
auto output_max_size = compressor->compress_max_size(message.size());
|
||||
auto compressed = std::vector<char>(output_max_size);
|
||||
auto compressed_size = compressor->compress(
|
||||
reinterpret_cast<const char*>(message.data()), message.size(),
|
||||
reinterpret_cast<char*>(compressed.data()), compressed.size());
|
||||
BOOST_REQUIRE_GE(compressed_size, 0);
|
||||
compressed.resize(compressed_size);
|
||||
|
||||
// Validate that the recommended dict was actually used.
|
||||
BOOST_CHECK(compressed.size() < message.size() / 10);
|
||||
|
||||
auto decompressed = std::vector<char>(message.size());
|
||||
auto decompressed_size = decompressor->uncompress(
|
||||
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
|
||||
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
|
||||
BOOST_REQUIRE_GE(decompressed_size, 0);
|
||||
decompressed.resize(decompressed_size);
|
||||
|
||||
// Validate that the roundtrip through compressor and decompressor
|
||||
// resulted in the original message.
|
||||
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
|
||||
})).get();
|
||||
}
|
||||
|
||||
// Check that the number of owners (and hence, copies) is equal to the number
|
||||
// of NUMA nodes.
|
||||
// This isn't that important, but we don't want to duplicate dictionaries
|
||||
// within a NUMA node unnecessarily.
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
|
||||
{
|
||||
std::vector<unsigned> one_numa_node(smp::count);
|
||||
test_one_numa_topology(one_numa_node);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> two_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
|
||||
two_numa_nodes[i] = i % 2;
|
||||
}
|
||||
test_one_numa_topology(two_numa_nodes);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> n_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
|
||||
n_numa_nodes[i] = i;
|
||||
}
|
||||
test_one_numa_topology(n_numa_nodes);
|
||||
}
|
||||
}
|
||||
@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto builder = schema_builder("test", "summary_test")
|
||||
.with_column("a", int32_type, column_kind::partition_key);
|
||||
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
|
||||
BOOST_REQUIRE(list.size() == 130);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
// d int,
|
||||
// e blob
|
||||
//);
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
|
||||
auto builder = schema_builder("test", "test_multi_schema")
|
||||
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
BOOST_REQUIRE(!m);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
BOOST_REQUIRE(insert(2, 2) == true);
|
||||
BOOST_REQUIRE(insert(5, 5) == true);
|
||||
BOOST_REQUIRE(run.all().size() == 5);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
|
||||
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
// please note, the SSTables used by this test are stored under
|
||||
|
||||
@@ -14,11 +14,12 @@ from test.pylib.internal_types import HostID
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id
|
||||
from test.cluster.mv.tablets.test_mv_tablets import get_tablet_replicas
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.cluster.util import new_test_keyspace, wait_for
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -44,6 +45,14 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
|
||||
|
||||
async def replicas_balanced():
|
||||
base_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "test", 0)]
|
||||
view_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "tv", 0)]
|
||||
return len(set(base_replicas) & set(view_replicas)) == 0 or None
|
||||
# There's 4 nodes and 4 tablets, so even if the initial placement is not balanced,
|
||||
# each node should get 1 replica after some time.
|
||||
await wait_for(replicas_balanced, time.time() + 60)
|
||||
|
||||
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
@@ -401,20 +401,21 @@ class topo:
|
||||
self.dcs = dcs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 3, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 3, nodes = 6, racks = 3, dcs = 1), True),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
])
|
||||
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology):
|
||||
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology_rf_validity):
|
||||
'''Check that restoring of a cluster with stream scopes works'''
|
||||
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks')
|
||||
topology, rf_rack_valid_keyspaces = topology_rf_validity
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
|
||||
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
|
||||
cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300 }
|
||||
cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces }
|
||||
cmd = [ '--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug' ]
|
||||
servers = []
|
||||
host_ids = {}
|
||||
|
||||
@@ -474,7 +474,7 @@ async def add_new_node(manager: ManagerClient,
|
||||
yield
|
||||
|
||||
LOGGER.info("Add a new node to the cluster")
|
||||
await manager.server_add(timeout=TOPOLOGY_TIMEOUT)
|
||||
await manager.server_add(config={"rf_rack_valid_keyspaces": False}, timeout=TOPOLOGY_TIMEOUT)
|
||||
|
||||
yield
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
|
||||
async def four_nodes_cluster(manager: ManagerClient) -> None:
|
||||
LOGGER.info("Booting initial 4-node cluster.")
|
||||
for _ in range(4):
|
||||
server = await manager.server_add()
|
||||
server = await manager.server_add(config={"rf_rack_valid_keyspaces": False})
|
||||
await manager.api.enable_injection(
|
||||
node_ip=server.ip_addr,
|
||||
injection="raft_server_set_snapshot_thresholds",
|
||||
@@ -93,6 +93,8 @@ async def test_random_failures(manager: ManagerClient,
|
||||
TESTS_COUNT, TESTS_SHUFFLE_SEED, ERROR_INJECTIONS_COUNT, CLUSTER_EVENTS_COUNT,
|
||||
)
|
||||
|
||||
rf_rack_cfg = {"rf_rack_valid_keyspaces": False}
|
||||
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
await table.insert_seq()
|
||||
|
||||
@@ -116,7 +118,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
)
|
||||
coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
|
||||
coordinator_log_mark = await coordinator_log.mark()
|
||||
s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED)
|
||||
s_info = await manager.server_add(config=rf_rack_cfg, expected_server_up_state=ServerUpState.PROCESS_STARTED)
|
||||
await coordinator_log.wait_for(
|
||||
pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting",
|
||||
from_mark=coordinator_log_mark,
|
||||
@@ -128,7 +130,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
)
|
||||
else:
|
||||
s_info = await manager.server_add(
|
||||
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]},
|
||||
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]} | rf_rack_cfg,
|
||||
expected_server_up_state=ServerUpState.PROCESS_STARTED,
|
||||
)
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ extra_scylla_config_options:
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
enable_user_defined_functions: False
|
||||
rf_rack_valid_keyspaces: True
|
||||
tablets_mode_for_new_keyspaces: enabled
|
||||
run_first:
|
||||
- test_raft_recovery_stuck
|
||||
|
||||
@@ -20,7 +20,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
|
||||
from test.cluster.util import reconnect_driver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.prepare_3_nodes_cluster
|
||||
pytestmark = pytest.mark.prepare_3_racks_cluster
|
||||
|
||||
|
||||
|
||||
@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
|
||||
# IPs before they are sent back to servers[1] and servers[2],
|
||||
# and the mentioned above code is not exercised by this test.
|
||||
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
|
||||
# sleep_before_start_gossiping injections are needed to reproduce #22777
|
||||
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_start(servers[1].server_id)
|
||||
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
|
||||
if build_mode != 'release':
|
||||
|
||||
@@ -32,7 +32,7 @@ N_SERVERS = 2
|
||||
@pytest.fixture
|
||||
async def two_nodes_cluster(manager: ManagerClient) -> list[ServerNum]:
|
||||
logger.info(f"Booting initial 2-nodes cluster")
|
||||
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS)]
|
||||
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS, auto_rack_dc="dc1")]
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
return servers
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureReq
|
||||
|
||||
"""
|
||||
logger.info("Creating a new cluster")
|
||||
srvs = await manager.servers_add(3)
|
||||
srvs = await manager.servers_add(3, auto_rack_dc="dc1")
|
||||
cql, _ = await manager.get_ready_cql(srvs)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") as ks:
|
||||
|
||||
@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -39,7 +39,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
"""
|
||||
cmdline = ["--hinted-handoff-enabled", "0", "--cache-hit-rate-read-balancing", "0", "--logger-log-level", "debug_error_injection=trace"]
|
||||
|
||||
nodes = await manager.servers_add(3, cmdline=cmdline)
|
||||
nodes = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
|
||||
node1, node2, node3 = nodes
|
||||
|
||||
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
|
||||
host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
|
||||
|
||||
def execute_with_tracing(cql, statement, *args, **kwargs):
|
||||
kwargs['trace'] = True
|
||||
query_result = cql.execute(statement, *args, **kwargs)
|
||||
|
||||
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
|
||||
page_traces = []
|
||||
for trace in tracing:
|
||||
trace_events = []
|
||||
for event in trace.events:
|
||||
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
|
||||
page_traces.append("\n".join(trace_events))
|
||||
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
|
||||
cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
|
||||
" WITH speculative_retry = 'NONE'"
|
||||
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
" AND compaction = {'class': 'NullCompactionStrategy'}")
|
||||
|
||||
for write_statement, delete_statement in statement_pairs:
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
|
||||
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
|
||||
await manager.api.disable_injection(node3.ip_addr, "database_apply")
|
||||
|
||||
def check_data(host, data):
|
||||
|
||||
@@ -64,12 +64,15 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
cfg = {'tablets_mode_for_new_keyspaces' : 'enabled' if tablets_enabled else 'disabled'}
|
||||
|
||||
logger.info("Bootstrapping first two nodes")
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
servers = await manager.servers_add(2, config=cfg, property_file=[
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"}
|
||||
])
|
||||
|
||||
# The third node is started as the last one, so we can be sure that it has
|
||||
# the latest topology version
|
||||
logger.info("Bootstrapping the last node")
|
||||
servers += [await manager.server_add(config=cfg)]
|
||||
servers += [await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r3"})]
|
||||
|
||||
# Disable load balancer as it might bump topology version, undoing the decrement below.
|
||||
# This should be done before adding the last two servers,
|
||||
@@ -123,9 +126,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_fence_hints(request, manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster with three nodes")
|
||||
s0 = await manager.server_add(config={
|
||||
'error_injections_at_startup': ['decrease_hints_flush_period']
|
||||
}, cmdline=['--logger-log-level', 'hints_manager=trace'])
|
||||
s0 = await manager.server_add(
|
||||
config={'error_injections_at_startup': ['decrease_hints_flush_period']},
|
||||
cmdline=['--logger-log-level', 'hints_manager=trace'],
|
||||
property_file={"dc": "dc1", "rack": "r1"})
|
||||
|
||||
# Disable load balancer as it might bump topology version, potentially creating a race condition
|
||||
# with read modify write below.
|
||||
@@ -134,7 +138,10 @@ async def test_fence_hints(request, manager: ManagerClient):
|
||||
# which the test relies on.
|
||||
await manager.api.disable_tablet_balancing(s0.ip_addr)
|
||||
|
||||
[s1, s2] = await manager.servers_add(2)
|
||||
[s1, s2] = await manager.servers_add(2, property_file=[
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"}
|
||||
])
|
||||
|
||||
logger.info(f'Creating test table')
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
|
||||
|
||||
@@ -10,7 +10,7 @@ import uuid
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.prepare_3_nodes_cluster
|
||||
pytestmark = pytest.mark.prepare_3_racks_cluster
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -21,8 +21,8 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
|
||||
since ignoring a node is now permanent and B is removed from the quorum early, so it is enough to
have two live nodes for the quorum.
|
||||
"""
|
||||
await manager.servers_add(2)
|
||||
servers = await manager.running_servers()
|
||||
servers += await manager.servers_add(2, property_file=[servers[1].property_file(), servers[2].property_file()])
|
||||
await manager.server_stop_gracefully(servers[3].server_id)
|
||||
await manager.server_stop_gracefully(servers[4].server_id)
|
||||
# test that non existing uuid is rejected
|
||||
@@ -37,6 +37,6 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
|
||||
# is 2
|
||||
await manager.server_stop_gracefully(servers[2].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(start=False, replace_cfg=replace_cfg)
|
||||
await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[2].property_file())
|
||||
|
||||
|
||||
|
||||
@@ -58,10 +58,10 @@ async def test_putget_2dc_with_rf(
|
||||
table_name = "test_table_name"
|
||||
columns = [Column("name", TextType), Column("value", TextType)]
|
||||
logger.info("Create two servers in different DC's")
|
||||
for i in nodes_list:
|
||||
for rack_idx, dc_idx in enumerate(nodes_list):
|
||||
s_info = await manager.server_add(
|
||||
config=CONFIG,
|
||||
property_file={"dc": f"dc{i}", "rack": "myrack"},
|
||||
property_file={"dc": f"dc{dc_idx}", "rack": f"rack{rack_idx}"},
|
||||
)
|
||||
logger.info(s_info)
|
||||
conn = manager.get_cql()
|
||||
|
||||
@@ -23,14 +23,15 @@ async def test_not_enough_token_owners(manager: ManagerClient):
|
||||
"""
|
||||
logging.info('Trying to add a zero-token server as the first server in the cluster')
|
||||
await manager.server_add(config={'join_ring': False},
|
||||
property_file={"dc": "dc1", "rack": "rz"},
|
||||
expected_error='Cannot start the first node in the cluster as zero-token')
|
||||
|
||||
logging.info('Adding the first server')
|
||||
server_a = await manager.server_add()
|
||||
server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
|
||||
|
||||
logging.info('Adding two zero-token servers')
|
||||
# The second server is needed only to preserve the Raft majority.
|
||||
server_b = (await manager.servers_add(2, config={'join_ring': False}))[0]
|
||||
server_b = (await manager.servers_add(2, config={'join_ring': False}, property_file={"dc": "dc1", "rack": "rz"}))[0]
|
||||
|
||||
logging.info(f'Trying to decommission the only token owner {server_a}')
|
||||
await manager.decommission_node(server_a.server_id,
|
||||
@@ -47,7 +48,7 @@ async def test_not_enough_token_owners(manager: ManagerClient):
|
||||
await manager.server_start(server_a.server_id)
|
||||
|
||||
logging.info('Adding a normal server')
|
||||
await manager.server_add()
|
||||
await manager.server_add(property_file={"dc": "dc1", "rack": "r2"})
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
|
||||
@@ -43,6 +43,10 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
previous state. For replace, add a single node (a sanity check verifying that the cluster is functioning properly).
7. Stop sending writes.
"""
# Currently, the constraints imposed by `rf_rack_valid_keyspaces` are quite strict
# and adjusting this test to working with it may require significant changes in the test.
# Let's disable the option explicitly until we do that.
rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
# Decrease failure_detector_timeout_in_ms from the default 20 s to speed up some graceful shutdowns in the test.
# Shutting down the CQL server can hang for failure_detector_timeout_in_ms in the presence of dead nodes and
# CQL requests.

@@ -50,7 +54,8 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
'endpoint_snitch': 'GossipingPropertyFileSnitch',
'tablets_mode_for_new_keyspaces': 'enabled',
'failure_detector_timeout_in_ms': 2000,
}
} | rf_rack_cfg
property_file_dc1 = {'dc': 'dc1', 'rack': 'rack1'}
property_file_dc2 = {'dc': 'dc2', 'rack': 'rack2'}

@@ -150,7 +155,7 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
for i, being_replaced in enumerate(dead_servers):
replace_cfg = ReplaceConfig(replaced_id=being_replaced.server_id, reuse_ip_addr=False, use_host_id=True,
ignore_dead_nodes=[dead_srv.ip_addr for dead_srv in dead_servers[i + 1:]])
new_servers.append(await manager.server_add(replace_cfg=replace_cfg, property_file=property_file_dc2))
new_servers.append(await manager.server_add(replace_cfg=replace_cfg, config=rf_rack_cfg, property_file=property_file_dc2))
logging.info(f'Unsetting the recovery_leader config option on {live_servers}')
for srv in live_servers:

@@ -15,9 +15,10 @@ from cassandra.cluster import ConsistencyLevel, Session # type: ignore
from cassandra.query import SimpleStatement # type: ignore
from cassandra.pool import Host # type: ignore
from test.pylib.util import wait_for_cql_and_get_hosts
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace

@@ -309,13 +310,13 @@ async def test_incremental_read_repair(data_class: DataClass, manager: ManagerCl
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_read_repair_with_trace_logging(request, manager):
logger.info("Creating a new cluster")
cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace"]
cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace:debug_error_injection=trace"]
config = {"read_request_timeout_in_ms": 60000}
for i in range(2):
await manager.server_add(cmdline=cmdline, config=config)
[node1, node2] = await manager.servers_add(2, cmdline=cmdline, config=config, auto_rack_dc="dc1")
cql = manager.get_cql()
srvs = await manager.running_servers()

@@ -326,13 +327,15 @@ async def test_read_repair_with_trace_logging(request, manager):
await cql.run_async(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 0)")
await manager.server_stop(srvs[0].server_id)
prepared = cql.prepare(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)")
prepared.consistency_level = ConsistencyLevel.ONE
await cql.run_async(prepared)
await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=True)
await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)", consistency_level = ConsistencyLevel.ONE))
await manager.server_start(srvs[0].server_id)
tracing = execute_with_tracing(cql, SimpleStatement(f"SELECT * FROM {ks}.t WHERE pk = 0", consistency_level = ConsistencyLevel.ALL), log = True)
prepared = cql.prepare(f"SELECT * FROM {ks}.t WHERE pk = 0")
prepared.consistency_level = ConsistencyLevel.ALL
await cql.run_async(prepared)
assert len(tracing) == 1 # 1 page
found_read_repair = False
for event in tracing[0]:
found_read_repair |= "digest mismatch, starting read repair" == event.description
assert found_read_repair
@@ -22,7 +22,7 @@ async def test_remove_rpc_client_with_pending_requests(request, manager: Manager
# Regression test for #17445
logger.info("starting first two nodes")
servers = await manager.servers_add(2)
servers = await manager.servers_add(2, auto_rack_dc="dc1")
logger.info(f"wait_for_cql_and_get_hosts for the first node {servers[0]}")
host0 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[0]], time.time() + 60))[0]

@@ -48,7 +48,7 @@ async def test_remove_rpc_client_with_pending_requests(request, manager: Manager
expected_data.sort()
logger.info(f"adding the third node")
servers += [await manager.server_add(start=False)]
servers += [await manager.server_add(start=False, property_file=servers[0].property_file())]
logger.info(f"starting the third node [{servers[2]}]")
third_node_future = asyncio.create_task(manager.server_start(servers[2].server_id))

@@ -40,8 +40,7 @@ async def test_enable_compacting_data_for_streaming_and_repair_live_update(manag
silently broken in the past.
"""
cmdline = ["--enable-compacting-data-for-streaming-and-repair", "0", "--smp", "1", "--logger-log-level", "api=trace"]
node1 = await manager.server_add(cmdline=cmdline)
node2 = await manager.server_add(cmdline=cmdline)
node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -89,8 +88,7 @@ async def test_tombstone_gc_for_streaming_and_repair(manager):
"--hinted-handoff-enabled", "0",
"--smp", "1",
"--logger-log-level", "api=trace:database=trace"]
node1 = await manager.server_add(cmdline=cmdline)
node2 = await manager.server_add(cmdline=cmdline)
node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -149,10 +147,7 @@ async def test_tombstone_gc_for_streaming_and_repair(manager):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_succeeds_with_unitialized_bm(manager):
await manager.server_add()
await manager.server_add()
servers = await manager.running_servers()
servers = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")

@@ -171,8 +166,7 @@ async def do_batchlog_flush_in_repair(manager, cache_time_in_ms):
total_repair_duration = 0
cmdline = ["--repair-hints-batchlog-flush-cache-time-in-ms", str(cache_time_in_ms), "--smp", "1", "--logger-log-level", "api=trace"]
node1 = await manager.server_add(cmdline=cmdline)
node2 = await manager.server_add(cmdline=cmdline)
node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")

@@ -226,10 +220,7 @@ async def test_batchlog_flush_in_repair_without_cache(manager):
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_abort(manager):
cfg = {'tablets_mode_for_new_keyspaces': 'enabled'}
await manager.server_add(config=cfg)
await manager.server_add(config=cfg)
servers = await manager.running_servers()
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
@@ -78,7 +78,7 @@ async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> Non
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")

@@ -90,7 +90,7 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False)
replace_future = asyncio.create_task(manager.server_add(replace_cfg))
replace_future = asyncio.create_task(manager.server_add(replace_cfg, property_file=servers[0].property_file()))
start_time = time.time()
next_id = 0
logger.info(f"running write requests in a loop while the replacing node is starting")

@@ -27,7 +27,7 @@ async def test_reversed_queries_during_upgrade(manager: ManagerClient) -> None:
in order to test both native and legacy reversed formats.
"""
cmdline = ["--hinted-handoff-enabled", "0"]
node1, _ = await manager.servers_add(2, cmdline)
node1, _ = await manager.servers_add(2, cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -61,7 +61,7 @@ async def test_basic(manager: ManagerClient) -> None:
'internode_compression': "all",
'internode_compression_zstd_max_cpu_fraction': 0.0}
logger.info(f"Booting initial cluster")
servers = await manager.servers_add(servers_num=2, config=cfg)
servers = await manager.servers_add(servers_num=2, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -108,7 +108,7 @@ async def test_dict_training(manager: ManagerClient) -> None:
'--logger-log-level=dict_training=trace'
]
logger.info(f"Booting initial cluster")
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -170,7 +170,7 @@ async def test_external_dicts(manager: ManagerClient) -> None:
'--logger-log-level=advanced_rpc_compressor=debug'
]
logger.info(f"Booting initial cluster")
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -233,7 +233,7 @@ async def test_external_dicts_sanity(manager: ManagerClient) -> None:
'--logger-log-level=advanced_rpc_compressor=debug',
]
logger.info(f"Booting initial cluster")
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -16,7 +16,7 @@ from test.pylib.manager_client import ManagerClient
@pytest.mark.asyncio
async def test_sticky_coordinator_enforced(manager: ManagerClient) -> None:
await manager.servers_add(2, cmdline=['--logger-log-level', 'paging=trace'])
await manager.servers_add(2, cmdline=['--logger-log-level', 'paging=trace'], auto_rack_dc="dc1")
cql = manager.get_cql()

@@ -14,7 +14,7 @@ from cassandra.query import SimpleStatement # type: ignore # pylint
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster

@pytest.mark.asyncio
@@ -39,7 +39,7 @@ async def test_snapshot(manager, random_tables):
await t.add_column()
manager.driver_close()
server_d = await manager.server_add()
server_d = await manager.server_add(property_file=server_a.property_file())
logger.info("Started D %s", server_d)
logger.info("Stopping A %s, B %s, and C %s", server_a, server_b, server_c)

@@ -54,14 +54,14 @@ async def test_autoretrain_dict(manager: ManagerClient):
uncompressed_size = blob_size * n_blobs * rf
logger.info("Bootstrapping cluster")
servers = (await manager.servers_add(2, cmdline=[
servers = await manager.servers_add(2, cmdline=[
'--logger-log-level=storage_service=debug',
'--logger-log-level=database=debug',
'--logger-log-level=sstable_dict_autotrainer=debug',
'--sstable-compression-dictionaries-retrain-period-in-seconds=1',
'--sstable-compression-dictionaries-autotrainer-tick-period-in-seconds=1',
f'--sstable-compression-dictionaries-min-training-dataset-bytes={int(uncompressed_size/2)}',
]))
], auto_rack_dc="dc1")
logger.info("Creating table")
cql = manager.get_cql()

@@ -74,7 +74,7 @@ async def test_retrain_dict(manager: ManagerClient):
servers = (await manager.servers_add(2, cmdline=[
*common_debug_cli_options,
]))
], auto_rack_dc="dc1"))
logger.info("Creating table")
cql = manager.get_cql()

@@ -185,7 +185,7 @@ async def test_estimate_compression_ratios(manager: ManagerClient):
servers = (await manager.servers_add(2, cmdline=[
*common_debug_cli_options,
]))
], auto_rack_dc="dc1"))
cql = manager.get_cql()

@@ -346,13 +346,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
LZ4WithDictsCompressor and ZstdWithDictsCompressor
to the Cassandra-compatible LZ4Compressor and ZstdCompressor.
"""
servers = await manager.servers_add(1)
servers = await manager.servers_add(1, [
*common_debug_cli_options,
])
# Create keyspace and table
logger.info("Creating tables")
cql = manager.get_cql()
algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
dict_algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
nondict_algorithms = ['Snappy', 'LZ4', 'Deflate', 'Zstd']
algorithms = dict_algorithms + nondict_algorithms
no_compression = 'NoCompression'
all_tables = dict_algorithms + nondict_algorithms + [no_compression]
await cql.run_async("""
CREATE KEYSPACE test

@@ -363,14 +369,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
CREATE TABLE test."{algo}" (pk int PRIMARY KEY, c blob)
WITH COMPRESSION = {{'sstable_compression': '{algo}Compressor'}};
''')
for algo in algorithms
])
for algo in algorithms],
cql.run_async(f'''
CREATE TABLE test."{no_compression}" (pk int PRIMARY KEY, c blob)
WITH COMPRESSION = {{}}
''')
)
# Populate data with
blob = random.randbytes(16*1024);
logger.info("Populating table")
n_blobs = 100
for algo in algorithms:
for algo in all_tables:
insert = cql.prepare(f'''INSERT INTO test."{algo}" (pk, c) VALUES (?, ?);''')
insert.consistency_level = ConsistencyLevel.ALL;
for pks in itertools.batched(range(n_blobs), n=100):

@@ -381,7 +392,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
async def validate_select():
cql = manager.get_cql()
for algo in algorithms:
for algo in all_tables:
select = cql.prepare(f'''SELECT c FROM test."{algo}" WHERE pk = ? BYPASS CACHE;''')
results = await cql.run_async(select, [42])
assert results[0][0] == blob

@@ -424,7 +435,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
names = set()
for table_info in sstable_info:
for sstable in table_info["sstables"]:
for prop in sstable["extended_properties"]:
for prop in sstable.get("extended_properties", []):
if prop["group"] == "compression_parameters":
for attr in prop["attributes"]:
if attr["key"] == "sstable_compression":

@@ -433,18 +444,24 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
await asyncio.gather(*[
manager.api.retrain_dict(servers[0].ip_addr, "test", algo)
for algo in algorithms
for algo in all_tables
])
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
name_prefix = "org.apache.cassandra.io.compress."
for algo in algorithms:
for algo in dict_algorithms:
assert (await get_compressor_names(algo)) == {f"{algo}Compressor"}
for algo in nondict_algorithms:
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
assert (await get_compressor_names(no_compression)) == set()
await live_update_config(manager, servers, 'sstable_compression_dictionaries_enable_writing', "false")
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
name_prefix = "org.apache.cassandra.io.compress."
assert (await get_compressor_names("LZ4WithDicts")) == {name_prefix + "LZ4Compressor"}
assert (await get_compressor_names("ZstdWithDicts")) == {name_prefix + "ZstdCompressor"}
for algo in nondict_algorithms:
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
assert (await get_compressor_names(no_compression)) == set()
@@ -232,6 +232,7 @@ def check_repairs(row_num_before: list[int], row_num_after: list[int], expected_
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.parametrize("included_host_count", [2, 1, 0])
async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_count):
injection = "handle_tablet_migration_repair_fail"
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
hosts_filter = "00000000-0000-0000-0000-000000000000"
if included_host_count == 1:

@@ -243,7 +244,7 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
token = -1
async def repair_task():
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
await inject_error_on(manager, injection, servers)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, hosts_filter=hosts_filter)
async def check_filter():

@@ -255,7 +256,7 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
assert len(res) == 1
assert res[str(token)].repair_hosts_filter.split(",").sort() == hosts_filter.split(",").sort()
await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)
await inject_error_off(manager, injection, servers)
await asyncio.gather(repair_task(), check_filter())

@@ -264,8 +265,8 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
async def prepare_multi_dc_repair(manager) -> tuple[list[ServerInfo], CassandraSession, list[Host], str, str]:
servers = [await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R2'})]
await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R2'}),
await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R3'})]
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', "
"'DC1': 2, 'DC2': 1} AND tablets = {'initial': 8};")

@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
# This test verifies that Scylla rejects creating a table if there are too few token-owning nodes.
# That means that a keyspace must already be in place, but that's impossible with RF-rack-valid
# keyspaces being enforced. We could go over this constraint by creating 3 nodes and then
# decommissioning one of them before attempting to create a table, but if we decide to constraint
# decommission later on, this test will have to be modified again. Let's simply disable the option.
cfg = cfg | {'rf_rack_valid_keyspaces': False}
servers = await manager.servers_add(2, config=cfg)
cql = manager.get_cql()

@@ -66,7 +72,12 @@ async def test_tablet_scaling_option_is_respected(manager: ManagerClient):
async def test_tablet_cannot_decommision_below_replication_factor(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(4, config=cfg)
servers = await manager.servers_add(4, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"}
])
logger.info("Creating table")
cql = manager.get_cql()

@@ -132,7 +143,7 @@ async def test_reshape_with_tablets(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_rf_change(manager: ManagerClient, direction):
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(3, config=cfg)
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
for s in servers:
await manager.api.disable_tablet_balancing(s.ip_addr)
@@ -141,14 +152,14 @@ async def test_tablet_rf_change(manager: ManagerClient, direction):
this_dc = res[0].data_center
if direction == 'up':
rf_from = 2
rf_to = 3
rf_from = 1
rf_to = 2
if direction == 'down':
rf_from = 3
rf_to = 2
if direction == 'none':
rf_from = 2
rf_to = 2
rf_to = 1
if direction == 'none':
rf_from = 1
rf_to = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

@@ -199,7 +210,11 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
not owned by the node is attempted to be read."""
cfg = {'enable_user_defined_functions': False,
'tablets_mode_for_new_keyspaces': 'enabled' }
servers = await manager.servers_add(3, config=cfg)
servers = await manager.servers_add(3, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
cql = manager.get_cql()

@@ -788,12 +803,14 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
and when there is another down node in the datacenter, leaving no normal token owners.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
# if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
extra_node = 0 if with_zero_token_node else 1
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()

@@ -815,7 +832,7 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
logger.info(f"Replacing {node_to_replace} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())

@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
@@ -827,10 +844,12 @@ async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_tok
And then verify that that node can be replaced successfully.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
servers['dc2'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()

@@ -848,7 +867,7 @@ async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_tok
logger.info(f"Replacing {node_to_remove} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())

@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])

@@ -861,12 +880,14 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient,
but other datacenters can be used to rebuild the data.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
# if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
extra_node = 0 if with_zero_token_node else 1
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()

@@ -888,11 +909,11 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient,
logger.info(f"Replacing {nodes_to_replace[0]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[0].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
ignore_dead_nodes=[replaced_host_id])
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[0].property_file())
logger.info(f"Replacing {nodes_to_replace[1]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[1].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[1].property_file())
logger.info("Verifying data")
for node in servers['dc2']:

@@ -1115,15 +1136,15 @@ async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server()
await make_server()
await make_server("r1")
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
@@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.internal_types import HostID, ServerInfo, ServerNum

@@ -12,6 +13,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, TabletReplicas
from test.cluster.conftest import skip_mode
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex
import pytest
import asyncio

@@ -94,7 +96,7 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
'--logger-log-level', 'messaging_service=trace',
'--logger-log-level', 'rpc=trace',
]
servers = await manager.servers_add(3, cmdline=cmdline)
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
s0 = servers[0].server_id
not_s0 = servers[1:]

@@ -495,7 +497,11 @@ async def test_tablet_repair(manager: ManagerClient):
'--logger-log-level', 'repair=trace',
'--task-ttl-in-seconds', '3600', # Make sure the test passes with non-zero task_ttl.
]
servers = await manager.servers_add(3, cmdline=cmdline)
servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
await inject_error_on(manager, "tablet_allocator_shuffle", servers)

@@ -558,7 +564,11 @@ async def test_concurrent_tablet_repair_and_split(manager: ManagerClient):
]
servers = await manager.servers_add(3, cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})
}, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

@@ -622,9 +632,7 @@ async def test_tablet_missing_data_repair(manager: ManagerClient):
cmdline = [
'--hinted-handoff-enabled', 'false',
]
servers = [await manager.server_add(cmdline=cmdline),
await manager.server_add(cmdline=cmdline),
await manager.server_add(cmdline=cmdline)]
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', "

@@ -660,7 +668,7 @@ async def test_tablet_missing_data_repair(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_repair_history(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]
servers = await manager.servers_add(3, auto_rack_dc="dc1")
rf = 3
tablets = 8

@@ -686,7 +694,7 @@ async def test_tablet_repair_history(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_repair_ranges_selection(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add()]
servers = await manager.servers_add(2, auto_rack_dc="dc1")
rf = 2
tablets = 4
@@ -1604,12 +1612,12 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
logger.info("Verify data is not resurrected")
await assert_empty_table()
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int) -> dict[ServerNum, ServerInfo]:
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int, config: dict[str, Any] = None) -> dict[ServerNum, ServerInfo]:
logger.debug(f"Creating cluster: num_dcs={num_dcs} num_racks={num_racks} nodes_per_rack={nodes_per_rack}")
servers: dict[ServerNum, ServerInfo] = dict()
for dc in range(1, num_dcs + 1):
for rack in range(1, num_racks + 1):
rack_servers = await manager.servers_add(nodes_per_rack, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
for s in rack_servers:
servers[s.server_id] = s
logger.debug(f"Created servers={list(servers.values())}")

@@ -1683,7 +1691,12 @@ async def test_decommission_rack_basic(manager: ManagerClient):
nodes_per_rack = 2
rf = num_racks - 1
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack)
# We need to disable this option to be able to create a keyspace. This can be ditched
# once we've implemented scylladb/scylladb#23426 and we can add new racks with the option enabled.
# Then we can create `rf` nodes, create the keyspace, and add another node.
config = {"rf_rack_valid_keyspaces": False}
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.info("Verify tablet replicas distribution")
tables = {ctx.ks: [ctx.table]}

@@ -1719,7 +1732,11 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
nodes_per_rack = 2
rf = initial_num_racks
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack)
# We can't add a new rack if we create a keyspace.
# Once scylladb/scylladb#23426 has been implemented, this can be ditched.
config = {"rf_rack_valid_keyspaces": False}
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.debug("Temporarily disable tablet load balancing")
node1 = sorted(initial_servers.values(), key=lambda s: s.server_id)[0]

@@ -1729,7 +1746,7 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
new_rack = f"rack{num_racks}"
# copy initial_servers into all_servers, don't just assign it (by reference)
all_servers: list[ServerInfo] = list(initial_servers.values())
new_rack_servers = await manager.servers_add(nodes_per_rack, property_file={"dc": "dc1", "rack": new_rack})
new_rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": "dc1", "rack": new_rack})
all_servers.extend(new_rack_servers)
logger.info("Verify tablet replicas distribution")

@@ -1887,7 +1904,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
"""Test truncate operation during topology change."""
# Start 3 node cluster
servers = await manager.servers_add(3, config = { 'enable_tablets': True })
servers = await manager.servers_add(3, config = { 'enable_tablets': True }, auto_rack_dc="dc1")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (k int PRIMARY KEY, v int)")

@@ -1905,7 +1922,8 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")
new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True})
new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True},
property_file=servers[0].property_file())
await truncate_task
# Wait for bootstrap completion

@@ -1913,3 +1931,41 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
assert rows[0].count == 0, "Table should be empty after truncation"
# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
cmdline = ['--smp=2']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
table = f"{ks}.test"
await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")
stop_compaction = False
async def background_compaction():
while stop_compaction == False:
await manager.api.keyspace_compaction(servers[0].ip_addr, ks)
compaction_task = asyncio.create_task(background_compaction())
for i in range(5):
dotestCreateAndDropIndex(cql, table, "CamelCase", False)
dotestCreateAndDropIndex(cql, table, "CamelCase2", True)
stop_compaction = True
await compaction_task
async def force_minor_compaction():
for i in range(4):
cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
await force_minor_compaction()
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
await force_minor_compaction()

@@ -76,10 +76,10 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl
}
logger.info("starting a node (the leader)")
servers = [await manager.server_add(config=config)]
servers = [await manager.server_add(config=config, property_file={"dc": "dc1", "rack": "r1"})]
logger.info("starting a second node (the follower)")
servers += [await manager.server_add(config=config)]
servers += [await manager.server_add(config=config, property_file={"dc": "dc1", "rack": "r2"})]
async with new_test_keyspace(manager, "with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "

@@ -27,15 +27,15 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server()
await make_server()
await make_server("r1")
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
@@ -109,22 +109,22 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
await make_server()
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await make_server()
await make_server("r2")
if fail_stage in ["cleanup_target", "revert_migration"]:
# we'll stop 2 servers, group0 quorum should be there - we need five

@@ -136,8 +136,8 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
# attempt to remove the 2nd node, to make cleanup_target stage
# go ahead, will step on the legacy API lock on storage_service,
# so we need to ask some other node to do it
for _ in range(2):
await make_server()
await make_server("r1")
await make_server("r2")
logger.info(f"Cluster is [{host_ids}]")

@@ -69,7 +69,8 @@ async def test_replace(manager: ManagerClient):
'--logger-log-level', 'raft_topology=trace',
]
servers = await manager.servers_add(3, cmdline=cmdline)
config = {"rf_rack_valid_keyspaces": False}
servers = await manager.servers_add(3, cmdline=cmdline, config=config)
cql = manager.get_cql()
@@ -118,7 +119,7 @@ async def test_replace(manager: ManagerClient):
logger.info('Replacing a node')
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
servers.append(await manager.server_add(replace_cfg))
servers.append(await manager.server_add(replace_cfg, config=config))
servers = servers[1:]
key_count = await finish_writes()

@@ -146,8 +147,10 @@ async def test_removenode(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = ['--logger-log-level', 'storage_service=trace']
config = {"rf_rack_valid_keyspaces": False}
# 4 nodes so that we can find new tablet replica for the RF=3 table on removenode
servers = await manager.servers_add(4, cmdline=cmdline)
servers = await manager.servers_add(4, cmdline=cmdline, config=config)
cql = manager.get_cql()

@@ -211,7 +214,13 @@ async def test_removenode_with_ignored_node(manager: ManagerClient):
# 5 nodes because we need a quorum with 2 nodes down.
# 4 nodes would be enough to not lose data with RF=3.
servers = await manager.servers_add(5, cmdline=cmdline)
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"}
])
cql = manager.get_cql()

@@ -13,7 +13,7 @@ import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster

@pytest.mark.asyncio

@@ -32,7 +32,7 @@ def get_expected_tombstone_gc_mode(rf, tablets):
@pytest.mark.parametrize("rf", [1, 2])
@pytest.mark.parametrize("tablets", [True, False])
async def test_default_tombstone_gc(manager: ManagerClient, rf: int, tablets: bool):
_ = [await manager.server_add() for _ in range(2)]
_ = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
tablets_enabled = "true" if tablets else "false"
async with new_test_keyspace(manager, f"with replication = {{ 'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} and tablets = {{ 'enabled': {tablets_enabled} }}") as keyspace:

@@ -44,7 +44,7 @@ async def test_default_tombstone_gc(manager: ManagerClient, rf: int, tablets: bo
@pytest.mark.parametrize("rf", [1, 2])
@pytest.mark.parametrize("tablets", [True, False])
async def test_default_tombstone_gc_does_not_override(manager: ManagerClient, rf: int, tablets: bool):
_ = [await manager.server_add() for _ in range(2)]
_ = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
tablets_enabled = "true" if tablets else "false"
async with new_test_keyspace(manager, f"with replication = {{ 'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} and tablets = {{ 'enabled': {tablets_enabled} }}") as keyspace:
@@ -92,7 +92,10 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
cfg = {
'group0_tombstone_gc_refresh_interval_in_ms': 1000, # this is 1 hour by default
}
servers = [await manager.server_add(cmdline=cmdline, config=cfg) for _ in range(3)]
servers = await manager.servers_add(3, cmdline=cmdline, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r2"}])
cql = manager.get_cql()
hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]

@@ -22,7 +22,8 @@ logger = logging.getLogger(__name__)
@pytest.mark.parametrize("tablets_enabled", [True, False])
async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bool):
"""Test basic topology operations using the topology coordinator."""
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'} | rf_rack_cfg
rf = 3
num_nodes = rf
if tablets_enabled:

@@ -57,7 +58,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
logger.info(f"Replacing node {servers[0]}")
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
servers = servers[1:] + [await manager.server_add(replace_cfg)]
servers = servers[1:] + [await manager.server_add(replace_cfg, config=rf_rack_cfg)]
await check_token_ring_and_group0_consistency(manager)
logger.info(f"Stopping node {servers[0]}")

@@ -25,9 +25,11 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
d.mkdir()
k = d / "system_key"
k.write_text('AES/CBC/PKCS5Padding:128:ApvJEoFpQmogvam18bb54g==')
rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled',
'user_info_encryption': {'enabled': True, 'key_provider': 'LocalFileSystemKeyProviderFactory'},
'system_key_directory': d.as_posix()}
cfg = cfg | rf_rack_cfg
rf = 3
num_nodes = rf
if tablets_enabled:

@@ -62,7 +64,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
logger.info(f"Replacing node {servers[0]}")
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
servers = servers[1:] + [await manager.server_add(replace_cfg)]
servers = servers[1:] + [await manager.server_add(replace_cfg, config=rf_rack_cfg)]
await check_token_ring_and_group0_consistency(manager)
logger.info(f"Stopping node {servers[0]}")
@@ -19,7 +19,7 @@ import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster

@pytest.mark.asyncio

@@ -27,7 +27,7 @@ async def test_remove_node_add_column(manager: ManagerClient, random_tables: Ran
"""Add a node, remove an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.server_add(property_file=servers[1].property_file())
await manager.server_stop_gracefully(servers[1].server_id) # stop [1]
await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1]
await check_token_ring_and_group0_consistency(manager)

@@ -53,7 +53,7 @@ async def test_decommission_node_add_column(manager: ManagerClient, random_table
# 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
await manager.api.enable_injection(
decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
bootstrapped_server = await manager.server_add()
bootstrapped_server = await manager.server_add(property_file=decommission_target.property_file())
async def no_joining_nodes():
joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
return not joining_nodes

@@ -54,11 +54,11 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage
]
logger.info(f'Adding --smp=3 server')
await manager.server_add(cmdline=['--smp', '3'] + log_args)
await manager.server_add(cmdline=['--smp', '3'] + log_args, property_file={"dc": "dc1", "rack": "r1"})
logger.info(f'Adding --smp=4 server')
await manager.server_add(cmdline=['--smp', '4'] + log_args)
await manager.server_add(cmdline=['--smp', '4'] + log_args, property_file={"dc": "dc1", "rack": "r2"})
logger.info(f'Adding --smp=5 server')
await manager.server_add(cmdline=['--smp', '5'] + log_args)
await manager.server_add(cmdline=['--smp', '5'] + log_args, property_file={"dc": "dc1", "rack": "r3"})
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@@ -18,20 +18,20 @@ from test.cluster.util import create_new_test_keyspace
@pytest.mark.asyncio
@pytest.mark.parametrize('zero_token_nodes', [1, 2])
async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token_nodes: int):
@pytest.mark.parametrize('rf_rack_valid_keyspaces', [False, True])
async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token_nodes: int, rf_rack_valid_keyspaces: bool):
"""
Test the basic functionality of a DC with zero-token nodes:
- adding zero-token nodes to a new DC succeeds
- with tablets, ensuring enough replicas for tables depends on the number of token-owners in a DC, not all nodes
- client requests in the presence of zero-token nodes succeed (also when zero-token nodes coordinate)
"""
normal_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch'}
zero_token_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch', 'join_ring': False}
property_file_dc1 = {'dc': 'dc1', 'rack': 'rack'}
normal_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch', 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
zero_token_cfg = normal_cfg | {'join_ring': False}
property_file_dc2 = {'dc': 'dc2', 'rack': 'rack'}
logging.info('Creating dc1 with 2 token-owning nodes')
servers = await manager.servers_add(2, config=normal_cfg, property_file=property_file_dc1)
servers = await manager.servers_add(2, config=normal_cfg, auto_rack_dc='dc1')
normal_nodes_in_dc2 = 2 - zero_token_nodes
logging.info(f'Creating dc2 with {normal_nodes_in_dc2} token-owning and {zero_token_nodes} zero-token nodes')

@@ -47,7 +47,17 @@ async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token
ks_names = list[str]()
logging.info('Trying to create tables for different replication factors')
for rf in range(3):
# With `rf_rack_valid_keyspaces` set to true, we cannot create a keyspace with RF > #racks.
# Because of that, the test will fail not at the stage when a TABLE is created, but when
# the KEYSPACE is. We want to avoid that and hence this statement.
#
# rf_rack_valid_keyspaces == False: We'll attempt to create tables in keyspaces with too few normal token owners.
# rf_rack_valid_keyspaces == True: We'll only create RF-rack-valid keyspaces and so all of the created tables
# will have enough normal token owners.
rf_range = (normal_nodes_in_dc2 + 1) if rf_rack_valid_keyspaces else 3
for rf in range(rf_range):
failed = False
ks_name = await create_new_test_keyspace(dc2_cql, f"""WITH replication =
{{'class': 'NetworkTopologyStrategy', 'replication_factor': 2, 'dc2': {rf}}}

@@ -22,11 +22,11 @@ async def test_zero_token_nodes_no_replication(manager: ManagerClient):
Test that zero-token nodes aren't replicas in all non-local replication strategies with and without tablets.
"""
logging.info('Adding the first server')
server_a = await manager.server_add()
server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
logging.info('Adding the second server as zero-token')
server_b = await manager.server_add(config={'join_ring': False})
server_b = await manager.server_add(config={'join_ring': False}, property_file={"dc": "dc1", "rack": "r2"})
logging.info('Adding the third server')
await manager.server_add()
await manager.server_add(property_file={"dc": "dc1", "rack": "r3"})
logging.info(f'Initiating connections to {server_a} and {server_b}')
cql_a = cluster_con([server_a.ip_addr], 9042, False,
@@ -24,29 +24,34 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
- topology operations in the Raft-based topology involving zero-token nodes succeed
- client requests to normal nodes in the presence of zero-token nodes (2 normal nodes, RF=2, CL=2) succeed
"""

def get_pf(rack: str) -> dict[str, str]:
return {"dc": "dc1", "rack": rack}

logging.info('Trying to add a zero-token server in the gossip-based topology')
await manager.server_add(config={'join_ring': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'},
property_file=get_pf("rz"),
expected_error='the raft-based topology is disabled')

normal_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
zero_token_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled', 'join_ring': False}

logging.info('Adding the first server')
server_a = await manager.server_add(config=normal_cfg)
server_a = await manager.server_add(config=normal_cfg, property_file=get_pf("r1"))

logging.info('Adding the second server as zero-token')
server_b = await manager.server_add(config=zero_token_cfg)
server_b = await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz"))

logging.info('Adding the third server')
server_c = await manager.server_add(config=normal_cfg)
server_c = await manager.server_add(config=normal_cfg, property_file=get_pf("r2"))

await wait_for_cql_and_get_hosts(manager.cql, [server_a, server_c], time.time() + 60)
finish_writes = await start_writes(manager.cql, 2, ConsistencyLevel.TWO)

logging.info('Adding the fourth server as zero-token')
await manager.server_add(config=zero_token_cfg) # Necessary to preserve the Raft majority.
await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz")) # Necessary to preserve the Raft majority.

logging.info(f'Restarting {server_b}')
await manager.server_stop_gracefully(server_b.server_id)
@@ -57,23 +62,24 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena

replace_cfg_b = ReplaceConfig(replaced_id=server_b.server_id, reuse_ip_addr=False, use_host_id=False)
logging.info(f'Trying to replace {server_b} with a token-owing server')
await manager.server_add(replace_cfg_b, config=normal_cfg, expected_error='Cannot replace the zero-token node')
await manager.server_add(replace_cfg_b, config=normal_cfg, property_file=server_b.property_file(),
expected_error='Cannot replace the zero-token node')

logging.info(f'Replacing {server_b}')
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg)
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg, property_file=server_b.property_file())

logging.info(f'Stopping {server_b}')
await manager.server_stop_gracefully(server_b.server_id)

replace_cfg_b = ReplaceConfig(replaced_id=server_b.server_id, reuse_ip_addr=True, use_host_id=False)
logging.info(f'Replacing {server_b} with the same IP')
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg)
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg, property_file=server_b.property_file())

logging.info(f'Decommissioning {server_b}')
await manager.decommission_node(server_b.server_id)

logging.info('Adding two zero-token servers')
[server_b, server_d] = await manager.servers_add(2, config=zero_token_cfg)
[server_b, server_d] = await manager.servers_add(2, config=zero_token_cfg, property_file=get_pf("rz"))

logging.info(f'Rebuilding {server_b}')
await manager.rebuild_node(server_b.server_id)
@@ -92,21 +98,21 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
await manager.remove_node(server_a.server_id, server_d.server_id)

logging.info('Adding a zero-token server')
await manager.server_add(config=zero_token_cfg)
await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz"))

# FIXME: Finish writes after the last server_add call once scylladb/scylladb#19737 is fixed.
logging.info('Checking results of the background writes')
await finish_writes()

logging.info('Adding a normal server')
server_e = await manager.server_add(config=normal_cfg)
server_e = await manager.server_add(config=normal_cfg, property_file=get_pf("r1"))

logging.info(f'Stopping {server_e}')
await manager.server_stop_gracefully(server_e.server_id)

replace_cfg_e = ReplaceConfig(replaced_id=server_e.server_id, reuse_ip_addr=False, use_host_id=False)
logging.info(f'Trying to replace {server_e} with a zero-token server')
await manager.server_add(replace_cfg_e, config=zero_token_cfg,
await manager.server_add(replace_cfg_e, config=zero_token_cfg, property_file=server_e.property_file(),
expected_error='Cannot replace the token-owning node')

await check_node_log_for_failed_mutations(manager, server_a)

@@ -128,7 +128,7 @@ public:
static constexpr std::string_view ks_name = "ks";
static std::atomic<bool> active;
private:
std::unique_ptr<sstable_compressor_factory> _scf;
sharded<default_sstable_compressor_factory> _scf;
sharded<replica::database> _db;
sharded<gms::feature_service> _feature_service;
sharded<sstables::storage_manager> _sstm;
@@ -657,10 +657,14 @@ private:
auto stop_lang_manager = defer_verbose_shutdown("lang manager", [this] { _lang_manager.stop().get(); });
_lang_manager.invoke_on_all(&lang::manager::start).get();

_scf = make_sstable_compressor_factory();
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
_scf.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config, std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_scf = defer_verbose_shutdown("sstable_compressor_factory", [this] {
_scf.stop().get();
});

_db_config = &*cfg;
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(*_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
auto stop_db = defer_verbose_shutdown("database", [this] {
_db.stop().get();
});
@@ -896,6 +900,9 @@ private:
_view_builder.stop().get();
});

_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });

_ss.start(std::ref(abort_sources), std::ref(_db),
std::ref(_gossiper),
std::ref(_sys_ks),
@@ -995,9 +1002,6 @@ private:
}
});

_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });

_sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(

@@ -109,7 +109,7 @@ public:

void maybe_start_compaction_manager(bool enable = true);

explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
explicit test_env(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
~test_env();
test_env(test_env&&) noexcept;

@@ -176,15 +176,6 @@ public:

replica::table::config make_table_config();

template <typename Func>
static inline auto do_with(Func&& func, test_env_config cfg = {}) {
return seastar::do_with(test_env(std::move(cfg)), [func = std::move(func)] (test_env& env) mutable {
return futurize_invoke(func, env).finally([&env] {
return env.stop();
});
});
}

static future<> do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg = {});

static future<> do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func);
@@ -192,7 +183,8 @@ public:
template <typename T>
static future<T> do_with_async_returning(noncopyable_function<T (test_env&)> func) {
return seastar::async([func = std::move(func)] {
test_env env;
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env({}, *scf);
auto stop = defer([&] { env.stop().get(); });
return func(env);
});

@@ -201,7 +201,7 @@ struct test_env::impl {
::cache_tracker cache_tracker;
gms::feature_service feature_service;
db::nop_large_data_handler nop_ld_handler;
std::unique_ptr<sstable_compressor_factory> scf;
sstable_compressor_factory& scf;
test_env_sstables_manager mgr;
std::unique_ptr<test_env_compaction_manager> cmgr;
reader_concurrency_semaphore semaphore;
@@ -210,7 +210,7 @@ struct test_env::impl {
data_dictionary::storage_options storage;
abort_source abort;

impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir);
impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
impl(impl&&) = delete;
impl(const impl&) = delete;

@@ -219,16 +219,16 @@ struct test_env::impl {
}
};

test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir)
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
: local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
, dir(tdir == nullptr ? local_dir.value() : *tdir)
, db_config(make_db_config(dir.path().native(), cfg.storage))
, dir_sem(1)
, feature_service(gms::feature_config_from_db_config(*db_config))
, scf(make_sstable_compressor_factory())
, scf(scfarg)
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
feature_service, cache_tracker, cfg.available_memory, dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; }, *scf, abort, current_scheduling_group(), sstm)
[host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm)
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
, use_uuid(cfg.use_uuid)
, storage(std::move(cfg.storage))
@@ -242,8 +242,8 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdi
}
}

test_env::test_env(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tmp)
: _impl(std::make_unique<impl>(std::move(cfg), sstm, tmp))
test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
: _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
{
}

@@ -325,7 +325,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
sharded<sstables::storage_manager> sstm;
sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
auto stop_sstm = defer([&] { sstm.stop().get(); });
test_env env(std::move(cfg), &sstm.local());
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env(std::move(cfg), *scf, &sstm.local());
auto close_env = defer([&] { env.stop().get(); });
env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
@@ -334,7 +335,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
}

return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
test_env env(std::move(cfg));
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env(std::move(cfg), *scf);
auto close_env = defer([&] { env.stop().get(); });
func(env);
});
@@ -476,7 +478,8 @@ test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)>
return seastar::async([func = std::move(func)] {
tmpdir tdir;
sharded<test_env> env;
env.start(test_env_config{}, nullptr, &tdir).get();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
env.start(test_env_config{}, std::ref(*scf), nullptr, &tdir).get();
auto stop = defer([&] { env.stop().get(); });
func(env);
});

@@ -515,3 +515,41 @@ def test_status_with_zero_token_nodes(request, nodetool):
]

_do_test_status(request, nodetool, None, nodes)


def test_status_negative_load(request, nodetool):
nodes = [
Node(
endpoint="127.0.0.1",
host_id="78a9c1d0-b341-467e-a076-9eff4cf7ffc6",
load=-206015,
tokens=["-9175818098208185248", "-3983536194780899528"],
datacenter="datacenter1",
rack="rack1",
status=NodeStatus.Unknown,
state=NodeState.Joining,
),
Node(
endpoint="127.0.0.2",
host_id="ed341f60-b12a-4fd4-9917-e80977ded0f9",
load=277624,
tokens=["-1810801828328238220", "2983536194780899528"],
datacenter="datacenter1",
rack="rack2",
status=NodeStatus.Down,
state=NodeState.Normal,
),
Node(
endpoint="127.0.0.3",
host_id="1e77eb26-a372-4eb4-aeaa-72f224cf6b4c",
load=353236,
tokens=["3810801828328238220", "6810801828328238220"],
datacenter="datacenter1",
rack="rack3",
status=NodeStatus.Up,
state=NodeState.Normal,
),
]

status_target = StatusQueryTarget(keyspace="ks", table=None, uses_tablets=False)
_do_test_status(request, nodetool, status_target, nodes)

@@ -144,7 +144,8 @@ int scylla_sstable_main(int argc, char** argv) {
}
cfg.compaction_strategy = sstables::compaction_strategy::type(app.configuration()["compaction-strategy"].as<sstring>());
cfg.timestamp_range = app.configuration()["timestamp-range"].as<api::timestamp_type>();
test.start(std::move(cfg)).get();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test.start(std::move(cfg), std::ref(*scf)).get();
auto stop_test = deferred_stop(test);

switch (mode) {

@@ -195,7 +195,9 @@ private:
}

public:
perf_sstable_test_env(conf cfg) : _cfg(std::move(cfg))
perf_sstable_test_env(conf cfg, sstable_compressor_factory& scf)
: _env({}, scf)
, _cfg(std::move(cfg))
, s(create_schema(cfg.compaction_strategy))
, _distribution('@', '~')
, _mt(make_lw_shared<replica::memtable>(s))

@@ -45,13 +45,16 @@ async def load_tablet_repair_task_infos(cql, host, table_id):
return repair_task_infos

async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False, cmdline = None) -> (list[ServerInfo], CassandraSession, list[Host], str, str):
assert rf <= 3, "A keyspace with RF > 3 will be RF-rack-invalid if there are fewer racks than the RF"

if fast_stats_refresh:
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
else:
config = {}
if disable_flush_cache_time:
config.update({'repair_hints_batchlog_flush_cache_time_in_ms': 0})
servers = [await manager.server_add(config=config, cmdline=cmdline), await manager.server_add(config=config, cmdline=cmdline), await manager.server_add(config=config, cmdline=cmdline)]
servers = await manager.servers_add(3, config=config, cmdline=cmdline,
property_file=[{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(rf)])
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {{'class': 'NetworkTopologyStrategy', "
"'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets))

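For reference, the `property_file` list comprehension above spreads the added servers over one rack per replica, which is what keeps the test keyspace RF-rack-valid; a small sketch of what it expands to with the default rf = 3 (illustrative only, not part of the patch):

    rf = 3
    property_file = [{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(rf)]
    # [{'dc': 'dc1', 'rack': 'r0'}, {'dc': 'dc1', 'rack': 'r1'}, {'dc': 'dc1', 'rack': 'r2'}]
    assert len({pf["rack"] for pf in property_file}) == rf
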
@@ -24,6 +24,7 @@ from typing import Optional, TypeVar, Any
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import Statement # type: ignore # pylint: disable=no-name-in-module
from cassandra import DriverException, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module

from test import BUILD_DIR, TOP_SRC_DIR
@@ -315,3 +316,29 @@ async def gather_safely(*awaitables: Awaitable):

def get_xdist_worker_id() -> str | None:
return os.environ.get("PYTEST_XDIST_WORKER")


def execute_with_tracing(cql : Session, statement : str | Statement, log : bool = False, *cql_execute_extra_args, **cql_execute_extra_kwargs):
""" Execute statement via cql session and log the tracing output. """

cql_execute_extra_kwargs['trace'] = True
query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs)

tracing = query_result.get_all_query_traces(max_wait_sec_per=900)

ret = []
page_traces = []
for trace in tracing:
ret.append(trace.events)
if not log:
continue

trace_events = []
for event in trace.events:
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
page_traces.append("\n".join(trace_events))

if log:
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))

return ret

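The helper added above forces tracing on, collects the per-page traces, and returns the trace events while optionally logging them. A hedged usage sketch (the keyspace, table, and query are made up for illustration; `cql` is an already-connected driver Session):

    # Hypothetical call site for execute_with_tracing.
    per_page_events = execute_with_tracing(cql, "SELECT * FROM ks.t WHERE pk = 1", log=True)
    for page_events in per_page_events:
        # Each entry mirrors trace.events for one page of the result.
        for event in page_events:
            print(event.source, event.source_elapsed, event.description)
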
@@ -49,7 +49,7 @@ tools::tablets_t do_load_system_tablets(const db::config& dbcfg,
std::string_view table_name,
reader_permit permit) {
sharded<sstable_manager_service> sst_man;
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
auto stop_sst_man_service = deferred_stop(sst_man);


@@ -396,7 +396,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(rcs_sem);

auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<sstable_manager_service> sst_man;
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
auto stop_sst_man_service = deferred_stop(sst_man);
@@ -500,7 +500,7 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem:
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker,
memory::stats().total_memory(), dir_sem,
[host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort);

@@ -67,13 +67,13 @@ static std::ostream& operator<<(std::ostream& os, const std::vector<sstring>& v)

// mimic the behavior of FileUtils::stringifyFileSize
struct file_size_printer {
uint64_t value;
int64_t value;
bool human_readable;
bool use_correct_units;
// Cassandra nodetool uses base_2 and base_10 units interchangeably, some
// commands use this, some that. Let's accomodate this for now, and maybe
// fix this mess at one point in the future, after the rewrite is done.
file_size_printer(uint64_t value, bool human_readable = true, bool use_correct_units = false)
file_size_printer(int64_t value, bool human_readable = true, bool use_correct_units = false)
: value{value}
, human_readable{human_readable}
, use_correct_units{use_correct_units}
@@ -87,15 +87,15 @@ struct fmt::formatter<file_size_printer> : fmt::formatter<string_view> {
return fmt::format_to(ctx.out(), "{}", size.value);
}

using unit_t = std::tuple<uint64_t, std::string_view, std::string_view>;
using unit_t = std::tuple<int64_t, std::string_view, std::string_view>;
const unit_t units[] = {
{1UL << 40, "TiB", "TB"},
{1UL << 30, "GiB", "GB"},
{1UL << 20, "MiB", "MB"},
{1UL << 10, "KiB", "KB"},
{1LL << 40, "TiB", "TB"},
{1LL << 30, "GiB", "GB"},
{1LL << 20, "MiB", "MB"},
{1LL << 10, "KiB", "KB"},
};
for (auto [n, base_2, base_10] : units) {
if (size.value > n) {
if ((size.value > n) || (size.value < -n)) {
auto d = static_cast<float>(size.value) / n;
auto postfix = size.use_correct_units ? base_2 : base_10;
return fmt::format_to(ctx.out(), "{:.2f} {}", d, postfix);
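A small sketch of the formatting rule this hunk switches to signed arithmetic for: the magnitude picks the unit, so negative loads (as in the test data above) format symmetrically, and the base-2 divisor is paired with either an IEC or an SI suffix depending on `use_correct_units`. This is illustrative Python, not the C++ implementation, and the fallback branch is an assumption:

    def stringify_file_size(value: int, use_correct_units: bool = False) -> str:
        units = [(1 << 40, "TiB", "TB"), (1 << 30, "GiB", "GB"),
                 (1 << 20, "MiB", "MB"), (1 << 10, "KiB", "KB")]
        for n, base_2, base_10 in units:
            if value > n or value < -n:  # same magnitude test as the patched C++ condition
                return f"{value / n:.2f} {base_2 if use_correct_units else base_10}"
        return str(value)  # assumed fallback for small values

    assert stringify_file_size(-206015) == "-201.19 KB"
    assert stringify_file_size(277624) == "271.12 KB"
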
@@ -2197,7 +2197,7 @@ void status_operation(scylla_rest_client& client, const bpo::variables_map& vm)
const auto joining = get_nodes_of_state(client, "joining");
const auto leaving = get_nodes_of_state(client, "leaving");
const auto moving = get_nodes_of_state(client, "moving");
const auto endpoint_load = rjson_to_map<size_t>(client.get("/storage_service/load_map"));
const auto endpoint_load = rjson_to_map<ssize_t>(client.get("/storage_service/load_map"));

const auto tablets_keyspace = keyspace && keyspace_uses_tablets(client, *keyspace);


@@ -3559,7 +3559,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
}

gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;

@@ -406,7 +406,7 @@ class background_reclaimer {
promise<>* _main_loop_wait = nullptr;
future<> _done;
bool _stopping = false;
static constexpr size_t free_memory_threshold = 60'000'000;
static constexpr size_t free_memory_threshold = background_reclaim_free_memory_threshold;
private:
bool have_work() const {
#ifndef SEASTAR_DEFAULT_ALLOCATOR

@@ -29,6 +29,8 @@ constexpr int segment_size_shift = 17; // 128K; see #151, #152
constexpr size_t segment_size = 1 << segment_size_shift;
constexpr size_t max_zone_segments = 256;

constexpr size_t background_reclaim_free_memory_threshold = 60'000'000;

//
// Frees some amount of objects from the region to which it's attached.
//