Compare commits
121 Commits
copilot/re
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9536949911 | ||
|
|
01d3b504d1 | ||
|
|
305f827888 | ||
|
|
d094bc6fc9 | ||
|
|
c11a2e2aaf | ||
|
|
a63b22eec6 | ||
|
|
0adf905112 | ||
|
|
c2a9f2d9c6 | ||
|
|
79958472bc | ||
|
|
ba192c1a29 | ||
|
|
89f5374435 | ||
|
|
184e0716b3 | ||
|
|
baa2592299 | ||
|
|
a926cba476 | ||
|
|
9c28b812ca | ||
|
|
d792916e8e | ||
|
|
a539ff6419 | ||
|
|
b295ce38ae | ||
|
|
2e50d1a357 | ||
|
|
d65b390780 | ||
|
|
4ebae7ae62 | ||
|
|
4fed3a5a5a | ||
|
|
5b86b6393a | ||
|
|
024af57bd5 | ||
|
|
c59327950b | ||
|
|
a2b2e46482 | ||
|
|
4a7ddbfe07 | ||
|
|
484fc374c1 | ||
|
|
a5251b4d44 | ||
|
|
2afe0695cf | ||
|
|
b62264e1d9 | ||
|
|
36cc0f8e7e | ||
|
|
a885c87547 | ||
|
|
371fc05943 | ||
|
|
4366cd5a81 | ||
|
|
38ee119112 | ||
|
|
6edf92a9e3 | ||
|
|
609ad01bbc | ||
|
|
10b7f2d924 | ||
|
|
5130ec84de | ||
|
|
9938183ace | ||
|
|
1271b42848 | ||
|
|
012e248792 | ||
|
|
1364eec694 | ||
|
|
85fe37a8e4 | ||
|
|
e21bdbb9ef | ||
|
|
ca8762885b | ||
|
|
3a7a1dc4a9 | ||
|
|
12596a8eca | ||
|
|
be3f50b658 | ||
|
|
6cd954de8d | ||
|
|
9a7ea917eb | ||
|
|
c5cff9e14f | ||
|
|
5aca2c134d | ||
|
|
cc299e335d | ||
|
|
a7b34a54bc | ||
|
|
eb78d3aefb | ||
|
|
304f47f6ec | ||
|
|
8ba5a1be70 | ||
|
|
20602b6a8b | ||
|
|
19513fa47e | ||
|
|
dec10d348e | ||
|
|
ffea5e67c1 | ||
|
|
bcbbc40026 | ||
|
|
70d9352cec | ||
|
|
e215350c61 | ||
|
|
59bf300e83 | ||
|
|
6d733051de | ||
|
|
24c134992b | ||
|
|
9247c9472a | ||
|
|
ab8d50b5e7 | ||
|
|
7986ef73da | ||
|
|
847504ad25 | ||
|
|
854587c10c | ||
|
|
9058d5658b | ||
|
|
dd9ec03323 | ||
|
|
eef6a95e26 | ||
|
|
9f2a13c8c2 | ||
|
|
4792a27396 | ||
|
|
f26c2b22dc | ||
|
|
163b65cec4 | ||
|
|
fcde30d2b0 | ||
|
|
26bd28dac9 | ||
|
|
6f1efcff31 | ||
|
|
204f9e2cc8 | ||
|
|
0c6a449a30 | ||
|
|
7673a17365 | ||
|
|
ae05d62b97 | ||
|
|
5c5911d874 | ||
|
|
6a2e52d250 | ||
|
|
f98c83b92f | ||
|
|
f5cf4a3893 | ||
|
|
12f0136b26 | ||
|
|
4e45ceda21 | ||
|
|
2c8b5143ba | ||
|
|
474de0f048 | ||
|
|
5ac07a6c72 | ||
|
|
f88d8edcaf | ||
|
|
05c70b0820 | ||
|
|
732321e3b8 | ||
|
|
a2622e1919 | ||
|
|
270bf34846 | ||
|
|
168f694c5d | ||
|
|
b5579be915 | ||
|
|
ad60d765f9 | ||
|
|
68d2086fa5 | ||
|
|
403d43093f | ||
|
|
2b1b4d1dfc | ||
|
|
a5b513dde7 | ||
|
|
2c431c1ea2 | ||
|
|
827563902c | ||
|
|
ccf194bd89 | ||
|
|
9b735bb4dc | ||
|
|
f29b87970a | ||
|
|
82ca17e70d | ||
|
|
ddf9d047db | ||
|
|
17a76b6264 | ||
|
|
ab45df1aa1 | ||
|
|
97f0f312e0 | ||
|
|
4df6a17d30 | ||
|
|
b3dbfaf27a |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.2.0-dev
|
||||
VERSION=2025.2.0-rc5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1006,6 +1006,17 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
default_write_isolation = parse_write_isolation(value);
|
||||
}
|
||||
|
||||
// Alternator uses tags whose keys start with the "system:" prefix for
|
||||
// internal purposes. Those should not be readable by ListTagsOfResource,
|
||||
// nor writable with TagResource or UntagResource (see #24098).
|
||||
// Only a few specific system tags, currently only system:write_isolation,
|
||||
// are deliberately intended to be set and read by the user, so are not
|
||||
// considered "internal".
|
||||
static bool tag_key_is_internal(std::string_view tag_key) {
|
||||
return tag_key.starts_with("system:") &&
|
||||
tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY;
|
||||
}
|
||||
|
||||
enum class update_tags_action { add_tags, delete_tags };
|
||||
static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>& tags_map, update_tags_action action) {
|
||||
if (action == update_tags_action::add_tags) {
|
||||
@@ -1030,6 +1041,9 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
|
||||
if (!validate_legal_tag_chars(tag_key)) {
|
||||
throw api_error::validation("A tag Key can only contain letters, spaces, and [+-=._:/]");
|
||||
}
|
||||
if (tag_key_is_internal(tag_key)) {
|
||||
throw api_error::validation(fmt::format("Tag key '{}' is reserved for internal use", tag_key));
|
||||
}
|
||||
// Note tag values are limited similarly to tag keys, but have a
|
||||
// longer length limit, and *can* be empty.
|
||||
if (tag_value.size() > 256) {
|
||||
@@ -1042,7 +1056,11 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
|
||||
}
|
||||
} else if (action == update_tags_action::delete_tags) {
|
||||
for (auto it = tags.Begin(); it != tags.End(); ++it) {
|
||||
tags_map.erase(sstring(it->GetString(), it->GetStringLength()));
|
||||
auto tag_key = rjson::to_string_view(*it);
|
||||
if (tag_key_is_internal(tag_key)) {
|
||||
throw api_error::validation(fmt::format("Tag key '{}' is reserved for internal use", tag_key));
|
||||
}
|
||||
tags_map.erase(sstring(tag_key));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1117,6 +1135,9 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
|
||||
rjson::value& tags = ret["Tags"];
|
||||
for (auto& tag_entry : tags_map) {
|
||||
if (tag_key_is_internal(tag_entry.first)) {
|
||||
continue;
|
||||
}
|
||||
rjson::value new_entry = rjson::empty_object();
|
||||
rjson::add(new_entry, "Key", rjson::from_string(tag_entry.first));
|
||||
rjson::add(new_entry, "Value", rjson::from_string(tag_entry.second));
|
||||
|
||||
@@ -2144,6 +2144,31 @@
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"skip_cleanup",
|
||||
"description":"Don't cleanup keys from loaded sstables. Invalid if load_and_stream is true",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"skip_reshape",
|
||||
"description":"Don't reshape the loaded sstables. Invalid if load_and_stream is true",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"scope",
|
||||
"description":"Defines the set of nodes to which mutations can be streamed",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query",
|
||||
"enum": ["all", "dc", "rack", "node"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -453,17 +453,26 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
|
||||
auto cf = req->get_query_param("cf");
|
||||
auto stream = req->get_query_param("load_and_stream");
|
||||
auto primary_replica = req->get_query_param("primary_replica_only");
|
||||
auto skip_cleanup_p = req->get_query_param("skip_cleanup");
|
||||
boost::algorithm::to_lower(stream);
|
||||
boost::algorithm::to_lower(primary_replica);
|
||||
bool load_and_stream = stream == "true" || stream == "1";
|
||||
bool primary_replica_only = primary_replica == "true" || primary_replica == "1";
|
||||
bool skip_cleanup = skip_cleanup_p == "true" || skip_cleanup_p == "1";
|
||||
auto scope = parse_stream_scope(req->get_query_param("scope"));
|
||||
auto skip_reshape_p = req->get_query_param("skip_reshape");
|
||||
auto skip_reshape = skip_reshape_p == "true" || skip_reshape_p == "1";
|
||||
|
||||
if (scope != sstables_loader::stream_scope::all && !load_and_stream) {
|
||||
throw httpd::bad_param_exception("scope takes no effect without load-and-stream");
|
||||
}
|
||||
// No need to add the keyspace, since all we want is to avoid always sending this to the same
|
||||
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
|
||||
auto coordinator = std::hash<sstring>()(cf) % smp::count;
|
||||
return sst_loader.invoke_on(coordinator,
|
||||
[ks = std::move(ks), cf = std::move(cf),
|
||||
load_and_stream, primary_replica_only] (sstables_loader& loader) {
|
||||
return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only, sstables_loader::stream_scope::all);
|
||||
load_and_stream, primary_replica_only, skip_cleanup, skip_reshape, scope] (sstables_loader& loader) {
|
||||
return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only, skip_cleanup, skip_reshape, scope);
|
||||
}).then_wrapped([] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception());
|
||||
|
||||
343
compress.cc
343
compress.cc
@@ -15,6 +15,8 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "utils/reusable_buffer.hh"
|
||||
#include "sstables/compress.hh"
|
||||
#include "sstables/exceptions.hh"
|
||||
@@ -27,7 +29,7 @@
|
||||
|
||||
// SHA256
|
||||
using dict_id = std::array<std::byte, 32>;
|
||||
class sstable_compressor_factory_impl;
|
||||
class dictionary_holder;
|
||||
|
||||
static seastar::logger compressor_factory_logger("sstable_compressor_factory");
|
||||
|
||||
@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
|
||||
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
|
||||
// or referenced by algorithm-specific dicts.
|
||||
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
dict_id _id;
|
||||
std::vector<std::byte> _dict;
|
||||
public:
|
||||
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
|
||||
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
|
||||
~raw_dict();
|
||||
const std::span<const std::byte> raw() const { return _dict; }
|
||||
dict_id id() const { return _id; }
|
||||
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
|
||||
// (which internally holds a pointer to the raw dictionary blob
|
||||
// and parsed entropy tables).
|
||||
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
size_t _used_memory = 0;
|
||||
zstd_callback_allocator _alloc;
|
||||
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
|
||||
public:
|
||||
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
~zstd_ddict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -100,14 +102,14 @@ public:
|
||||
// so the level of compression is decided at the time of construction
|
||||
// of this dict.
|
||||
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
int _level;
|
||||
size_t _used_memory = 0;
|
||||
zstd_callback_allocator _alloc;
|
||||
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
|
||||
public:
|
||||
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
|
||||
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
|
||||
~zstd_cdict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -119,11 +121,11 @@ public:
|
||||
// and a hash index over the substrings of the blob).
|
||||
//
|
||||
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
|
||||
weak_ptr<sstable_compressor_factory_impl> _owner;
|
||||
weak_ptr<dictionary_holder> _owner;
|
||||
lw_shared_ptr<const raw_dict> _raw;
|
||||
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
|
||||
public:
|
||||
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
|
||||
~lz4_cdict();
|
||||
auto dict() const { return _dict.get(); }
|
||||
auto raw() const { return _raw->raw(); }
|
||||
@@ -164,6 +166,7 @@ public:
|
||||
size_t compress_max_size(size_t input_len) const override;
|
||||
std::map<sstring, sstring> options() const override;
|
||||
algorithm get_algorithm() const override;
|
||||
std::optional<unsigned> get_dict_owner_for_test() const override;
|
||||
};
|
||||
|
||||
class snappy_processor: public compressor {
|
||||
@@ -266,6 +269,7 @@ public:
|
||||
size_t compress_max_size(size_t input_len) const override;
|
||||
algorithm get_algorithm() const override;
|
||||
std::map<sstring, sstring> options() const override;
|
||||
std::optional<unsigned> get_dict_owner_for_test() const override;
|
||||
};
|
||||
|
||||
zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
|
||||
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
|
||||
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
|
||||
}
|
||||
|
||||
std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
|
||||
if (_cdict) {
|
||||
return _cdict.get_owner_shard();
|
||||
} else if (_ddict) {
|
||||
return _ddict.get_owner_shard();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
const std::string_view DICTIONARY_OPTION = ".dictionary.";
|
||||
|
||||
static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
|
||||
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<unsigned> compressor::get_dict_owner_for_test() const {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string compressor::name() const {
|
||||
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
|
||||
}
|
||||
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
|
||||
case algorithm::snappy: return "SnappyCompressor";
|
||||
case algorithm::zstd: return "ZstdCompressor";
|
||||
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
|
||||
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
|
||||
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
|
||||
}
|
||||
abort();
|
||||
}
|
||||
@@ -518,13 +536,17 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
|
||||
}
|
||||
}
|
||||
|
||||
void compression_parameters::validate(const gms::feature_service& fs) {
|
||||
if (!fs.sstable_compression_dicts) {
|
||||
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) {
|
||||
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
|
||||
if (!dicts_enabled) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
|
||||
"all nodes are upgraded to a versions which supports it",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
if (!dicts_allowed) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
}
|
||||
if (_chunk_length) {
|
||||
auto chunk_length = _chunk_length.value();
|
||||
@@ -660,6 +682,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
|
||||
if (_cdict) {
|
||||
return _cdict.get_owner_shard();
|
||||
} else if (_ddict) {
|
||||
return _ddict.get_owner_shard();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
compressor_ptr make_lz4_sstable_compressor_for_tests() {
|
||||
return std::make_unique<lz4_processor>();
|
||||
}
|
||||
@@ -751,21 +783,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
|
||||
return snappy_max_compressed_length(input_len);
|
||||
}
|
||||
|
||||
// Constructs compressors and decompressors for SSTables,
|
||||
// making sure that the expensive identical parts (dictionaries) are shared
|
||||
// across nodes.
|
||||
//
|
||||
// Holds weak pointers to all live dictionaries
|
||||
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
|
||||
// and shared (lifetime-extending) pointers to the current writer ("recommended")
|
||||
// dict for each table (so that they can be shared with new SSTables without consulting
|
||||
// `system.dicts`).
|
||||
//
|
||||
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
|
||||
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
|
||||
// (Note: this shouldn't pose a performance problem because a dict is only requested once per an opening of an SSTable).
|
||||
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
|
||||
//
|
||||
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
|
||||
// is removed from the factory.
|
||||
//
|
||||
@@ -774,10 +797,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
|
||||
// Has a configurable memory budget for live dicts. If the budget is exceeded,
|
||||
// will return null dicts to new writers (to avoid making the memory usage even worse)
|
||||
// and print warnings.
|
||||
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
|
||||
class dictionary_holder : public weakly_referencable<dictionary_holder> {
|
||||
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
|
||||
shard_id _owner_shard;
|
||||
config _cfg;
|
||||
using config = default_sstable_compressor_factory::config;
|
||||
const config& _cfg;
|
||||
uint64_t _total_live_dict_memory = 0;
|
||||
metrics::metric_groups _metrics;
|
||||
struct zstd_cdict_id {
|
||||
@@ -789,7 +812,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
|
||||
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
|
||||
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
|
||||
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
|
||||
std::map<table_id, lw_shared_ptr<foreign_ptr<lw_shared_ptr<const raw_dict>>>> _recommended;
|
||||
|
||||
size_t memory_budget() const {
|
||||
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
|
||||
@@ -806,8 +829,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
memory_budget()
|
||||
);
|
||||
}
|
||||
public:
|
||||
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (dict.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
auto id = get_sha256(dict);
|
||||
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
|
||||
return it->second->shared_from_this();
|
||||
@@ -819,7 +845,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
|
||||
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const zstd_ddict> ddict;
|
||||
// Fo reading, we must allocate a new dict, even if memory budget is exceeded. We have no other choice.
|
||||
// In any case, if the budget is exceeded after we print a rate-limited warning about it.
|
||||
@@ -835,15 +863,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(ddict));
|
||||
}
|
||||
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
|
||||
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
|
||||
auto raw = get_canonical_ptr(dict);
|
||||
return get_zstd_dict_for_reading(raw, level);
|
||||
});
|
||||
}
|
||||
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
|
||||
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!_cfg.enable_writing_dictionaries() || !raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const zstd_cdict> cdict;
|
||||
// If we can share an already-allocated dict, we do that regardless of memory budget.
|
||||
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
|
||||
@@ -859,19 +883,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(cdict));
|
||||
}
|
||||
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
|
||||
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
|
||||
if (!_cfg.enable_writing_dictionaries()) {
|
||||
return {};
|
||||
}
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it != _recommended.end()) {
|
||||
return get_zstd_dict_for_writing(rec_it->second, level);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
});
|
||||
}
|
||||
using lz4_dicts = std::pair<
|
||||
foreign_ptr<lw_shared_ptr<const raw_dict>>,
|
||||
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
|
||||
@@ -879,18 +890,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
|
||||
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
|
||||
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
lw_shared_ptr<const raw_dict> ddict;
|
||||
return make_foreign(std::move(raw));
|
||||
}
|
||||
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
|
||||
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
|
||||
auto raw = get_canonical_ptr(dict);
|
||||
return get_lz4_dict_for_reading(raw);
|
||||
});
|
||||
}
|
||||
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (!_cfg.enable_writing_dictionaries() || !raw) {
|
||||
return nullptr;
|
||||
}
|
||||
lw_shared_ptr<const lz4_cdict> cdict;
|
||||
// If we can share an already-allocated dict, we do that regardless of memory budget.
|
||||
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
|
||||
@@ -905,24 +910,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
|
||||
}
|
||||
return make_foreign(std::move(cdict));
|
||||
}
|
||||
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
|
||||
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
|
||||
if (!_cfg.enable_writing_dictionaries()) {
|
||||
return {};
|
||||
}
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it != _recommended.end()) {
|
||||
return get_lz4_dict_for_writing(rec_it->second);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
sstable_compressor_factory_impl(config cfg)
|
||||
: _owner_shard(this_shard_id())
|
||||
, _cfg(std::move(cfg))
|
||||
dictionary_holder(const config& cfg)
|
||||
: _cfg(cfg)
|
||||
{
|
||||
if (_cfg.register_metrics) {
|
||||
namespace sm = seastar::metrics;
|
||||
@@ -931,8 +922,8 @@ public:
|
||||
});
|
||||
}
|
||||
}
|
||||
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
|
||||
~sstable_compressor_factory_impl() {
|
||||
dictionary_holder(dictionary_holder&&) = delete;
|
||||
~dictionary_holder() {
|
||||
// Note: `_recommended` might be the only thing keeping some dicts alive,
|
||||
// so clearing it will destroy them.
|
||||
//
|
||||
@@ -948,39 +939,39 @@ public:
|
||||
_recommended.clear();
|
||||
}
|
||||
void forget_raw_dict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_raw_dicts.erase(id);
|
||||
}
|
||||
void forget_zstd_cdict(dict_id id, int level) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_zstd_cdicts.erase({id, level});
|
||||
}
|
||||
void forget_zstd_ddict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_zstd_ddicts.erase(id);
|
||||
}
|
||||
void forget_lz4_cdict(dict_id id) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
_lz4_cdicts.erase(id);
|
||||
}
|
||||
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
|
||||
return smp::submit_to(_owner_shard, [this, t, dict] {
|
||||
_recommended.erase(t);
|
||||
if (dict.size()) {
|
||||
auto canonical_ptr = get_canonical_ptr(dict);
|
||||
_recommended.emplace(t, canonical_ptr);
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
|
||||
t, dict.size(), fmt_hex(canonical_ptr->id()));
|
||||
} else {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
|
||||
}
|
||||
});
|
||||
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
|
||||
_recommended.erase(t);
|
||||
if (dict) {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
|
||||
t, dict->raw().size(), fmt_hex(dict->id()));
|
||||
_recommended.emplace(t, make_lw_shared(std::move(dict)));
|
||||
} else {
|
||||
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
|
||||
}
|
||||
}
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
|
||||
auto rec_it = _recommended.find(t);
|
||||
if (rec_it == _recommended.end()) {
|
||||
co_return nullptr;
|
||||
}
|
||||
// Note that rec_it might be invalidated while we are doing the copy(),
|
||||
// so we have to make a copy of the outer shared ptr first.
|
||||
lw_shared_ptr<foreign_ptr<lw_shared_ptr<const raw_dict>>> ptr = rec_it->second;
|
||||
co_return co_await ptr->copy();
|
||||
}
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
|
||||
|
||||
void account_memory_delta(ssize_t n) {
|
||||
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
|
||||
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
|
||||
compressor_factory_logger.error(
|
||||
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
|
||||
@@ -990,19 +981,85 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
|
||||
: _cfg(std::move(cfg))
|
||||
, _holder(std::make_unique<dictionary_holder>(_cfg))
|
||||
{
|
||||
for (shard_id i = 0; i < smp::count; ++i) {
|
||||
auto numa_id = _cfg.numa_config[i];
|
||||
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
|
||||
_numa_groups[numa_id].push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
|
||||
const auto params = s->get_compressor_params();
|
||||
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
|
||||
}
|
||||
|
||||
std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
|
||||
auto sp = local_engine->smp().shard_to_numa_node_mapping();
|
||||
return std::vector<unsigned>(sp.begin(), sp.end());
|
||||
}
|
||||
|
||||
unsigned default_sstable_compressor_factory::local_numa_id() {
|
||||
return _cfg.numa_config[this_shard_id()];
|
||||
}
|
||||
|
||||
shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
|
||||
auto hash = read_unaligned<uint64_t>(sha.data());
|
||||
const auto& group = _numa_groups[numa_id];
|
||||
if (group.empty()) {
|
||||
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
|
||||
}
|
||||
return group[hash % group.size()];
|
||||
}
|
||||
|
||||
future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
|
||||
if (_leader_shard != this_shard_id()) {
|
||||
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
|
||||
}
|
||||
auto units = co_await get_units(_recommendation_setting_sem, 1);
|
||||
auto sha = get_sha256(dict);
|
||||
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
|
||||
const auto& group = _numa_groups[numa_id];
|
||||
if (group.empty()) {
|
||||
continue;
|
||||
}
|
||||
auto r = get_dict_owner(numa_id, sha);
|
||||
auto d = co_await container().invoke_on(r, [dict](self& local) {
|
||||
return make_foreign(local._holder->get_canonical_ptr(dict));
|
||||
});
|
||||
auto local_coordinator = group[0];
|
||||
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
|
||||
local._holder->set_recommended_dict(t, std::move(d));
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
|
||||
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
|
||||
}
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
|
||||
const auto local_coordinator = _numa_groups[local_numa_id()][0];
|
||||
return container().invoke_on(local_coordinator, [t](self& local) {
|
||||
return local._holder->get_recommended_dict(t);
|
||||
});
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
|
||||
using algorithm = compression_parameters::algorithm;
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
|
||||
switch (algo) {
|
||||
case algorithm::lz4:
|
||||
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
|
||||
case algorithm::lz4_with_dicts: {
|
||||
auto cdict = _cfg.enable_writing_dictionaries()
|
||||
? co_await get_lz4_dicts_for_writing(s->id())
|
||||
: nullptr;
|
||||
holder::foreign_lz4_cdict cdict;
|
||||
if (auto recommended = co_await get_recommended_dict(id)) {
|
||||
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
|
||||
return local._holder->get_lz4_dict_for_writing(recommended.release());
|
||||
});
|
||||
}
|
||||
if (cdict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
|
||||
}
|
||||
@@ -1015,9 +1072,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
|
||||
case algorithm::zstd:
|
||||
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
|
||||
case algorithm::zstd_with_dicts: {
|
||||
auto cdict = _cfg.enable_writing_dictionaries()
|
||||
? co_await get_zstd_dict_for_writing(s->id(), params.zstd_compression_level().value_or(ZSTD_defaultCLevel()))
|
||||
: nullptr;
|
||||
holder::foreign_zstd_cdict cdict;
|
||||
if (auto recommended = co_await get_recommended_dict(id)) {
|
||||
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
|
||||
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
|
||||
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
|
||||
});
|
||||
}
|
||||
if (cdict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
|
||||
}
|
||||
@@ -1029,17 +1090,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
|
||||
abort();
|
||||
}
|
||||
|
||||
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_reading(sstables::compression& c) {
|
||||
const auto params = compression_parameters(sstables::options_from_compression(c));
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
|
||||
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
|
||||
return make_compressor_for_writing_impl(params, id);
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
|
||||
using algorithm = compression_parameters::algorithm;
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
|
||||
switch (algo) {
|
||||
case algorithm::lz4:
|
||||
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
|
||||
case algorithm::lz4_with_dicts: {
|
||||
auto dict = dict_from_options(c);
|
||||
auto ddict = co_await get_lz4_dicts_for_reading(std::as_bytes(std::span(*dict)));
|
||||
auto dict_span = dict;
|
||||
auto sha = get_sha256(dict_span);
|
||||
auto dict_owner = get_dict_owner(local_numa_id(), sha);
|
||||
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
|
||||
auto d = local._holder->get_canonical_ptr(dict_span);
|
||||
return local._holder->get_lz4_dict_for_reading(std::move(d));
|
||||
});
|
||||
if (ddict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
|
||||
}
|
||||
@@ -1054,8 +1126,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
|
||||
}
|
||||
case algorithm::zstd_with_dicts: {
|
||||
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
|
||||
auto dict = dict_from_options(c);
|
||||
auto ddict = co_await get_zstd_dict_for_reading(std::as_bytes(std::span(*dict)), level);
|
||||
auto dict_span = dict;
|
||||
auto sha = get_sha256(dict_span);
|
||||
auto dict_owner = get_dict_owner(local_numa_id(), sha);
|
||||
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
|
||||
auto d = local._holder->get_canonical_ptr(dict_span);
|
||||
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
|
||||
});
|
||||
if (ddict) {
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
|
||||
}
|
||||
@@ -1067,7 +1144,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
|
||||
abort();
|
||||
}
|
||||
|
||||
raw_dict::raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict)
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
|
||||
const auto params = compression_parameters(sstables::options_from_compression(c));
|
||||
auto dict = dict_from_options(c);
|
||||
const auto algo = params.get_algorithm();
|
||||
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
|
||||
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
|
||||
}
|
||||
|
||||
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
|
||||
return make_compressor_for_reading_impl(params, dict);
|
||||
}
|
||||
|
||||
raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _id(key)
|
||||
, _dict(dict.begin(), dict.end())
|
||||
@@ -1082,7 +1171,7 @@ raw_dict::~raw_dict() {
|
||||
}
|
||||
}
|
||||
|
||||
zstd_cdict::zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level)
|
||||
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _level(level)
|
||||
@@ -1114,7 +1203,7 @@ zstd_cdict::~zstd_cdict() {
|
||||
}
|
||||
}
|
||||
|
||||
zstd_ddict::zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _alloc([this] (ssize_t n) {
|
||||
@@ -1143,7 +1232,7 @@ zstd_ddict::~zstd_ddict() {
|
||||
}
|
||||
}
|
||||
|
||||
lz4_cdict::lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
|
||||
: _owner(owner.weak_from_this())
|
||||
, _raw(raw)
|
||||
, _dict(LZ4_createStream(), LZ4_freeStream)
|
||||
@@ -1162,6 +1251,28 @@ lz4_cdict::~lz4_cdict() {
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg) {
|
||||
return std::make_unique<sstable_compressor_factory_impl>(std::move(cfg));
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
|
||||
SCYLLA_ASSERT(thread::running_in_thread());
|
||||
struct wrapper : sstable_compressor_factory {
|
||||
using impl = default_sstable_compressor_factory;
|
||||
sharded<impl> _impl;
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
|
||||
return _impl.local().make_compressor_for_writing(s);
|
||||
}
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
|
||||
return _impl.local().make_compressor_for_reading(c);
|
||||
}
|
||||
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
|
||||
return _impl.local().set_recommended_dict(t, d);
|
||||
};
|
||||
wrapper(wrapper&&) = delete;
|
||||
wrapper() {
|
||||
_impl.start().get();
|
||||
}
|
||||
~wrapper() {
|
||||
_impl.stop().get();
|
||||
}
|
||||
};
|
||||
return std::make_unique<wrapper>();
|
||||
}
|
||||
|
||||
|
||||
12
compress.hh
12
compress.hh
@@ -13,12 +13,9 @@
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace gms {
|
||||
class feature_service;
|
||||
} // namespace gms
|
||||
|
||||
class compression_parameters;
|
||||
|
||||
class compressor {
|
||||
@@ -64,6 +61,8 @@ public:
|
||||
|
||||
virtual algorithm get_algorithm() const = 0;
|
||||
|
||||
virtual std::optional<unsigned> get_dict_owner_for_test() const;
|
||||
|
||||
using ptr_type = std::unique_ptr<compressor>;
|
||||
};
|
||||
|
||||
@@ -106,7 +105,10 @@ public:
|
||||
algorithm get_algorithm() const { return _algorithm; }
|
||||
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
|
||||
|
||||
void validate(const gms::feature_service&);
|
||||
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
|
||||
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
|
||||
void validate(dicts_feature_enabled, dicts_usage_allowed);
|
||||
|
||||
std::map<sstring, sstring> get_options() const;
|
||||
|
||||
bool compression_enabled() const {
|
||||
|
||||
@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
|
||||
'test/boost/secondary_index_test.cc',
|
||||
'test/boost/sessions_test.cc',
|
||||
'test/boost/sstable_compaction_test.cc',
|
||||
'test/boost/sstable_compressor_factory_test.cc',
|
||||
'test/boost/sstable_directory_test.cc',
|
||||
'test/boost/sstable_set_test.cc',
|
||||
'test/boost/statement_restrictions_test.cc',
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "db/per_partition_rate_limit_options.hh"
|
||||
#include "db/tablet_options.hh"
|
||||
#include "utils/bloom_calculations.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
@@ -135,7 +136,9 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
|
||||
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
|
||||
}
|
||||
compression_parameters cp(*compression_options);
|
||||
cp.validate(db.features());
|
||||
cp.validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
|
||||
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
|
||||
}
|
||||
|
||||
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
|
||||
|
||||
@@ -113,10 +113,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
|
||||
if (rs->uses_tablets()) {
|
||||
warnings.push_back(
|
||||
"Tables in this keyspace will be replicated using Tablets "
|
||||
"and will not support CDC, LWT and counters features. "
|
||||
"To use CDC, LWT or counters, drop this keyspace and re-create it "
|
||||
"without tablets by adding AND TABLETS = {'enabled': false} "
|
||||
"to the CREATE KEYSPACE statement.");
|
||||
"and will not support Materialized Views, Secondary Indexes, CDC, LWT and counters features. "
|
||||
"To use Materialized Views, Secondary Indexes, CDC, LWT or counters, drop this keyspace and re-create it "
|
||||
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
|
||||
if (ksm->initial_tablets().value()) {
|
||||
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
|
||||
}
|
||||
|
||||
@@ -1230,7 +1230,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)"
|
||||
"bytes written to data file. Value must be between 0 and 1.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, (size_t(128) << 10) + 1, "Warn about memory allocations above this size; set to zero to disable.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
|
||||
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
|
||||
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")
|
||||
, enable_node_aggregated_table_metrics(this, "enable_node_aggregated_table_metrics", value_status::Used, true, "Enable aggregated per node, per keyspace and per table metrics reporting, applicable if enable_keyspace_column_family_metrics is false.")
|
||||
@@ -1243,6 +1243,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, sstable_format(this, "sstable_format", value_status::Used, "me", "Default sstable file format", {"md", "me"})
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
|
||||
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
|
||||
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
|
||||
"on tables which already have it configured, and won't do anything to existing sstables.\n"
|
||||
"To affect existing tables, you can ALTER them to a non-dictionary compressor, or disable dictionary compression "
|
||||
"for the whole node through `sstable_compression_dictionaries_enable_writing`.")
|
||||
, sstable_compression_dictionaries_enable_writing(this, "sstable_compression_dictionaries_enable_writing", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Enables SSTable compression with shared dictionaries (for tables which opt in). If set to false, this node won't write any new SSTables using dictionary compression.\n"
|
||||
"Option meant not for regular usage, but for unforeseen problems that call for disabling dictionaries without modifying table schema.")
|
||||
|
||||
@@ -436,6 +436,7 @@ public:
|
||||
named_value<bool> enable_sstables_mc_format;
|
||||
named_value<bool> enable_sstables_md_format;
|
||||
named_value<sstring> sstable_format;
|
||||
named_value<bool> sstable_compression_dictionaries_allow_in_ddl;
|
||||
named_value<bool> sstable_compression_dictionaries_enable_writing;
|
||||
named_value<float> sstable_compression_dictionaries_memory_budget_fraction;
|
||||
named_value<float> sstable_compression_dictionaries_retrain_period_in_seconds;
|
||||
|
||||
@@ -22,6 +22,8 @@ def readable_desc_rst(description):
|
||||
|
||||
cleaned_line = line.replace('\\n', '\n')
|
||||
|
||||
cleaned_line = cleaned_line.replace('\\t', '\n' + indent * 2)
|
||||
|
||||
if line.endswith('"'):
|
||||
cleaned_line = cleaned_line[:-1] + ' '
|
||||
|
||||
|
||||
28
docs/_static/data/os-support.json
vendored
28
docs/_static/data/os-support.json
vendored
@@ -1,15 +1,24 @@
|
||||
{
|
||||
"Linux Distributions": {
|
||||
"Ubuntu": ["20.04", "22.04", "24.04"],
|
||||
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
},
|
||||
"ScyllaDB Versions": [
|
||||
{
|
||||
"version": "Enterprise 2025.1",
|
||||
"version": "ScyllaDB 2025.2",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["20.04", "22.04", "24.04"],
|
||||
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"version": "ScyllaDB 2025.1",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
@@ -18,7 +27,7 @@
|
||||
{
|
||||
"version": "Enterprise 2024.2",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["20.04", "22.04", "24.04"],
|
||||
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
@@ -27,20 +36,11 @@
|
||||
{
|
||||
"version": "Enterprise 2024.1",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["20.04", "22.04", "24.04*"],
|
||||
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04*"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"version": "Open Source 6.2",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["20.04", "22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -2,6 +2,11 @@
|
||||
#old path: new path
|
||||
|
||||
|
||||
# Remove reduntant pages
|
||||
|
||||
/stable/getting-started/tutorials: https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html
|
||||
/stable/contribute: https://github.com/scylladb/scylladb/blob/master/CONTRIBUTING.md
|
||||
|
||||
# Remove an oudated article
|
||||
|
||||
/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
Contribute to ScyllaDB
|
||||
=======================
|
||||
|
||||
Thank you for your interest in making ScyllaDB better!
|
||||
We appreciate your help and look forward to welcoming you to the ScyllaDB Community.
|
||||
There are two ways you can contribute:
|
||||
|
||||
* Send a patch to the ScyllaDB source code
|
||||
* Write documentation for ScyllaDB Docs
|
||||
|
||||
|
||||
Contribute to ScyllaDB's Source Code
|
||||
------------------------------------
|
||||
ScyllaDB developers use patches and email to share and discuss changes.
|
||||
Setting up can take a little time, but once you have done it the first time, it’s easy.
|
||||
|
||||
The basic steps are:
|
||||
|
||||
* Join the ScyllaDB community
|
||||
* Create a Git branch to work on
|
||||
* Commit your work with clear commit messages and sign-offs.
|
||||
* Send a PR or use ``git format-patch`` and ``git send-email`` to send to the list
|
||||
|
||||
|
||||
The entire process is `documented here <https://github.com/scylladb/scylla/blob/master/CONTRIBUTING.md>`_.
|
||||
|
||||
Contribute to ScyllaDB Docs
|
||||
---------------------------
|
||||
|
||||
Each ScyllaDB project has accompanying documentation. For information about contributing documentation to a specific ScyllaDB project, refer to the README file for the individual project.
|
||||
For general information or to contribute to the ScyllaDB Sphinx theme, read the `Contributor's Guide <https://sphinx-theme.scylladb.com/stable/contribute/>`_.
|
||||
@@ -67,9 +67,6 @@ You can enable CDC when creating or altering a table using the ``cdc`` option, f
|
||||
|
||||
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
|
||||
|
||||
.. note::
|
||||
If you enabled CDC and later decide to disable it, you need to **stop all writes** to the base table before issuing the ``ALTER TABLE ... WITH cdc = {'enabled':false};`` command.
|
||||
|
||||
.. include:: /features/cdc/_common/cdc-params.rst
|
||||
|
||||
Using CDC with Applications
|
||||
|
||||
@@ -113,7 +113,38 @@ Pick a zone where Haswell CPUs are found. Local SSD performance offers, accordin
|
||||
Image with NVMe disk interface is recommended.
|
||||
(`More info <https://cloud.google.com/compute/docs/disks/local-ssd>`_)
|
||||
|
||||
Recommended instances types are `n1-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n1_machines>`_ and `n2-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n2_machines>`_
|
||||
Recommended instances types are `z3-highmem-highlssd <https://cloud.google.com/compute/docs/storage-optimized-machines#z3_machine_types>`_,
|
||||
`n1-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n1_machines>`_, and `n2-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n2_machines>`_
|
||||
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GB)
|
||||
- Storage (GB)
|
||||
* - z3-highmem-8-highlssd
|
||||
- 8
|
||||
- 64
|
||||
- 3,000
|
||||
* - z3-highmem-16-highlssd
|
||||
- 16
|
||||
- 128
|
||||
- 6,000
|
||||
* - z3-highmem-22-highlssd
|
||||
- 22
|
||||
- 176
|
||||
- 9,000
|
||||
* - z3-highmem-32-highlssd
|
||||
- 32
|
||||
- 256
|
||||
- 12,000
|
||||
* - z3-highmem-44-highlssd
|
||||
- 44
|
||||
- 352
|
||||
- 18,000
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
|
||||
@@ -11,7 +11,6 @@ Getting Started
|
||||
requirements
|
||||
Migrate to ScyllaDB </using-scylla/migrate-scylla>
|
||||
Integration Solutions </using-scylla/integrations/index>
|
||||
tutorials
|
||||
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Requirements
|
||||
|
||||
@@ -4,6 +4,9 @@ OS Support by Linux Distributions and Version
|
||||
The following matrix shows which Linux distributions, containers, and images
|
||||
are :ref:`supported <os-support-definition>` with which versions of ScyllaDB.
|
||||
|
||||
Note that support for Ubuntu 20.04 is deprecated and will be removed in
|
||||
a future release.
|
||||
|
||||
.. datatemplate:json:: /_static/data/os-support.json
|
||||
:template: platforms.tmpl
|
||||
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
============
|
||||
Tutorials
|
||||
============
|
||||
|
||||
The tutorials will show you how to use ScyllaDB as a data source for an application.
|
||||
|
||||
|
||||
ScyllaDB Tutorial
|
||||
===================
|
||||
|
||||
`Build an IoT App with sensor simulator and a REST API <https://iot.scylladb.com/stable/>`_
|
||||
|
||||
ScyllaDB Cloud Tutorial
|
||||
=======================
|
||||
|
||||
`Implement CRUD operations with a TODO App <https://github.com/scylladb/scylla-cloud-getting-started/>`_
|
||||
|
||||
ScyllaDB Cloud Feature Store Tutorial
|
||||
=====================================
|
||||
|
||||
`Build a machine learning (ML) feature store with ScyllaDB <https://feature-store.scylladb.com/stable/>`_
|
||||
@@ -73,6 +73,5 @@ In addition, you can read our `blog <https://www.scylladb.com/blog/>`_ and atten
|
||||
kb/index
|
||||
reference/index
|
||||
faq
|
||||
Contribute to ScyllaDB <contribute>
|
||||
2024.2 and earlier documentation <https://enterprise.docs.scylladb.com/branch-2024.2/>
|
||||
|
||||
|
||||
@@ -2,40 +2,65 @@
|
||||
How to Safely Increase the Replication Factor
|
||||
=======================================================
|
||||
|
||||
A replication factor (RF) is configured per keyspace. You can change the RF
|
||||
using the :ref:`ALTER KEYSPACE <alter-keyspace-statement>` command.
|
||||
|
||||
**Topic: What can happen when you increase RF**
|
||||
To increase the RF safely, ensure you follow the guidelines below.
|
||||
The guidelines differ depending on whether your a keyspace is tablets-based
|
||||
(the default) or has tablets disabled. See :doc:`Data Distribution with Tablets </architecture/tablets>`
|
||||
for more information about tablets.
|
||||
|
||||
Increasing the RF in Tablets-based Keyspaces
|
||||
-------------------------------------------------
|
||||
|
||||
**Audience: ScyllaDB administrators**
|
||||
If a keyspace has tablets enabled (the default), changing the RF does not
|
||||
impact data consistency in the cluster.
|
||||
|
||||
However, due to limitations in the current protocol used to pass tablet data
|
||||
to drivers, drivers will not pick up new replicas after the RF is increased.
|
||||
As a result, drivers will not route requests to new replicas, causing imbalance.
|
||||
|
||||
Issues
|
||||
------
|
||||
To avoid this issue, restart the client applications after the ALTER statement
|
||||
that changes the RF completes successfully.
|
||||
|
||||
When a Replication Factor (RF) is increased, using the :ref:`ALTER KEYSPACE <alter-keyspace-statement>` command, the data consistency is effectively dropped
|
||||
by the difference of the RF_new value and the RF_old value for all pre-existing data.
|
||||
Increasing the RF in Keyspaces with Tablets Disabled
|
||||
----------------------------------------------------------
|
||||
|
||||
If you :ref:`opted out of tablets when creating a keyspace <tablets-enable-tablets>`,
|
||||
so your keyspace is vnodes-based, increasing the RF will impact data consistency.
|
||||
|
||||
Data consistency in your cluster is effectively dropped by the difference
|
||||
between the RF_new value and the RF_old value for all pre-existing data.
|
||||
Consistency will only be restored after running a repair.
|
||||
|
||||
Another issue occurs in keyspaces with tablets enabled and is driver-related. Due to limitations in the current protocol used to pass tablet data to drivers, drivers will not pick
|
||||
up new replicas after replication factor is increased. This will cause them to avoid routing requests to those replicas, causing imbalance.
|
||||
|
||||
Resolution
|
||||
----------
|
||||
========================
|
||||
|
||||
When one increases an RF, one should consider that the pre-existing data will **not be streamed** to new replicas (a common misconception).
|
||||
When you increase the RF, you should be aware that the pre-existing data will
|
||||
**not be streamed** to new replicas (a common misconception).
|
||||
|
||||
As a result, in order to make sure that you can keep on reading the old data with the same level of consistency, increase the read Consistency Level (CL) according to the following formula:
|
||||
As a result, in order to make sure that you can keep on reading the old data
|
||||
with the same level of consistency:
|
||||
|
||||
``CL_new = CL_old + RF_new - RF_old``
|
||||
#. Increase the read Consistency Level (CL) according to the following formula:
|
||||
|
||||
After you run a repair, you can decrease the CL. If RF has only been changed in a particular Data Center (DC) only the nodes in that DC have to be repaired.
|
||||
.. code::
|
||||
|
||||
CL_new = CL_old + RF_new - RF_old
|
||||
|
||||
#. Run repair.
|
||||
#. Decrease the CL.
|
||||
|
||||
|
||||
If RF has only been changed in a particular Datacenter (DC), only the nodes in
|
||||
that DC have to be repaired.
|
||||
|
||||
To resolve the driver-related issue, restart the client applications after the ALTER statement that changes the RF completes successfully.
|
||||
|
||||
Example
|
||||
=======
|
||||
|
||||
In this example your five node cluster RF is 3 and your CL is TWO. You want to increase your RF from 3 to 5.
|
||||
In this example, your five-node cluster RF is 3 and your CL is TWO. You want to increase your RF from 3 to 5.
|
||||
|
||||
#. Increase the read CL by a RF_new - RF_old value.
|
||||
Following the example the RF_new is 5 and the RF_old is 3 so, 5-3 =2. You need to increase the CL by 2.
|
||||
@@ -45,9 +70,9 @@ In this example your five node cluster RF is 3 and your CL is TWO. You want to i
|
||||
#. Restore the reads CL to the originally intended value. For this example, QUORUM.
|
||||
|
||||
|
||||
If you do not follow the procedure above you may start reading stale or null data after increasing the RF.
|
||||
If you do not follow the procedure above, you may start reading stale or null data after increasing the RF.
|
||||
|
||||
More Information
|
||||
References
|
||||
----------------
|
||||
|
||||
* :doc:`Fault Tolerance </architecture/architecture-fault-tolerance/>`
|
||||
|
||||
@@ -5,4 +5,3 @@ The cassandra-stress tool is used for benchmarking and load testing both ScyllaD
|
||||
|
||||
Cassandra Stress is not part of ScyllaDB and it is not distributed along side it anymore. It has it's own separate repository and release cycle. More information about it can be found on `GitHub <https://github.com/scylladb/cassandra-stress>`_ or on `DockerHub <https://hub.docker.com/r/scylladb/cassandra-stress>`_.
|
||||
|
||||
.. include:: /rst_include/apache-copyrights.rst
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
.. _nodetool-cleanup-cmd:
|
||||
|
||||
Nodetool cleanup
|
||||
================
|
||||
**cleanup** ``[<keyspace> <tablename ...>]``- triggers the immediate removal of data from node(s) that "lose" part of their token range due to a range movement operation (node addition or node replacement).
|
||||
|
||||
@@ -29,7 +29,7 @@ Load and Stream
|
||||
|
||||
.. code::
|
||||
|
||||
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las]
|
||||
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las] [--scope <scope>]
|
||||
|
||||
The Load and Stream feature extends nodetool refresh. The new ``-las`` option loads arbitrary sstables that do not belong to a node into the cluster. It loads the sstables from the disk and calculates the data's owning nodes, and streams automatically.
|
||||
For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. We can copy the sstables from the old cluster to any of the new nodes and trigger the load and stream process.
|
||||
@@ -39,5 +39,42 @@ Load and Stream make restores and migrations much easier:
|
||||
* You can place sstable from every node to every node
|
||||
* No need to run nodetool cleanup to remove unused data
|
||||
|
||||
Scope
|
||||
-----
|
||||
|
||||
The `scope` parameter describes the subset of cluster nodes where you want to load data:
|
||||
|
||||
* `node` - On the local node.
|
||||
* `rack` - On the local rack.
|
||||
* `dc` - In the datacenter (DC) where the local node lives.
|
||||
* `all` (default) - Everywhere across the cluster.
|
||||
|
||||
Scope supports a variety of options for filtering out the destination nodes.
|
||||
On one extreme, one node is given all SStables with the scope ``all``; on the other extreme, all
|
||||
nodes are loading only their own SStables with the scope ``node``. In between, you can choose
|
||||
a subset of nodes to load only SStables that belong to the rack or DC.
|
||||
|
||||
This option is only valid when using the ``--load-and-stream`` option.
|
||||
|
||||
|
||||
Skip cleanup
|
||||
---------------
|
||||
|
||||
.. code::
|
||||
|
||||
nodetool refresh <my_keyspace> <my_table> [--skip-cleanup]
|
||||
|
||||
When loading an SSTable, Scylla will cleanup it from keys that the node is not responsible for. To skip this step, use the `--skip-cleanup` option.
|
||||
See :ref:`nodetool cleanup <nodetool-cleanup-cmd>`.
|
||||
|
||||
|
||||
Skip reshape
|
||||
---------------
|
||||
|
||||
.. code::
|
||||
|
||||
nodetool refresh <my_keyspace> <my_table> [--skip-reshape]
|
||||
|
||||
When refreshing, the SSTables to load might be out of shape, Scylla will attempt to reshape them if that's the case. To skip this step, use the `--skip-reshape` option.
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -4,8 +4,7 @@ Upgrade ScyllaDB
|
||||
|
||||
.. toctree::
|
||||
|
||||
ScyllaDB Open Source 6.2 to ScyllaDB 2025.1 <upgrade-guide-from-6.2-to-2025.1/index>
|
||||
ScyllaDB Enterprise 2024.x to ScyllaDB 2025.1 <upgrade-guide-from-2024.x-to-2025.1/index>
|
||||
ScyllaDB 2025.1 to ScyllaDB 2025.2 <upgrade-guide-from-2025.1-to-2025.2/index>
|
||||
ScyllaDB Image <ami-upgrade>
|
||||
|
||||
|
||||
|
||||
@@ -1,129 +0,0 @@
|
||||
=====================================
|
||||
Enable Consistent Topology Updates
|
||||
=====================================
|
||||
|
||||
.. note::
|
||||
|
||||
The following procedure only applies if:
|
||||
|
||||
* You're upgrading **from ScyllaDB Enterprise 2024.1** to ScyllaDB 2025.1.
|
||||
* You previously upgraded from 2024.1 to 2024.2 without enabling consistent
|
||||
topology updates (see the `2024.2 upgrade guide <https://enterprise.docs.scylladb.com/branch-2024.2/upgrade/upgrade-enterprise/upgrade-guide-from-2024.1-to-2024.2/enable-consistent-topology.html>`_
|
||||
for reference).
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
ScyllaDB 2025.1 has :ref:`consistent topology changes based on Raft <raft-topology-changes>`.
|
||||
Clusters created with version 2025.1 use consistent topology changes right
|
||||
from the start. However, consistent topology changes are *not* automatically
|
||||
enabled in clusters upgraded from version 2024.1. In such clusters, you need to
|
||||
enable consistent topology changes manually by following the procedure described in this article.
|
||||
|
||||
Before you start, you **must** check that the cluster meets the prerequisites
|
||||
and ensure that some administrative procedures will not be run while
|
||||
the procedure is in progress.
|
||||
|
||||
.. _enable-raft-topology-2025.1-prerequisites:
|
||||
|
||||
Prerequisites
|
||||
=============
|
||||
|
||||
* Make sure that all nodes in the cluster are upgraded to ScyllaDB 2025.1.
|
||||
* Verify that :ref:`schema on raft is enabled <schema-on-raft-enabled>`.
|
||||
* Make sure that all nodes enabled ``SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`` cluster feature.
|
||||
One way to verify it is to look for the following message in the log:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
features - Feature SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES is enabled
|
||||
|
||||
Alternatively, it can be verified programmatically by checking whether the ``value``
|
||||
column under the ``enabled_features`` key contains the name of the feature in
|
||||
the ``system.scylla_local`` table. One way to do it is with the following bash script:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
until cqlsh -e "select value from system.scylla_local where key = 'enabled_features'" | grep "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"
|
||||
do
|
||||
echo "Upgrade didn't finish yet on the local node, waiting 10 seconds before checking again..."
|
||||
sleep 10
|
||||
done
|
||||
echo "Upgrade completed on the local node"
|
||||
|
||||
* Make sure that all nodes are alive for the duration of the procedure.
|
||||
|
||||
.. _enable-raft-topology-2025.1-forbidden-operations:
|
||||
|
||||
Administrative operations that must not be running during the procedure
|
||||
=========================================================================
|
||||
|
||||
Make sure that administrative operations will not be running while
|
||||
the procedure is in progress. In particular, you must abstain from:
|
||||
|
||||
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>`
|
||||
(adding, replacing, removing, decommissioning nodes, etc.).
|
||||
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
|
||||
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
|
||||
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.
|
||||
* Any change of authorization via :doc:`CQL API </operating-scylla/security/authorization>`.
|
||||
* Schema changes.
|
||||
|
||||
Running the procedure
|
||||
=====================
|
||||
|
||||
.. warning::
|
||||
|
||||
Before proceeding, make sure that all the :ref:`prerequisites <enable-raft-topology-2025.1-prerequisites>` are met
|
||||
and no :ref:`forbidden administrative operations <enable-raft-topology-2025.1-forbidden-operations>` will run
|
||||
during the procedure. Failing to do so may put the cluster in an inconsistent state.
|
||||
|
||||
#. Issue a POST HTTP request to the ``/storage_service/raft_topology/upgrade``
|
||||
endpoint to any of the nodes in the cluster.
|
||||
For example, you can do it with ``curl``:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X POST "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
#. Wait until all nodes report that the procedure is complete. You can check
|
||||
whether a node finished the procedure in one of two ways:
|
||||
|
||||
* By sending a HTTP ``GET`` request on the ``/storage_service/raft_topology/upgrade``
|
||||
endpoint. For example, you can do it with ``curl``:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X GET "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
It will return a JSON string that will be equal to ``done`` after the procedure is complete on that node.
|
||||
|
||||
* By querying the ``upgrade_state`` column in the ``system.topology`` table.
|
||||
You can use ``cqlsh`` to get the value of the column:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
cqlsh -e "select upgrade_state from system.topology"
|
||||
|
||||
The ``upgrade_state`` column should be set to ``done`` after the procedure
|
||||
is complete on that node:
|
||||
|
||||
After the procedure is complete on all nodes, wait at least one minute before
|
||||
issuing any topology changes in order to avoid data loss from writes that were
|
||||
started before the procedure.
|
||||
|
||||
What if the procedure gets stuck?
|
||||
===================================
|
||||
|
||||
If the procedure gets stuck at some point, first check the status of your cluster:
|
||||
|
||||
- If there are some nodes that are not alive, try to restart them.
|
||||
- If all nodes are alive, ensure that the network is healthy and every node can reach all other nodes.
|
||||
- If all nodes are alive and the network is healthy, perform
|
||||
a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart/>` of the cluster.
|
||||
|
||||
If none of the above solves the issue, perform :ref:`the Raft recovery procedure <recovery-procedure>`.
|
||||
During recovery, the cluster will switch back to the gossip-based topology management mechanism.
|
||||
|
||||
After exiting recovery, you should retry enabling consistent topology updates using
|
||||
the procedure described in this document.
|
||||
@@ -1,17 +0,0 @@
|
||||
==========================================================
|
||||
Upgrade - ScyllaDB Enterprise 2024.x to ScyllaDB 2025.1
|
||||
==========================================================
|
||||
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB <upgrade-guide-from-2024.x-to-2025.1>
|
||||
Enable Consistent Topology Updates <enable-consistent-topology>
|
||||
Metrics <metric-update-2024.x-to-2025.1>
|
||||
|
||||
* :doc:`Upgrade from ScyllaDB Enterprise 2024.x.y to ScyllaDB 2025.1.y <upgrade-guide-from-2024.x-to-2025.1>`
|
||||
* :doc:`Enable Consistent Topology Updates <enable-consistent-topology>`
|
||||
* :doc:`Metrics Update Between 2024.x and 2025.1 <metric-update-2024.x-to-2025.1>`
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
.. |SRC_VERSION| replace:: 2024.x
|
||||
.. |NEW_VERSION| replace:: 2025.1
|
||||
|
||||
=======================================================================================
|
||||
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
|
||||
=======================================================================================
|
||||
|
||||
ScyllaDB Enterprise |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
|
||||
New Metrics
|
||||
------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_alternator_batch_item_count
|
||||
- The total number of items processed across all batches.
|
||||
* - scylla_hints_for_views_manager_sent_bytes_total
|
||||
- The total size of the sent hints (in bytes).
|
||||
* - scylla_hints_manager_sent_bytes_total
|
||||
- The total size of the sent hints (in bytes).
|
||||
* - scylla_io_queue_activations
|
||||
- The number of times the class was woken up from idle.
|
||||
* - scylla_raft_apply_index
|
||||
- The applied index.
|
||||
* - scylla_raft_commit_index
|
||||
- The commit index.
|
||||
* - scylla_raft_log_last_index
|
||||
- The index of the last log entry.
|
||||
* - scylla_raft_log_last_term
|
||||
- The term of the last log entry.
|
||||
* - scylla_raft_snapshot_last_index
|
||||
- The index of the snapshot.
|
||||
* - scylla_raft_snapshot_last_term
|
||||
- The term of the snapshot.
|
||||
* - scylla_raft_state
|
||||
- The current state: 0 - follower, 1 - candidate, 2 - leader
|
||||
* - scylla_rpc_client_delay_samples
|
||||
- The total number of delay samples.
|
||||
* - scylla_rpc_client_delay_total
|
||||
- The total delay in seconds.
|
||||
* - scylla_storage_proxy_replica_received_hints_bytes_total
|
||||
- The total size of hints and MV hints received by this node.
|
||||
* - scylla_storage_proxy_replica_received_hints_total
|
||||
- The number of hints and MV hints received by this node.
|
||||
|
||||
Renamed Metrics
|
||||
------------------
|
||||
|
||||
The following metrics are renamed in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - 2024.2
|
||||
- 2025.1
|
||||
* - scylla_hints_for_views_manager_sent
|
||||
- scylla_hints_for_views_manager_sent_total
|
||||
* - scylla_hints_manager_sent
|
||||
- scylla_hints_manager_sent_total
|
||||
* - scylla_forward_service_requests_dispatched_to_other_nodes
|
||||
- scylla_mapreduce_service_requests_dispatched_to_other_nodes
|
||||
* - scylla_forward_service_requests_dispatched_to_own_shards
|
||||
- scylla_mapreduce_service_requests_dispatched_to_own_shards
|
||||
* - scylla_forward_service_requests_executed
|
||||
- scylla_mapreduce_service_requests_executed
|
||||
|
||||
@@ -1,395 +0,0 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 2024.x
|
||||
.. |NEW_VERSION| replace:: 2025.1
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2024.x to 2025.1
|
||||
.. _SCYLLA_METRICS: ../metric-update-2024.x-to-2025.1
|
||||
|
||||
=======================================================================================
|
||||
Upgrade from |SCYLLA_NAME| Enterprise |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
|
||||
=======================================================================================
|
||||
|
||||
This document is a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
|
||||
to |NEW_VERSION|, and rollback to version |SRC_VERSION| if required.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
|
||||
and Ubuntu. See :doc:`OS Support by Platform and Version </getting-started/os-support>`
|
||||
for information about supported versions.
|
||||
|
||||
This guide also applies when you're upgrading ScyllaDB official image on EC2,
|
||||
GCP, or Azure.
|
||||
|
||||
|
||||
Before You Upgrade ScyllaDB
|
||||
================================
|
||||
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
|
||||
upgrade the driver before you upgrade ScyllaDB. The latest two versions of each driver
|
||||
are supported.
|
||||
|
||||
**Upgrade ScyllaDB Monitoring Stack**
|
||||
|
||||
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
|
||||
version supports the ScyllaDB version to which you want to upgrade. See
|
||||
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
|
||||
|
||||
We recommend upgrading the Monitoring Stack to the latest version.
|
||||
|
||||
**Check Feature Updates**
|
||||
|
||||
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
|
||||
at the `ScyllaDB Community Forum <https://forum.scylladb.com/>`_.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes in the cluster, you will:
|
||||
|
||||
* Check that the cluster's schema is synchronized
|
||||
* Drain the node and backup the data
|
||||
* Backup the configuration file
|
||||
* Stop ScyllaDB
|
||||
* Download and install new ScyllaDB packages
|
||||
* Start ScyllaDB
|
||||
* Validate that the upgrade was successful
|
||||
|
||||
|
||||
.. caution::
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node before
|
||||
validating that the node you upgraded is up and running the new version.
|
||||
|
||||
**During** the rolling upgrade, it is highly recommended:
|
||||
|
||||
* Not to use the new |NEW_VERSION| features.
|
||||
* Not to run administration functions, like repairs, refresh, rebuild, or add or remove
|
||||
nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
|
||||
ScyllaDB Manager's scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
**After** the upgrade, you may need to enable consistent topology updates.
|
||||
See :ref:`After Upgrading Every Node <upgrade-2024.x-2025.1-after-upgrading-nodes>` for details.
|
||||
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Check the cluster schema
|
||||
-------------------------
|
||||
Make sure that all nodes have the schema synchronized before upgrade. The upgrade
|
||||
procedure will fail if there is a schema disagreement between nodes.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool describecluster
|
||||
|
||||
Backup the data
|
||||
-----------------------------------
|
||||
|
||||
Before any major procedure, like an upgrade, it is recommended to backup all the data
|
||||
to an external device.
|
||||
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
|
||||
to create backups.
|
||||
|
||||
Alternatively, you can use the ``nodetool snapshot`` command. For **each** node in the cluster, run
|
||||
the following command:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all the directories
|
||||
having that name under ``/var/lib/scylla`` to a backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the
|
||||
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
|
||||
|
||||
Backup the configuration file
|
||||
------------------------------
|
||||
|
||||
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
|
||||
in case you need to roll back the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``.
|
||||
You will need to reinstall this exact version in case you want to |ROLLBACK|_
|
||||
the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.1.list
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.1.repo
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions. If you’re using your
|
||||
own image and have installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
|
||||
#. Install the new ScyllaDB version with the additional
|
||||
``scylla-machine-image`` package:
|
||||
|
||||
.. code::
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
sudo apt-get dist-upgrade scylla-machine-image
|
||||
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
If you need JMX server, see
|
||||
:doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
|
||||
and get the new version.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
|
||||
the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
|
||||
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog``
|
||||
to validate there are no new errors in the log.
|
||||
#. Check again after two minutes to validate that no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
|
||||
.. _upgrade-2024.x-2025.1-after-upgrading-nodes:
|
||||
|
||||
After Upgrading Every Node
|
||||
===============================
|
||||
|
||||
This step applies if:
|
||||
|
||||
* You're upgrading from ScyllaDB Enterprise **2024.1** to ScyllaDB 2025.1.
|
||||
* You previously upgraded from 2024.1 to 2024.2 without enabling consistent
|
||||
topology updates (see the `2024.2 upgrade guide <https://enterprise.docs.scylladb.com/branch-2024.2/upgrade/upgrade-enterprise/upgrade-guide-from-2024.1-to-2024.2/enable-consistent-topology.html>`_
|
||||
for reference).
|
||||
|
||||
After you have upgraded every node, you must enable the Raft-based consistent
|
||||
topology updates feature. See
|
||||
:doc:`Enable Consistent Topology Updates </upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/enable-consistent-topology>`
|
||||
for instructions.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
.. warning::
|
||||
|
||||
The rollback procedure can only be applied if some nodes have **not** been upgraded
|
||||
to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade procedure is
|
||||
started with |NEW_VERSION|, rollback becomes impossible. At that point, the only way
|
||||
to restore a cluster to |SRC_VERSION| is by restoring it from backup.
|
||||
|
||||
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION|.x to
|
||||
|SRC_VERSION|.y. Apply this procedure if an upgrade from |SRC_VERSION| to |NEW_VERSION|
|
||||
failed before completing on all nodes.
|
||||
|
||||
* Use this procedure only for nodes you upgraded to |NEW_VERSION|.
|
||||
* Execute the commands one node at a time, moving to the next node
|
||||
only after the rollback procedure is completed successfully.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require a full cluster shutdown.
|
||||
For each of the nodes you roll back to |SRC_VERSION|, you will:
|
||||
|
||||
* Drain the node and stop ScyllaDB
|
||||
* Retrieve the old ScyllaDB packages
|
||||
* Restore the configuration file
|
||||
* Reload systemd configuration
|
||||
* Restart ScyllaDB
|
||||
* Validate the rollback success
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node
|
||||
before validating that the rollback was successful and the node is up and
|
||||
running the old version.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
|
||||
Drain and gracefully stop the node
|
||||
----------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
|
||||
sudo chown root.root /etc/apt/sources.list.d/scylla.list
|
||||
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla-enterprise
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
|
||||
sudo chown root.root /etc/yum.repos.d/scylla.repo
|
||||
sudo chmod 644 /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code:: console
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum remove scylla\*
|
||||
sudo yum install scylla-enterprise
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
|
||||
tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to additionally restore the ``scylla-machine-image`` package.
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade
|
||||
(see the **Debian/Ubuntu** tab).
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla-enterprise
|
||||
            sudo apt-get install scylla-enterprise-machine-image
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
   sudo cp /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
|
||||
|
||||
Reload systemd configuration
|
||||
----------------------------
|
||||
|
||||
You must reload the unit file if the systemd unit file is changed.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback
|
||||
is successful, move to the next node in the cluster.
|
||||
@@ -0,0 +1,13 @@
|
||||
==========================================================
|
||||
Upgrade - ScyllaDB 2025.1 to ScyllaDB 2025.2
|
||||
==========================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-2025.1-to-2025.2>
|
||||
Metrics Update <metric-update-2025.1-to-2025.2>
|
||||
|
||||
* :doc:`Upgrade from ScyllaDB 2025.1.x to ScyllaDB 2025.2.y <upgrade-guide-from-2025.1-to-2025.2>`
|
||||
* :doc:`Metrics Update Between 2025.1 and 2025.2 <metric-update-2025.1-to-2025.2>`
|
||||
@@ -0,0 +1,61 @@
|
||||
.. |SRC_VERSION| replace:: 2025.1
|
||||
.. |NEW_VERSION| replace:: 2025.2
|
||||
|
||||
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
New Metrics
|
||||
------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
|
||||
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_alternator_batch_item_count_histogram
|
||||
- A histogram of the number of items in a batch request.
|
||||
* - scylla_database_total_view_updates_failed_pairing
|
||||
- Total number of view updates for which we failed base/view pairing.
|
||||
* - scylla_group_name_cross_rack_collocations
|
||||
- The number of co-locating migrations that move replica across racks.
|
||||
* - scylla_network_bytes_received
|
||||
- The number of bytes received from network sockets.
|
||||
* - scylla_network_bytes_sent
|
||||
- The number of bytes written to network sockets.
|
||||
* - scylla_reactor_awake_time_ms_total
|
||||
- Total reactor awake time (wall_clock).
|
||||
* - scylla_reactor_cpu_used_time_ms
|
||||
- Total reactor thread CPU time (from CLOCK_THREAD_CPUTIME).
|
||||
* - scylla_reactor_sleep_time_ms_total
|
||||
- Total reactor sleep time (wall clock).
|
||||
* - scylla_sstable_compression_dicts_total_live_memory_bytes
|
||||
- Total amount of memory consumed by SSTable compression dictionaries in RAM.
|
||||
* - scylla_transport_connections_blocked
|
||||
- Holds an incrementing counter with the CQL connections that were blocked
|
||||
before being processed due to threshold configured via
|
||||
       uninitialized_connections_semaphore_cpu_concurrency. Blocks are normal
|
||||
       when we have multiple connections initialized at once. If connections are
|
||||
timing out and this value is high it indicates either connections storm
|
||||
or unusually slow processing.
|
||||
* - scylla_transport_connections_shed
|
||||
- Holds an incrementing counter with the CQL connections that were shed
|
||||
due to concurrency semaphore timeout (threshold configured via
|
||||
uninitialized_connections_semaphore_cpu_concurrency). This typically can
|
||||
happen during connection.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 6.2
|
||||
.. |NEW_VERSION| replace:: 2025.1
|
||||
.. |SRC_VERSION| replace:: 2025.1
|
||||
.. |NEW_VERSION| replace:: 2025.2
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 6.2 to 2025.1
|
||||
.. _SCYLLA_METRICS: ../metric-update-6.2-to-2025.1
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.1 to 2025.2
|
||||
.. _SCYLLA_METRICS: ../metric-update-2025.1-to-2025.2
|
||||
|
||||
=======================================================================================
|
||||
Upgrade from |SCYLLA_NAME| Open Source |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
|
||||
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
|
||||
=======================================================================================
|
||||
|
||||
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
|
||||
@@ -20,7 +20,7 @@ This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS,
|
||||
and Ubuntu. See :doc:`OS Support by Platform and Version </getting-started/os-support>`
|
||||
for information about supported versions.
|
||||
|
||||
It also applies when using ScyllaDB official image on EC2, GCP, or Azure.
|
||||
It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
@@ -28,7 +28,7 @@ Before You Upgrade ScyllaDB
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
|
||||
upgrade the driver before you upgrade ScyllaDB. The latest two versions of each driver
|
||||
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
|
||||
are supported.
|
||||
|
||||
**Upgrade ScyllaDB Monitoring Stack**
|
||||
@@ -150,7 +150,7 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.1.list
|
||||
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.2.list
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
@@ -168,7 +168,7 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.1.repo
|
||||
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.2.repo
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
==========================================================
|
||||
Upgrade - ScyllaDB Open Source 6.2 to ScyllaDB 2025.1
|
||||
==========================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-6.2-to-2025.1>
|
||||
Metrics Update <metric-update-6.2-to-2025.1>
|
||||
|
||||
* :doc:`Upgrade from ScyllaDB Open Source 6.2.x to ScyllaDB 2025.1.y <upgrade-guide-from-6.2-to-2025.1>`
|
||||
* :doc:`Metrics Update Between 6.2 and 2025.1 <metric-update-6.2-to-2025.1>`
|
||||
@@ -1,54 +0,0 @@
|
||||
.. |SRC_VERSION| replace:: 6.2
|
||||
.. |NEW_VERSION| replace:: 2025.1
|
||||
|
||||
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
New Metrics
|
||||
------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
|
||||
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_alternator_rcu_total
|
||||
- The total number of consumed read units, counted as half units.
|
||||
* - scylla_alternator_wcu_total
|
||||
- The total number of consumed write units, counted as half units.
|
||||
* - scylla_rpc_compression_bytes_received
|
||||
- The bytes read from RPC connections after decompression.
|
||||
* - scylla_rpc_compression_bytes_sent
|
||||
- The bytes written to RPC connections before compression.
|
||||
* - scylla_rpc_compression_compressed_bytes_received
|
||||
- The bytes read from RPC connections before decompression.
|
||||
* - scylla_rpc_compression_compressed_bytes_sent
|
||||
- The bytes written to RPC connections after compression.
|
||||
* - scylla_rpc_compression_compression_cpu_nanos
|
||||
- The nanoseconds spent on compression.
|
||||
* - scylla_rpc_compression_decompression_cpu_nanos
|
||||
- The nanoseconds spent on decompression.
|
||||
* - scylla_rpc_compression_messages_received
|
||||
- The RPC messages received.
|
||||
* - scylla_rpc_compression_messages_sent
|
||||
- The RPC messages sent.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -8,10 +8,7 @@ such as Apache Cassandra, or from other ScyllaDB clusters:
|
||||
* From SSTable to SSTable
|
||||
- Using nodetool refresh, :ref:`Load and Stream <nodetool-refresh-load-and-stream>` option.
|
||||
- On a large scale, it requires tooling to upload / transfer files from location to location.
|
||||
* From SSTable to CQL.
|
||||
- :doc:`sstableloader</operating-scylla/admin-tools/sstableloader/>`
|
||||
* From CQL to CQL
|
||||
- `Spark Migrator <https://github.com/scylladb/scylla-migrator>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
|
||||
|
||||
- `Spark Migrator <https://migrator.docs.scylladb.com/>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
|
||||
* From DynamoDB to ScyllaDB Alternator
|
||||
- `Spark Migrator <https://github.com/scylladb/scylla-migrator>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
|
||||
- `Spark Migrator <https://migrator.docs.scylladb.com/>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
|
||||
|
||||
@@ -867,8 +867,8 @@ future<std::vector<kmip_host::id_type>> kmip_host::impl::find_matching_keys(cons
|
||||
|
||||
auto [kdl_attrs, crypt_alg] = make_attributes(info, false);
|
||||
|
||||
static const char kmip_tag_cryptographic_length[] = KMIP_TAG_CRYPTOGRAPHIC_LENGTH_STR;
|
||||
static const char kmip_tag_cryptographic_usage_mask[] = KMIP_TAG_CRYPTOGRAPHIC_USAGE_MASK_STR;
|
||||
static const char kmip_tag_cryptographic_length[] = KMIP_TAGSTR_CRYPTOGRAPHIC_LENGTH;
|
||||
static const char kmip_tag_cryptographic_usage_mask[] = KMIP_TAGSTR_CRYPTOGRAPHIC_USAGE_MASK;
|
||||
|
||||
// #1079. Query mask apparently ignores things like cryptographic
|
||||
// attribute set of options, instead we must specify the query
|
||||
|
||||
15
init.cc
15
init.cc
@@ -13,6 +13,7 @@
|
||||
|
||||
#include <boost/algorithm/string/trim.hpp>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
|
||||
logging::logger startlog("init");
|
||||
|
||||
@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
|
||||
std::any service_set::find(const std::type_info& type) const {
|
||||
return _impl->find(type);
|
||||
}
|
||||
|
||||
// Placed here to avoid dependency on db::config in compress.cc,
|
||||
// where the rest of default_sstable_compressor_factory_config is.
|
||||
auto default_sstable_compressor_factory_config::from_db_config(
|
||||
const db::config& cfg,
|
||||
std::span<const unsigned> numa_config) -> self
|
||||
{
|
||||
return self {
|
||||
.register_metrics = true,
|
||||
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
|
||||
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
|
||||
.numa_config{numa_config.begin(), numa_config.end()},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -112,19 +112,19 @@ void production_snitch_base::parse_property_file(std::string contents) {
|
||||
|
||||
[[noreturn]]
|
||||
void production_snitch_base::throw_double_declaration(const sstring& key) const {
|
||||
logger().error("double \"{}\" declaration in {}", key, _prop_file_name);
|
||||
logger().warn("double \"{}\" declaration in {}", key, _prop_file_name);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
[[noreturn]]
|
||||
void production_snitch_base::throw_bad_format(const sstring& line) const {
|
||||
logger().error("Bad format in properties file {}: {}", _prop_file_name, line);
|
||||
logger().warn("Bad format in properties file {}: {}", _prop_file_name, line);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
[[noreturn]]
|
||||
void production_snitch_base::throw_incomplete_file() const {
|
||||
logger().error("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
|
||||
logger().warn("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
|
||||
@@ -221,6 +221,18 @@ std::optional<tablet_replica> get_leaving_replica(const tablet_info& tinfo, cons
|
||||
return *leaving.begin();
|
||||
}
|
||||
|
||||
bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tablet_transition_info& trinfo) {
|
||||
if (replica == locator::get_leaving_replica(tinfo, trinfo)) {
|
||||
// we do tablet cleanup on the leaving replica in the `cleanup` stage, after which there is only the `end_migration` stage.
|
||||
return trinfo.stage == locator::tablet_transition_stage::end_migration;
|
||||
}
|
||||
if (replica == trinfo.pending_replica) {
|
||||
// we do tablet cleanup on the pending replica in the `cleanup_target` stage, after which there is only the `revert_migration` stage.
|
||||
return trinfo.stage == locator::tablet_transition_stage::revert_migration;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migration_info& mig) {
|
||||
return replace_replica(tinfo.replicas, mig.src, mig.dst);
|
||||
}
|
||||
|
||||
@@ -291,6 +291,10 @@ struct tablet_transition_info {
|
||||
// Returns the leaving replica for a given transition.
|
||||
std::optional<tablet_replica> get_leaving_replica(const tablet_info&, const tablet_transition_info&);
|
||||
|
||||
// True if the tablet is transitioning and it's in a stage that follows the stage
|
||||
// where we clean up the tablet on the given replica.
|
||||
bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tablet_transition_info& trinfo);
|
||||
|
||||
/// Represents intention to move a single tablet replica from src to dst.
|
||||
struct tablet_migration_info {
|
||||
locator::tablet_transition_kind kind;
|
||||
|
||||
50
main.cc
50
main.cc
@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
|
||||
langman.invoke_on_all(&lang::manager::start).get();
|
||||
|
||||
auto sstable_compressor_factory = make_sstable_compressor_factory(sstable_compressor_factory::config{
|
||||
.register_metrics = true,
|
||||
.enable_writing_dictionaries = cfg->sstable_compression_dictionaries_enable_writing,
|
||||
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg->sstable_compression_dictionaries_memory_budget_fraction,
|
||||
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
|
||||
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
|
||||
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
|
||||
std::cref(*cfg), std::cref(numa_groups))).get();
|
||||
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
|
||||
sstable_compressor_factory.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting database");
|
||||
|
||||
debug::the_database = &db;
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
|
||||
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(*sstable_compressor_factory),
|
||||
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
|
||||
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
auto sstables_prefix = std::string_view("sstables/");
|
||||
if (name.starts_with(sstables_prefix)) {
|
||||
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
|
||||
co_await sstable_compressor_factory->set_recommended_dict(table, std::move(dict.data));
|
||||
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
|
||||
} else if (name == dictionary_service::rpc_compression_dict_name) {
|
||||
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
|
||||
}
|
||||
@@ -1755,6 +1757,24 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
|
||||
utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); });
|
||||
|
||||
debug::the_stream_manager = &stream_manager;
|
||||
checkpoint(stop_signal, "starting streaming service");
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
|
||||
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
|
||||
// FIXME -- keep the instances alive, just call .stop on them
|
||||
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting streaming manager");
|
||||
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
|
||||
return sm.start(stop_signal.as_local_abort_source());
|
||||
}).get();
|
||||
|
||||
api::set_server_stream_manager(ctx, stream_manager).get();
|
||||
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
|
||||
api::unset_server_stream_manager(ctx).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing storage service");
|
||||
debug::the_storage_service = &ss;
|
||||
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
|
||||
@@ -1921,24 +1941,6 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
|
||||
});
|
||||
|
||||
debug::the_stream_manager = &stream_manager;
|
||||
checkpoint(stop_signal, "starting streaming service");
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
|
||||
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
|
||||
// FIXME -- keep the instances alive, just call .stop on them
|
||||
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting streaming manager");
|
||||
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
|
||||
return sm.start(stop_signal.as_local_abort_source());
|
||||
}).get();
|
||||
|
||||
api::set_server_stream_manager(ctx, stream_manager).get();
|
||||
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
|
||||
api::unset_server_stream_manager(ctx).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting hinted handoff manager");
|
||||
if (!hinted_handoff_enabled.is_disabled_for_all()) {
|
||||
hints_dir_initializer.ensure_rebalanced().get();
|
||||
|
||||
@@ -149,7 +149,8 @@ class compact_mutation_state {
|
||||
gc_clock::time_point _query_time;
|
||||
max_purgeable_fn _get_max_purgeable;
|
||||
can_gc_fn _can_gc;
|
||||
api::timestamp_type _max_purgeable = api::missing_timestamp;
|
||||
api::timestamp_type _max_purgeable_regular = api::missing_timestamp;
|
||||
api::timestamp_type _max_purgeable_shadowable = api::missing_timestamp;
|
||||
std::optional<gc_clock::time_point> _gc_before;
|
||||
const query::partition_slice& _slice;
|
||||
uint64_t _row_limit{};
|
||||
@@ -288,11 +289,12 @@ private:
|
||||
if (!t) {
|
||||
return false;
|
||||
}
|
||||
if (_max_purgeable == api::missing_timestamp) {
|
||||
_max_purgeable = _get_max_purgeable(*_dk, is_shadowable);
|
||||
auto& max_purgeable = is_shadowable ? _max_purgeable_shadowable : _max_purgeable_regular;
|
||||
if (max_purgeable == api::missing_timestamp) {
|
||||
max_purgeable = _get_max_purgeable(*_dk, is_shadowable);
|
||||
}
|
||||
auto ret = t.timestamp < _max_purgeable;
|
||||
mclog.debug("can_gc: t={} is_shadowable={} max_purgeable={}: ret={}", t, is_shadowable, _max_purgeable, ret);
|
||||
auto ret = t.timestamp < max_purgeable;
|
||||
mclog.debug("can_gc: t={} is_shadowable={} max_purgeable={}: ret={}", t, is_shadowable, max_purgeable, ret);
|
||||
return ret;
|
||||
};
|
||||
|
||||
@@ -347,7 +349,8 @@ public:
|
||||
_static_row_live = false;
|
||||
_partition_tombstone = {};
|
||||
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
|
||||
_max_purgeable = api::missing_timestamp;
|
||||
_max_purgeable_regular = api::missing_timestamp;
|
||||
_max_purgeable_shadowable = api::missing_timestamp;
|
||||
_gc_before = std::nullopt;
|
||||
_last_static_row.reset();
|
||||
_last_pos = position_in_partition::for_partition_start();
|
||||
|
||||
30
pgo/pgo.py
30
pgo/pgo.py
@@ -20,7 +20,6 @@ import logging
|
||||
import os
|
||||
import pathlib
|
||||
import random
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import signal
|
||||
@@ -102,32 +101,6 @@ def configure_cpusets():
|
||||
config_logger.info(f"Choosing cpusets for nodes: {NODE_CPUSETS.get()}")
|
||||
config_logger.info(f"Choosing cpuset for load generators: {CS_CPUSET.get()}")
|
||||
|
||||
JAVA_HOME: ContextVar[Optional[str]] = ContextVar('JAVA_HOME')
|
||||
|
||||
async def configure_java() -> None:
|
||||
"""
|
||||
cassandra-stress can only deal with Java 11
|
||||
"""
|
||||
version_output = (await bash("java -version", stderr=asyncio.subprocess.PIPE))[2]
|
||||
assert isinstance(version_output, bytes)
|
||||
version_first_line = version_output.decode().split(sep='\n')[0]
|
||||
config_logger.info(f"First line of java -version: {version_first_line}")
|
||||
version = 11
|
||||
if re.search(rf'version.*{version}\.[0-9]+\.[0-9]+', version_first_line):
|
||||
config_logger.info(f"Default Java version recognized as Java {version}. Proceeding with the default.")
|
||||
JAVA_HOME.set(None)
|
||||
return
|
||||
|
||||
config_logger.info(f"Default Java version is not recognized as Java {version}.")
|
||||
if os.path.exists(java_path := f'/usr/lib/jvm/java-{version}'):
|
||||
config_logger.warning(f"{java_path} found. Choosing it as JAVA_HOME.")
|
||||
JAVA_HOME.set(java_path)
|
||||
return
|
||||
|
||||
error = f"Failed to find a suitable Java version. Java {version} is required."
|
||||
config_logger.error(error)
|
||||
raise RuntimeError(error)
|
||||
|
||||
################################################################################
|
||||
# Child process utilities
|
||||
|
||||
@@ -534,7 +507,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
|
||||
|
||||
def cs_command(cmd: list[str], n: int, node: str, cl: str, pop: Optional[str] = None, warmup: bool = False, rate: str = "threads=200", schema: Optional[str] = None) -> list[str]:
|
||||
"""Strings together a cassandra-stress command from given options."""
|
||||
return (["env", f"JAVA_HOME={JAVA_HOME.get()}"] if JAVA_HOME.get() else []) + [
|
||||
return [
|
||||
"cassandra-stress",
|
||||
*cmd,
|
||||
f"n={n}",
|
||||
@@ -817,7 +790,6 @@ async def train_full(executable: PathLike, output_profile_file: PathLike, datase
|
||||
training_logger.info(f"Starting training of executable {executable}. Exhaustive logs can be found in {LOGDIR.get()}/")
|
||||
|
||||
configure_cpusets()
|
||||
await configure_java()
|
||||
|
||||
assert executable_exists(executable)
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:5c807f5ffe7f1bf9acb6a461887f31157e3f84886a7749e61396d9153c0863b2
|
||||
size 6011808
|
||||
oid sha256:6bdf1d3c9ba4866abe2b9f28542b0912086c43b2737135e911dfdeb70cba3d8c
|
||||
size 5951620
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:19ad6fbf38bcc7db54150988c4bb394caa487f5e2f2c132bb9f27ad7cae43666
|
||||
size 6015468
|
||||
oid sha256:9121eac637fb95a096caec3b2f03d46decaceef3eb30ffe02adcff52aa21cb5c
|
||||
size 5964108
|
||||
|
||||
@@ -397,7 +397,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _view_update_read_concurrency_semaphores_group(
|
||||
max_memory_concurrent_view_update_reads(),
|
||||
utils::updateable_value<int>(max_count_concurrent_view_update_reads),
|
||||
max_inactive_view_update_queue_length(),
|
||||
std::numeric_limits<size_t>::max(),
|
||||
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
|
||||
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
|
||||
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,
|
||||
|
||||
@@ -1488,7 +1488,6 @@ private:
|
||||
size_t max_memory_concurrent_view_update_reads() { return _dbcfg.available_memory * 0.01; }
|
||||
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 1000; }
|
||||
size_t max_inactive_view_update_queue_length() { return _dbcfg.available_memory * 0.01 / 1000; }
|
||||
// They're rather heavyweight, so limit more
|
||||
static constexpr size_t max_count_streaming_concurrent_reads{10};
|
||||
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
|
||||
|
||||
@@ -167,13 +167,13 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
|
||||
}
|
||||
|
||||
future<>
|
||||
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf) {
|
||||
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) {
|
||||
const auto& rs = db.local().find_keyspace(ks).get_replication_strategy();
|
||||
if (rs.is_per_table()) {
|
||||
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
|
||||
}
|
||||
|
||||
return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf)] {
|
||||
return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] {
|
||||
auto global_table = get_table_on_all_shards(db, ks, cf).get();
|
||||
|
||||
sharded<sstables::sstable_directory> directory;
|
||||
@@ -217,10 +217,12 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, shard
|
||||
// - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction
|
||||
// so that cleanup can be considered per compaction group
|
||||
const auto& erm = db.local().find_keyspace(ks).get_vnode_effective_replication_map();
|
||||
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
|
||||
auto owned_ranges_ptr = skip_cleanup ? lw_shared_ptr<dht::token_range_vector>(nullptr) : compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
|
||||
reshard(directory, db, ks, cf, make_sstable, owned_ranges_ptr).get();
|
||||
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, make_sstable,
|
||||
[] (const sstables::shared_sstable&) { return true; }).get();
|
||||
if (!skip_reshape) {
|
||||
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, make_sstable,
|
||||
[] (const sstables::shared_sstable&) { return true; }).get();
|
||||
}
|
||||
|
||||
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();
|
||||
|
||||
@@ -91,7 +91,7 @@ public:
|
||||
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
|
||||
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
|
||||
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
|
||||
};
|
||||
|
||||
future<sstables::generation_type> highest_generation_seen(sharded<sstables::sstable_directory>& directory);
|
||||
|
||||
@@ -830,12 +830,19 @@ public:
|
||||
auto local_replica = locator::tablet_replica{_my_host_id, this_shard_id()};
|
||||
|
||||
for (auto tid : tmap.tablet_ids()) {
|
||||
auto range = tmap.get_token_range(tid);
|
||||
|
||||
if (tmap.has_replica(tid, local_replica)) {
|
||||
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
|
||||
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
|
||||
if (!tmap.has_replica(tid, local_replica)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// if the tablet was cleaned up already on this replica, don't allocate a storage group for it.
|
||||
auto trinfo = tmap.get_tablet_transition_info(tid);
|
||||
if (trinfo && locator::is_post_cleanup(local_replica, tmap.get_tablet_info(tid), *trinfo)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto range = tmap.get_token_range(tid);
|
||||
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
|
||||
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
|
||||
}
|
||||
_storage_groups = std::move(ret);
|
||||
}
|
||||
@@ -1892,6 +1899,8 @@ table::sstable_list_builder::build_new_list(const sstables::sstable_set& current
|
||||
const std::vector<sstables::shared_sstable>& old_sstables) {
|
||||
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
||||
|
||||
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
|
||||
|
||||
// add sstables from the current list into the new list except the ones that are in the old list
|
||||
std::vector<sstables::shared_sstable> removed_sstables;
|
||||
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
|
||||
@@ -2178,14 +2187,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
|
||||
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
|
||||
|
||||
struct compaction_group_sstable_set_updater {
|
||||
struct compaction_group_strategy_updater {
|
||||
table& t;
|
||||
compaction_group& cg;
|
||||
compaction_backlog_tracker new_bt;
|
||||
compaction::compaction_strategy_state new_cs_state;
|
||||
lw_shared_ptr<sstables::sstable_set> new_sstables;
|
||||
|
||||
compaction_group_sstable_set_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
|
||||
compaction_group_strategy_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
|
||||
: t(t)
|
||||
, cg(cg)
|
||||
, new_bt(new_cs.make_backlog_tracker())
|
||||
@@ -2196,26 +2204,26 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
|
||||
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
|
||||
|
||||
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(cg.as_table_state()));
|
||||
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
|
||||
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
|
||||
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
||||
new_sstables->insert(s);
|
||||
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
||||
new_sstables_for_backlog_tracker.push_back(s);
|
||||
});
|
||||
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
|
||||
}
|
||||
|
||||
void execute() noexcept {
|
||||
// Update strategy state and backlog tracker according to new strategy. SSTable set update
|
||||
// is delayed until new compaction, which is triggered on strategy change. SSTable set
|
||||
// cannot be updated here since it must happen under the set update lock.
|
||||
t._compaction_manager.register_backlog_tracker(cg.as_table_state(), std::move(new_bt));
|
||||
cg.set_main_sstables(std::move(new_sstables));
|
||||
cg.set_compaction_strategy_state(std::move(new_cs_state));
|
||||
}
|
||||
};
|
||||
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
|
||||
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
|
||||
|
||||
for_each_compaction_group([&] (compaction_group& cg) {
|
||||
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
|
||||
compaction_group_strategy_updater updater(*this, cg, new_cs);
|
||||
updater.prepare(new_cs);
|
||||
cg_sstable_set_updaters.push_back(std::move(updater));
|
||||
});
|
||||
@@ -2224,7 +2232,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
for (auto& updater : cg_sstable_set_updaters) {
|
||||
updater.execute();
|
||||
}
|
||||
refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
size_t table::sstables_count() const {
|
||||
@@ -2707,7 +2714,7 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
|
||||
}
|
||||
|
||||
// Allocate storage group if tablet is migrating in.
|
||||
// Allocate storage group if tablet is migrating in, or deallocate if it's migrating out.
|
||||
auto this_replica = locator::tablet_replica{
|
||||
.host = erm.get_token_metadata().get_my_id(),
|
||||
.shard = this_shard_id()
|
||||
@@ -2723,6 +2730,8 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
auto range = new_tablet_map->get_token_range(tid);
|
||||
_storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
|
||||
tablet_migrating_in = true;
|
||||
} else if (_storage_groups.contains(tid.value()) && locator::is_post_cleanup(this_replica, new_tablet_map->get_tablet_info(tid), transition_info)) {
|
||||
remove_storage_group(tid.value());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2742,7 +2751,7 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
|
||||
// problem as fresh node won't have any data in new range and migration cleanup invalidates the
|
||||
// range being moved away.
|
||||
if (tablet_migrating_in) {
|
||||
if (tablet_migrating_in || old_tablet_count != new_tablet_count) {
|
||||
refresh_mutation_source();
|
||||
}
|
||||
}
|
||||
@@ -4123,7 +4132,6 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
|
||||
co_await stop_compaction_groups(sg);
|
||||
co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
|
||||
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
|
||||
_sg_manager->remove_storage_group(tid.value());
|
||||
}
|
||||
|
||||
future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
|
||||
|
||||
@@ -954,12 +954,15 @@ public:
|
||||
// but caller can skip to a position outside the current set
|
||||
const dht::ring_position_view& pos = s.pos;
|
||||
auto token = pos.token();
|
||||
if (!_cur_set || pos.token() >= _lowest_next_token) {
|
||||
auto pr_end = s.range ? dht::ring_position_view::for_range_end(*s.range) : dht::ring_position_view::max();
|
||||
// End of stream is reached when pos is past the end of the read range (i.e. exclude tablets
|
||||
// that doesn't intersect with the range).
|
||||
// We don't want to advance next position when EOS has been reached, such that a fast forward
|
||||
// to the next tablet range will work.
|
||||
bool eos_reached = dht::ring_position_tri_compare(*_tset.schema(), pos, pr_end) > 0;
|
||||
if ((!_cur_set || pos.token() >= _lowest_next_token) && !eos_reached) {
|
||||
auto idx = _tset.group_of(token);
|
||||
auto pr_end = s.range ? dht::ring_position_view::for_range_end(*s.range) : dht::ring_position_view::max();
|
||||
// End of stream is reached when pos is past the end of the read range (i.e. exclude tablets
|
||||
// that doesn't intersect with the range).
|
||||
if (dht::ring_position_tri_compare(*_tset.schema(), pos, pr_end) <= 0 && _tset._sstable_set_ids.contains(idx)) {
|
||||
if (_tset._sstable_set_ids.contains(idx)) {
|
||||
_cur_set = _tset.find_sstable_set(idx);
|
||||
}
|
||||
// Set the next token to point to the next engaged storage group.
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: d7ff58f2b2...9f0034a099
@@ -769,6 +769,12 @@ utils::input_stream as_input_stream(const bytes_ostream& b) {
|
||||
return utils::input_stream::fragmented(b.fragments().begin(), b.size());
|
||||
}
|
||||
|
||||
template<FragmentedView View>
|
||||
inline
|
||||
auto as_input_stream(View v) {
|
||||
return fragmented_memory_input_stream(fragment_range(v).begin(), v.size_bytes());
|
||||
}
|
||||
|
||||
template<typename Output, typename ...T>
|
||||
void serialize(Output& out, const boost::variant<T...>& v) {}
|
||||
|
||||
|
||||
@@ -579,7 +579,7 @@ future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_req
|
||||
co_await coroutine::parallel_for_each(vnodes_per_addr,
|
||||
[&] (std::pair<const locator::host_id, dht::partition_range_vector>& vnodes_with_addr) -> future<> {
|
||||
locator::host_id addr = vnodes_with_addr.first;
|
||||
query::mapreduce_result& result_ = result;
|
||||
query::mapreduce_result& shared_accumulator = result;
|
||||
tracing::trace_state_ptr& tr_state_ = tr_state;
|
||||
retrying_dispatcher& dispatcher_ = dispatcher;
|
||||
|
||||
@@ -600,9 +600,21 @@ future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_req
|
||||
flogger.debug("received mapreduce_result={} from {}", partial_printer, addr);
|
||||
|
||||
auto aggrs = mapreduce_aggregates(req);
|
||||
co_return co_await aggrs.with_thread_if_needed([&result_, &aggrs, partial_result = std::move(partial_result)] () mutable {
|
||||
aggrs.merge(result_, std::move(partial_result));
|
||||
});
|
||||
|
||||
// Anytime this coroutine yields, other coroutines may want to write to `shared_accumulator`.
|
||||
// As merging can yield internally, merging directly to `shared_accumulator` would result in race condition.
|
||||
// We can safely write to `shared_accumulator` only when it is empty.
|
||||
while (!shared_accumulator.query_results.empty()) {
|
||||
// Move `shared_accumulator` content to local variable. Leave `shared_accumulator` empty - now other coroutines can safely write to it.
|
||||
query::mapreduce_result previous_results = std::exchange(shared_accumulator, {});
|
||||
// Merge two local variables - it can yield.
|
||||
co_await aggrs.with_thread_if_needed([&previous_results, &aggrs, &partial_result] () mutable {
|
||||
aggrs.merge(partial_result, std::move(previous_results));
|
||||
});
|
||||
// `partial_result` now contains results merged by this coroutine, but `shared_accumulator` might have been updated by others.
|
||||
}
|
||||
// `shared_accumulator` is empty, we can atomically write results merged by this coroutine.
|
||||
shared_accumulator = std::move(partial_result);
|
||||
});
|
||||
|
||||
mapreduce_aggregates aggrs(req);
|
||||
|
||||
@@ -89,7 +89,6 @@ future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
|
||||
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}.value()));
|
||||
}
|
||||
|
||||
|
||||
future<raft::log_entries> raft_sys_table_storage::load_log() {
|
||||
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
|
||||
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id}, cql3::query_processor::cache_internal::yes);
|
||||
@@ -103,7 +102,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
|
||||
}
|
||||
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
|
||||
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
|
||||
auto raw_data = row.get_blob("data");
|
||||
auto raw_data = row.get_view("data");
|
||||
auto in = ser::as_input_stream(raw_data);
|
||||
using data_variant_type = decltype(raft::log_entry::data);
|
||||
data_variant_type data = ser::deserialize(in, std::type_identity<data_variant_type>());
|
||||
|
||||
@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
|
||||
switch (rs.state) {
|
||||
case node_state::normal: {
|
||||
if (is_me(ip)) {
|
||||
if (is_me(id)) {
|
||||
co_return;
|
||||
}
|
||||
// In replace-with-same-ip scenario the replaced node IP will be the same
|
||||
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
|
||||
auto old_ip = it->second;
|
||||
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
|
||||
|
||||
co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
|
||||
if (prev_ip == endpoint) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (_address_map.find(id) != endpoint) {
|
||||
// Address map refused to update IP for the host_id,
|
||||
// this means prev_ip has higher generation than endpoint.
|
||||
// We can immediately remove endpoint from gossiper
|
||||
// since it represents an old IP (before an IP change)
|
||||
// for the given host_id. This is not strictly
|
||||
// necessary, but it reduces the noise circulated
|
||||
// in gossiper messages and allows for clearer
|
||||
// expectations of the gossiper state in tests.
|
||||
|
||||
co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
|
||||
// Do not update address.
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
|
||||
if (_ss.raft_topology_change_enabled()) {
|
||||
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
|
||||
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
|
||||
|
||||
slogger.info("Starting up server gossip");
|
||||
|
||||
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
|
||||
|
||||
co_await _gossiper.start_gossiping(new_generation, app_states);
|
||||
|
||||
utils::get_local_injector().inject("stop_after_starting_gossiping",
|
||||
@@ -6010,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
|
||||
});
|
||||
}
|
||||
|
||||
inet_address storage_service::host2ip(locator::host_id host) const {
|
||||
auto ip = _address_map.find(host);
|
||||
if (!ip) {
|
||||
throw std::runtime_error(::format("Cannot map host {} to ip", host));
|
||||
}
|
||||
return *ip;
|
||||
}
|
||||
|
||||
// Performs a replica-side operation for a given tablet.
|
||||
// What operation is performed is determined by "op" based on the
|
||||
// current state of tablet metadata. The coordinator is supposed to prepare tablet
|
||||
@@ -7281,11 +7264,7 @@ void storage_service::init_messaging_service() {
|
||||
[this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
|
||||
streaming::stream_files_response resp;
|
||||
resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
|
||||
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future<gms::inet_address> {
|
||||
return ss.container().invoke_on(0, [host] (storage_service& ss) {
|
||||
return ss.host2ip(host);
|
||||
});
|
||||
});
|
||||
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
|
||||
co_return res.stream_bytes;
|
||||
},
|
||||
size_t(0),
|
||||
|
||||
@@ -210,7 +210,6 @@ private:
|
||||
// when both of which sit on the same node. So all the movement is local.
|
||||
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
|
||||
future<> cleanup_tablet(locator::global_tablet_id);
|
||||
inet_address host2ip(locator::host_id) const;
|
||||
// Handler for table load stats RPC.
|
||||
future<locator::load_stats> load_stats_for_tablet_based_tables();
|
||||
future<> process_tablet_split_candidate(table_id) noexcept;
|
||||
|
||||
@@ -1471,8 +1471,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
case locator::tablet_transition_stage::use_new:
|
||||
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
|
||||
break;
|
||||
case locator::tablet_transition_stage::cleanup:
|
||||
if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
|
||||
case locator::tablet_transition_stage::cleanup: {
|
||||
bool wait = utils::get_local_injector().enter("cleanup_tablet_wait");
|
||||
if (!wait && advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
|
||||
auto maybe_dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
|
||||
if (!maybe_dst) {
|
||||
rtlogger.info("Tablet cleanup of {} skipped because no replicas leaving", gid);
|
||||
@@ -1489,6 +1490,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
})) {
|
||||
transition_to(locator::tablet_transition_stage::end_migration);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case locator::tablet_transition_stage::cleanup_target:
|
||||
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
|
||||
@@ -1534,7 +1536,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
break;
|
||||
case locator::tablet_transition_stage::repair: {
|
||||
if (action_failed(tablet_state.repair)) {
|
||||
bool fail_repair = utils::get_local_injector().enter("handle_tablet_migration_repair_fail");
|
||||
if (fail_repair || action_failed(tablet_state.repair)) {
|
||||
if (do_barrier()) {
|
||||
updates.emplace_back(get_mutation_builder()
|
||||
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
|
||||
@@ -2021,6 +2024,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
@@ -2157,6 +2162,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::commit_cdc_generation, "
|
||||
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
|
||||
@@ -2236,6 +2243,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception());
|
||||
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
|
||||
@@ -2251,6 +2260,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::write_both_read_old, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
@@ -2297,6 +2308,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
|
||||
" (node state is {}): {}", state, std::current_exception());
|
||||
@@ -2327,6 +2340,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::write_both_read_new, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
@@ -2466,6 +2481,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (raft::request_aborted&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::left_token_ring, "
|
||||
"raft_topology_cmd::command::barrier failed, error {}",
|
||||
@@ -2481,6 +2498,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
node = retake_node(co_await start_operation(), node.id);
|
||||
}
|
||||
|
||||
// Make decommissioning node a non voter before reporting operation completion below.
|
||||
// Otherwise the decommissioned node may see the completion and exit before it is removed from
|
||||
// the config at which point the removal from the config will hang if the cluster had only two
|
||||
// nodes before the decommission.
|
||||
co_await _voter_handler.on_node_removed(node.id, _as);
|
||||
|
||||
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
|
||||
|
||||
rtbuilder.done();
|
||||
|
||||
@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
|
||||
const compression_parameters& params,
|
||||
const utils::chunked_vector<temporary_buffer<char>>& validation_samples
|
||||
) {
|
||||
auto factory = make_sstable_compressor_factory();
|
||||
co_await factory->set_recommended_dict(initial_schema->id(), dict);
|
||||
co_return co_await try_one_compression_config(*factory, initial_schema, params, validation_samples);
|
||||
sharded<default_sstable_compressor_factory> factory;
|
||||
co_await factory.start();
|
||||
std::exception_ptr ex;
|
||||
float result;
|
||||
try {
|
||||
co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
|
||||
result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await factory.stop();
|
||||
if (ex) {
|
||||
co_return coroutine::exception(ex);
|
||||
}
|
||||
co_return result;
|
||||
}
|
||||
|
||||
@@ -10,20 +10,89 @@
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "compress.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <span>
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
} // namespace db
|
||||
|
||||
struct dictionary_holder;
|
||||
class raw_dict;
|
||||
|
||||
struct sstable_compressor_factory {
|
||||
virtual ~sstable_compressor_factory() {}
|
||||
virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
|
||||
virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
|
||||
virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
|
||||
struct config {
|
||||
bool register_metrics = false;
|
||||
utils::updateable_value<bool> enable_writing_dictionaries{true};
|
||||
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
|
||||
};
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg = {});
|
||||
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
|
||||
// because then the compiler gives weird complains about default member initializers in line
|
||||
// ```
|
||||
// default_sstable_compressor_factory(config = config{});
|
||||
// ```
|
||||
// apparently due to some compiler bug related default initializers.
|
||||
struct default_sstable_compressor_factory_config {
|
||||
using self = default_sstable_compressor_factory_config;
|
||||
static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
|
||||
bool register_metrics = false;
|
||||
utils::updateable_value<bool> enable_writing_dictionaries{true};
|
||||
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
|
||||
std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};
|
||||
|
||||
static default_sstable_compressor_factory_config from_db_config(
|
||||
const db::config&,
|
||||
std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
|
||||
};
|
||||
|
||||
// Constructs compressors and decompressors for SSTables,
|
||||
// making sure that the expensive identical parts (dictionaries) are shared
|
||||
// between all shards within the same NUMA group.
|
||||
//
|
||||
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
|
||||
// decided by a content hash of the dictionary.
|
||||
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
|
||||
// to that dict.
|
||||
//
|
||||
// (Note: this centralization shouldn't pose a performance problem because a dict is only requested once
|
||||
// per an opening of an SSTable).
|
||||
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
|
||||
using holder = dictionary_holder;
|
||||
public:
|
||||
using self = default_sstable_compressor_factory;
|
||||
using config = default_sstable_compressor_factory_config;
|
||||
private:
|
||||
config _cfg;
|
||||
// Maps NUMA node ID to the array of shards on that node.
|
||||
std::vector<std::vector<shard_id>> _numa_groups;
|
||||
// Holds dictionaries owned by this shard.
|
||||
std::unique_ptr<dictionary_holder> _holder;
|
||||
// All recommended dictionary updates are serialized by a single "leader shard".
|
||||
// We do this to avoid dealing with concurrent updates altogether.
|
||||
semaphore _recommendation_setting_sem{1};
|
||||
constexpr static shard_id _leader_shard = 0;
|
||||
|
||||
private:
|
||||
using sha256_type = std::array<std::byte, 32>;
|
||||
unsigned local_numa_id();
|
||||
shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
|
||||
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
|
||||
future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
|
||||
future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
|
||||
future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
|
||||
public:
|
||||
default_sstable_compressor_factory(config = config{});
|
||||
~default_sstable_compressor_factory();
|
||||
|
||||
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
|
||||
future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
|
||||
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
|
||||
future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
|
||||
future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();
|
||||
|
||||
@@ -805,6 +805,14 @@ class incremental_reader_selector : public reader_selector {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
return _fn(sst, *_pr);
|
||||
}
|
||||
|
||||
dht::ring_position_view pr_end() const {
|
||||
return dht::ring_position_view::for_range_end(*_pr);
|
||||
}
|
||||
|
||||
bool end_of_stream() const {
|
||||
return _selector_position.is_max() || dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) > 0;
|
||||
}
|
||||
public:
|
||||
explicit incremental_reader_selector(schema_ptr s,
|
||||
lw_shared_ptr<const sstable_set> sstables,
|
||||
@@ -839,8 +847,8 @@ public:
|
||||
auto selection = _selector->select({_selector_position, _pr});
|
||||
_selector_position = selection.next_position;
|
||||
|
||||
irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
|
||||
_selector_position);
|
||||
irclogger.trace("{}: {} sstables to consider, advancing selector to {}, eos={}", fmt::ptr(this), selection.sstables.size(),
|
||||
_selector_position, end_of_stream());
|
||||
|
||||
readers.clear();
|
||||
for (auto& sst : selection.sstables) {
|
||||
@@ -848,7 +856,7 @@ public:
|
||||
readers.push_back(create_reader(sst));
|
||||
}
|
||||
}
|
||||
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
|
||||
} while (!end_of_stream() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
|
||||
|
||||
irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size());
|
||||
|
||||
@@ -865,9 +873,15 @@ public:
|
||||
_pr = ≺
|
||||
|
||||
auto pos = dht::ring_position_view::for_range_start(*_pr);
|
||||
|
||||
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
|
||||
return create_new_readers(pos);
|
||||
}
|
||||
// If selector position Y is contained in new range [X, Z], then we should try selecting new
|
||||
// sstables since it might have sstables that overlap with that range.
|
||||
if (!_selector_position.is_max() && dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) <= 0) {
|
||||
return create_new_readers(std::nullopt);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
@@ -945,7 +959,21 @@ filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, replica:
|
||||
|
||||
std::vector<frozen_sstable_run>
|
||||
sstable_set_impl::all_sstable_runs() const {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
auto all_sstables = all();
|
||||
std::unordered_map<sstables::run_id, sstable_run> runs_m;
|
||||
std::vector<frozen_sstable_run> all_runs;
|
||||
|
||||
for (auto&& sst : *all_sstables) {
|
||||
// When a run cannot accept sstable due to overlapping, treat the rejected sstable
|
||||
// as a single-fragment run.
|
||||
if (!runs_m[sst->run_identifier()].insert(sst)) {
|
||||
all_runs.push_back(make_lw_shared<const sstable_run>(sst));
|
||||
}
|
||||
}
|
||||
for (auto&& r : runs_m | std::views::values) {
|
||||
all_runs.push_back(make_lw_shared<const sstable_run>(std::move(r)));
|
||||
}
|
||||
return all_runs;
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
|
||||
@@ -2602,7 +2602,7 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
|
||||
co_return nullptr;
|
||||
}
|
||||
auto checksum = make_lw_shared<sstables::checksum>();
|
||||
co_await do_read_simple(component_type::CRC, [checksum, this] (version_types v, file crc_file) -> future<> {
|
||||
co_await do_read_simple(component_type::CRC, [&checksum, this] (version_types v, file crc_file) -> future<> {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
|
||||
@@ -2629,7 +2629,14 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
|
||||
|
||||
co_await crc_stream.close();
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
_components->checksum = checksum->weak_from_this();
|
||||
if (!_components->checksum) {
|
||||
_components->checksum = checksum->weak_from_this();
|
||||
} else {
|
||||
// Race condition: Another fiber/thread has called `read_checksum()`
|
||||
// while we were loading the component from disk. Discard our local
|
||||
// copy and use theirs.
|
||||
checksum = _components->checksum->shared_from_this();
|
||||
}
|
||||
});
|
||||
|
||||
co_return std::move(checksum);
|
||||
|
||||
@@ -528,7 +528,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
// All the global operations are going to happen here, and just the reloading happens
|
||||
// in there.
|
||||
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
bool load_and_stream, bool primary_replica_only, stream_scope scope) {
|
||||
bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
|
||||
if (_loading_new_sstables) {
|
||||
throw std::runtime_error("Already loading SSTables. Try again later");
|
||||
} else {
|
||||
@@ -544,8 +544,16 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
load_and_stream_desc = "auto-enabled-for-tablets";
|
||||
}
|
||||
|
||||
llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}",
|
||||
ks_name, cf_name, load_and_stream_desc, primary_replica_only);
|
||||
if (load_and_stream && skip_reshape) {
|
||||
throw std::runtime_error("Skipping reshape is not possible when doing load-and-stream");
|
||||
}
|
||||
|
||||
if (!load_and_stream && skip_cleanup) {
|
||||
throw std::runtime_error("Skipping cleanup is not possible when doing load-and-stream");
|
||||
}
|
||||
|
||||
llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, skip_cleanup={}",
|
||||
ks_name, cf_name, load_and_stream_desc, primary_replica_only, skip_cleanup);
|
||||
try {
|
||||
if (load_and_stream) {
|
||||
::table_id table_id;
|
||||
@@ -560,7 +568,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, scope, {});
|
||||
});
|
||||
} else {
|
||||
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
|
||||
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name, skip_cleanup, skip_reshape);
|
||||
}
|
||||
} catch (...) {
|
||||
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
|
||||
|
||||
@@ -104,10 +104,14 @@ public:
|
||||
*
|
||||
* @param ks_name the keyspace in which to search for new SSTables.
|
||||
* @param cf_name the column family in which to search for new SSTables.
|
||||
* @param load_and_stream load SSTables that do not belong to this node and stream them to the appropriate nodes.
|
||||
* @param primary_replica_only whether to stream only to the primary replica that owns the data.
|
||||
* @param skip_cleanup whether to skip the cleanup step when loading SSTables.
|
||||
* @param skip_reshape whether to skip the reshape step when loading SSTables.
|
||||
* @return a future<> when the operation finishes.
|
||||
*/
|
||||
future<> load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
bool load_and_stream, bool primary_replica_only, stream_scope scope);
|
||||
bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope);
|
||||
|
||||
/**
|
||||
* Download new SSTables not currently tracked by the system from object store
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#include "message/messaging_service.hh"
|
||||
#include "streaming/stream_blob.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "utils/pretty_printers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "locator/host_id.hh"
|
||||
@@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i
|
||||
|
||||
future<> stream_blob_handler(replica::database& db,
|
||||
netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source,
|
||||
@@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db,
|
||||
|
||||
|
||||
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
@@ -374,7 +373,7 @@ namespace streaming {
|
||||
// Send files in the files list to the nodes in targets list over network
|
||||
// Returns number of bytes sent over network
|
||||
future<size_t>
|
||||
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
|
||||
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
|
||||
size_t ops_total_size = 0;
|
||||
if (targets.empty()) {
|
||||
co_return ops_total_size;
|
||||
@@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
ops_id, sources.size(), sources, targets);
|
||||
|
||||
struct sink_and_source {
|
||||
gms::inet_address node;
|
||||
locator::host_id node;
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink;
|
||||
rpc::source<streaming::stream_blob_cmd_data> source;
|
||||
bool sink_closed = false;
|
||||
@@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
for (auto& x : targets) {
|
||||
const auto& node = x.node;
|
||||
meta.dst_shard_id = x.shard;
|
||||
auto ip = co_await host2ip(node);
|
||||
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
|
||||
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
|
||||
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
|
||||
ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
|
||||
ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
|
||||
}
|
||||
|
||||
// This fiber sends data to peer node
|
||||
@@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
}
|
||||
|
||||
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
|
||||
stream_files_response resp;
|
||||
auto& table = db.find_column_family(req.table);
|
||||
auto sstables = co_await table.take_storage_snapshot(req.range);
|
||||
@@ -653,7 +651,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
|
||||
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
|
||||
req.ops_id, sstables.size(), files.size(), files, req.range);
|
||||
auto ops_start_time = std::chrono::steady_clock::now();
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
|
||||
resp.stream_bytes = stream_bytes;
|
||||
auto duration = std::chrono::steady_clock::now() - ops_start_time;
|
||||
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",
|
||||
|
||||
@@ -116,13 +116,13 @@ struct stream_blob_info {
|
||||
};
|
||||
|
||||
// The handler for the STREAM_BLOB verb.
|
||||
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
|
||||
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
|
||||
|
||||
// Exposed mainly for testing
|
||||
|
||||
future<> stream_blob_handler(replica::database& db,
|
||||
netw::messaging_service& ms,
|
||||
gms::inet_address from,
|
||||
locator::host_id from,
|
||||
streaming::stream_blob_meta meta,
|
||||
rpc::sink<streaming::stream_blob_cmd_data> sink,
|
||||
rpc::source<streaming::stream_blob_cmd_data> source,
|
||||
@@ -163,11 +163,9 @@ public:
|
||||
size_t stream_bytes = 0;
|
||||
};
|
||||
|
||||
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
|
||||
|
||||
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
|
||||
// stream sstables files specified by the stream_files_request req.
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip);
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
|
||||
|
||||
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
|
||||
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
|
||||
@@ -178,7 +176,6 @@ future<size_t> tablet_stream_files(netw::messaging_service& ms,
|
||||
std::vector<node_and_shard> targets,
|
||||
table_id table,
|
||||
file_stream_id ops_id,
|
||||
host2ip_t host2ip,
|
||||
service::frozen_topology_guard topo_guard,
|
||||
bool may_inject_errors = false
|
||||
);
|
||||
|
||||
@@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
}
|
||||
});
|
||||
ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
auto from = netw::messaging_service::get_source(cinfo).addr;
|
||||
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
||||
auto sink = _ms.local().make_sink_for_stream_blob(source);
|
||||
(void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) {
|
||||
sslog.warn("Failed to run stream blob handler: {}", eptr);
|
||||
@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
|
||||
// prepare tasks
|
||||
set_state(stream_session_state::PREPARING);
|
||||
auto& db = manager().db();
|
||||
for (auto& request : requests) {
|
||||
// always flush on stream request
|
||||
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
|
||||
const auto& ks = request.keyspace;
|
||||
// Make sure cf requested by peer node exists
|
||||
for (auto& cf : request.column_families) {
|
||||
try {
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
for (auto& summary : summaries) {
|
||||
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
|
||||
auto cf_id = summary.cf_id;
|
||||
// Make sure cf the peer node will send to us exists
|
||||
try {
|
||||
db.find_column_family(cf_id);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
|
||||
sslog.warn("{}", err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
}
|
||||
|
||||
|
||||
@@ -144,6 +144,13 @@ def test_tag_resource_write_isolation_values(scylla_only, test_table):
|
||||
test_table.meta.client.tag_resource(ResourceArn=arn, Tags=[{'Key':'system:write_isolation', 'Value':i}])
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table.meta.client.tag_resource(ResourceArn=arn, Tags=[{'Key':'system:write_isolation', 'Value':'bah'}])
|
||||
# Verify that reading system:write_isolation is possible (we didn't
|
||||
# accidentally prevent it while fixing #24098)
|
||||
keys = [tag['Key'] for tag in test_table.meta.client.list_tags_of_resource(ResourceArn=arn)['Tags']]
|
||||
assert 'system:write_isolation' in keys
|
||||
# Finally remove the system:write_isolation tag so to not modify the
|
||||
# default behavior of test_table.
|
||||
test_table.meta.client.untag_resource(ResourceArn=arn, TagKeys=['system:write_isolation'])
|
||||
|
||||
# Test that if trying to create a table with forbidden tags (in this test,
|
||||
# a list of tags longer than the maximum allowed of 50 tags), the table
|
||||
@@ -168,9 +175,9 @@ def test_too_long_tags_from_creation(dynamodb):
|
||||
dynamodb.meta.client.describe_table(TableName=name)
|
||||
|
||||
# This test is similar to the above, but uses another case of forbidden tags -
|
||||
# here an illegal value for the system::write_isolation tag. This is a
|
||||
# here an illegal value for the system:write_isolation tag. This is a
|
||||
# scylla_only test because only Alternator checks the validity of the
|
||||
# system::write_isolation tag.
|
||||
# system:write_isolation tag.
|
||||
# Reproduces issue #6809, where the table creation appeared to fail, but it
|
||||
# was actually created (without the tag).
|
||||
def test_forbidden_tags_from_creation(scylla_only, dynamodb):
|
||||
|
||||
@@ -13,8 +13,7 @@ from decimal import Decimal
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.alternator.util import new_test_table, random_string, full_query, unique_table_name, is_aws, \
|
||||
client_no_transform
|
||||
from .util import new_test_table, random_string, full_query, unique_table_name, is_aws, client_no_transform, multiset
|
||||
|
||||
# All tests in this file are expected to fail with tablets due to #16567.
|
||||
# To ensure that Alternator TTL is still being tested, instead of
|
||||
@@ -809,3 +808,42 @@ def test_ttl_expiration_long(dynamodb, waits_for_expiration):
|
||||
break
|
||||
time.sleep(max_duration/100.0)
|
||||
assert count == 99*N
|
||||
|
||||
# Alternator uses a tag "system:ttl_attribute" to store the TTL attribute
|
||||
# chosen by UpdateTimeToLive. However, this tag is not supposed to be
|
||||
# readable or writable by the user directly - it should be read or written
|
||||
# only with the usual UpdateTimeToLive and DescribeTimeToLive operations.
|
||||
# The following two test confirms that this is the case. The first test
|
||||
# checks that the internal tag is invisible, i.e., not returned by
|
||||
# ListTagsOfResource. Basically we check that enabling TTL does not add
|
||||
# any tags to the list of tags.
|
||||
# Reproduces issue #24098.
|
||||
def test_ttl_tag_is_invisible(dynamodb):
|
||||
with new_test_table(dynamodb,
|
||||
Tags=TAGS,
|
||||
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]
|
||||
) as table:
|
||||
client = table.meta.client
|
||||
client.update_time_to_live(TableName=table.name,
|
||||
TimeToLiveSpecification={'AttributeName': 'x', 'Enabled': True})
|
||||
# Verify that TTL is set for this table, but no extra tags
|
||||
# like "system:ttl_attribute" (or anything else) are visible
|
||||
# in ListTagsOfResource:
|
||||
assert client.describe_time_to_live(TableName=table.name)['TimeToLiveDescription'] == {'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'x'}
|
||||
arn = client.describe_table(TableName=table.name)['Table']['TableArn']
|
||||
assert multiset(TAGS) == multiset(client.list_tags_of_resource(ResourceArn=arn)['Tags'])
|
||||
|
||||
# Now check that the internal tag system:ttl_attribute cannot be written with
|
||||
# TagResource or UntagResource (it can only be modified by UpdateTimeToLive).
|
||||
# This is an Scylla-only test because in DynamoDB, there is nothing
|
||||
# special about the tag name "system:ttl_attribute", and it can be written.
|
||||
# Reproduces issue #24098.
|
||||
def test_ttl_tag_is_unwritable(test_table, scylla_only):
|
||||
tag_name = 'system:ttl_attribute'
|
||||
client = test_table.meta.client
|
||||
arn = client.describe_table(TableName=test_table.name)['Table']['TableArn']
|
||||
with pytest.raises(ClientError, match='ValidationException.*internal'):
|
||||
client.tag_resource(ResourceArn=arn, Tags=[{'Key': tag_name, 'Value': 'x'}])
|
||||
with pytest.raises(ClientError, match='ValidationException.*internal'):
|
||||
client.untag_resource(ResourceArn=arn, TagKeys=[tag_name])
|
||||
|
||||
@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
|
||||
secondary_index_test.cc
|
||||
sessions_test.cc
|
||||
sstable_compaction_test.cc
|
||||
sstable_compressor_factory_test.cc
|
||||
sstable_directory_test.cc
|
||||
sstable_set_test.cc
|
||||
statement_restrictions_test.cc
|
||||
|
||||
@@ -596,9 +596,10 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
|
||||
cf_name))
|
||||
.get();
|
||||
f1.get();
|
||||
e.get_system_keyspace().local().load_built_views().get();
|
||||
|
||||
auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index");
|
||||
e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
|
||||
f2.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "ent/encryption/encryption.hh"
|
||||
#include "ent/encryption/symmetric_key.hh"
|
||||
#include "ent/encryption/local_file_provider.hh"
|
||||
#include "ent/encryption/encryption_exceptions.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
@@ -649,10 +650,20 @@ SEASTAR_TEST_CASE(test_kms_provider_with_master_key_in_cf, *check_run_test_decor
|
||||
);
|
||||
|
||||
// should fail
|
||||
BOOST_REQUIRE_THROW(
|
||||
co_await test_provider("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", tmp, yaml)
|
||||
, std::exception
|
||||
);
|
||||
try {
|
||||
try {
|
||||
co_await test_provider("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', "
|
||||
"'secret_key_strength': 128",
|
||||
tmp, yaml);
|
||||
} catch (std::nested_exception& ex) {
|
||||
std::rethrow_if_nested(ex);
|
||||
}
|
||||
BOOST_FAIL("Required an exception to be re-thrown");
|
||||
} catch (encryption::configuration_error&) {
|
||||
// EXPECTED
|
||||
} catch (...) {
|
||||
BOOST_FAIL(format("Unexpected exception: {}", std::current_exception()));
|
||||
}
|
||||
|
||||
// should be ok
|
||||
co_await test_provider(fmt::format("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'master_key': '{}', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", kms_key_alias)
|
||||
@@ -949,10 +960,21 @@ SEASTAR_TEST_CASE(test_gcp_provider_with_master_key_in_cf, *check_run_test_decor
|
||||
);
|
||||
|
||||
// should fail
|
||||
BOOST_REQUIRE_THROW(
|
||||
co_await test_provider("'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", tmp, yaml)
|
||||
, std::exception
|
||||
);
|
||||
try {
|
||||
try {
|
||||
co_await test_provider(
|
||||
"'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128",
|
||||
tmp,
|
||||
yaml);
|
||||
} catch (std::nested_exception& ex) {
|
||||
std::rethrow_if_nested(ex);
|
||||
}
|
||||
BOOST_FAIL("Required an exception to be re-thrown");
|
||||
} catch (encryption::configuration_error&) {
|
||||
// EXPECTED
|
||||
} catch (...) {
|
||||
BOOST_FAIL(format("Unexpected exception: {}", std::current_exception()));
|
||||
}
|
||||
|
||||
// should be ok
|
||||
co_await test_provider(fmt::format("'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'master_key': '{}', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", gcp.key_name)
|
||||
@@ -1072,7 +1094,7 @@ static future<> network_error_test_helper(const tmpdir& tmp, const std::string&
|
||||
|
||||
BOOST_REQUIRE_THROW(
|
||||
co_await test_broken_encrypted_commitlog(args, scopts);
|
||||
, std::exception
|
||||
, exceptions::mutation_write_timeout_exception
|
||||
);
|
||||
|
||||
co_await proxy.stop();
|
||||
|
||||
@@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
|
||||
if (!verb_register) {
|
||||
co_await smp::invoke_on_all([&] {
|
||||
return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
|
||||
auto from = netw::messaging_service::get_source(cinfo).addr;
|
||||
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
||||
auto sink = global_ms.local().make_sink_for_stream_blob(source);
|
||||
(void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
|
||||
auto path = meta.filename + suffix;
|
||||
@@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
|
||||
co_return make_file_input_stream(std::move(file), foptions);
|
||||
};
|
||||
}
|
||||
auto host2ip = [&global_db] (locator::host_id id) -> future<gms::inet_address> {
|
||||
co_return global_db.local().get_token_metadata().get_topology().my_address();
|
||||
};
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error);
|
||||
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
|
||||
co_await mark_tablet_stream_done(ops_id);
|
||||
testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
|
||||
ret = true;
|
||||
|
||||
@@ -812,27 +812,20 @@ SEASTAR_THREAD_TEST_CASE(background_reclaim) {
|
||||
logalloc::shard_tracker().stop().get();
|
||||
});
|
||||
|
||||
sleep(500ms).get(); // sleep a little, to give the reclaimer a head start
|
||||
|
||||
std::vector<managed_bytes> std_allocs;
|
||||
size_t std_alloc_size = 1000000; // note that managed_bytes fragments these, even in std
|
||||
for (int i = 0; i < 50; ++i) {
|
||||
// Background reclaim is supposed to eventually ensure a certain amount of free memory.
|
||||
while (memory::free_memory() < background_reclaim_free_memory_threshold) {
|
||||
thread::maybe_yield();
|
||||
}
|
||||
|
||||
auto compacted_pre = logalloc::shard_tracker().statistics().memory_compacted;
|
||||
fmt::print("compacted {} items {} (pre)\n", compacted_pre, evictable_allocs.size());
|
||||
std_allocs.emplace_back(managed_bytes::initialized_later(), std_alloc_size);
|
||||
auto compacted_post = logalloc::shard_tracker().statistics().memory_compacted;
|
||||
fmt::print("compacted {} items {} (post)\n", compacted_post, evictable_allocs.size());
|
||||
BOOST_REQUIRE_EQUAL(compacted_pre, compacted_post);
|
||||
|
||||
// Pretend to do some work. Sleeping would be too easy, as the background reclaim group would use
|
||||
// all that time.
|
||||
//
|
||||
// Use thread_cputime_clock to prevent overcommitted test machines from stealing CPU time
|
||||
// and causing test failures.
|
||||
auto deadline = thread_cputime_clock::now() + 100ms;
|
||||
while (thread_cputime_clock::now() < deadline) {
|
||||
thread::maybe_yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
return sstables::test_env::do_with([] (sstables::test_env& env) {
|
||||
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("v", bytes_type)
|
||||
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
|
||||
return with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
|
||||
return seastar::async([&env, s, &cf] {
|
||||
// populate
|
||||
auto new_key = [&] {
|
||||
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
|
||||
flushed.get();
|
||||
});
|
||||
}).then([cf_stats] {});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4023,3 +4023,108 @@ SEASTAR_THREAD_TEST_CASE(test_to_data_query_results_with_distinct_and_per_partit
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), pkeys.size() * 2);
|
||||
}
|
||||
}
|
||||
|
||||
// Max-purgeable has two values: one for regular and one for shadowable
|
||||
// tombstones. Check that the value is not sticky -- if a shadowable is requested
|
||||
// first, it won't apply to regular tombstones and vice-versa.
|
||||
SEASTAR_THREAD_TEST_CASE(test_mutation_compactor_sticky_max_purgeable) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto dk = ss.make_pkey(1);
|
||||
|
||||
const auto& v_def = *s->get_column_definition(to_bytes("v"));
|
||||
const auto value = serialized("v");
|
||||
|
||||
const auto deletion_time = gc_clock::now() - std::chrono::hours(1) - s->gc_grace_seconds();
|
||||
const auto compaction_time = gc_clock::now();
|
||||
const api::timestamp_type shadowable_max_purgeable = 110;
|
||||
const api::timestamp_type regular_max_purgeable = 50;
|
||||
const api::timestamp_type timestamp = 100;
|
||||
|
||||
class mutation_rebuilding_consumer {
|
||||
mutation_rebuilder_v2 _mr;
|
||||
|
||||
public:
|
||||
explicit mutation_rebuilding_consumer(schema_ptr s) : _mr(std::move(s)) { }
|
||||
void consume_new_partition(dht::decorated_key dk) { _mr.consume_new_partition(std::move(dk)); }
|
||||
void consume(tombstone t) { _mr.consume(t); }
|
||||
stop_iteration consume(static_row&& sr, tombstone, bool) { return _mr.consume(std::move(sr)); }
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _mr.consume(std::move(cr)); }
|
||||
stop_iteration consume(range_tombstone_change&& rtc) { return _mr.consume(std::move(rtc)); }
|
||||
stop_iteration consume_end_of_partition() { return _mr.consume_end_of_partition(); }
|
||||
mutation_opt consume_end_of_stream() { return _mr.consume_end_of_stream(); }
|
||||
};
|
||||
|
||||
auto get_max_purgeable = [] (const dht::decorated_key&, is_shadowable is) {
|
||||
return is == is_shadowable::yes ? shadowable_max_purgeable : regular_max_purgeable;
|
||||
};
|
||||
|
||||
auto compact_and_expire = [&] (mutation mut) {
|
||||
auto reader = make_mutation_reader_from_mutations(s, permit, std::move(mut));
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
auto compactor = compact_for_compaction_v2<mutation_rebuilding_consumer>(
|
||||
*s,
|
||||
compaction_time,
|
||||
get_max_purgeable,
|
||||
tombstone_gc_state(nullptr),
|
||||
mutation_rebuilding_consumer(s));
|
||||
auto mut_opt = reader.consume(std::move(compactor)).get();
|
||||
|
||||
BOOST_REQUIRE(mut_opt);
|
||||
|
||||
return *mut_opt;
|
||||
};
|
||||
|
||||
// max-purgeable returned for shadowable tombstone becomes sticky and applies to row tombstone after it
|
||||
{
|
||||
mutation mut(s, dk);
|
||||
mutation mut_compacted(s, dk);
|
||||
|
||||
auto row1 = clustering_row(ss.make_ckey(1));
|
||||
row1.apply(shadowable_tombstone(timestamp, deletion_time));
|
||||
|
||||
auto row2 = clustering_row(ss.make_ckey(2));
|
||||
row2.apply(tombstone(timestamp, deletion_time));
|
||||
|
||||
auto row3 = clustering_row(ss.make_ckey(3));
|
||||
row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
|
||||
|
||||
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row2)));
|
||||
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
|
||||
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row1)));
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row2)));
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row3)));
|
||||
|
||||
assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
|
||||
}
|
||||
|
||||
// max-purgeable returned for regular tombstone becomes sticky and applies to shadowable tombstone after it
|
||||
{
|
||||
mutation mut(s, dk);
|
||||
mutation mut_compacted(s, dk);
|
||||
|
||||
auto row1 = clustering_row(ss.make_ckey(1));
|
||||
row1.apply(tombstone(timestamp, deletion_time));
|
||||
|
||||
auto row2 = clustering_row(ss.make_ckey(2));
|
||||
row2.apply(shadowable_tombstone(timestamp, deletion_time));
|
||||
|
||||
auto row3 = clustering_row(ss.make_ckey(3));
|
||||
row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
|
||||
|
||||
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row1)));
|
||||
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
|
||||
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row1)));
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row2)));
|
||||
mut.apply(mutation_fragment(*s, permit, std::move(row3)));
|
||||
|
||||
assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
|
||||
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
|
||||
}
|
||||
|
||||
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
|
||||
const schema& s = *expected.schema();
|
||||
auto expected_cont = expected.entry().squashed_continuity(s);
|
||||
auto actual_cont = actual.get_continuity(s);
|
||||
bool actual_static_cont = actual.static_row_continuous();
|
||||
bool expected_static_cont = expected.squashed().static_row_continuous();
|
||||
if (actual_static_cont != expected_static_cont) {
|
||||
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
if (!expected_cont.equals(s, actual_cont)) {
|
||||
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
|
||||
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
// Tests that reading many versions using a cursor gives the logical mutation back.
|
||||
return seastar::async([] {
|
||||
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
auto snap = e.read();
|
||||
auto actual = read_using_cursor(*snap);
|
||||
|
||||
assert_that(s, actual).has_same_continuity(expected);
|
||||
// Checks that the squashed continuity of `e` is equal to continuity of `actual`.
|
||||
// Note: squashed continuity of an entry is slightly different than the continuity
|
||||
// of a squashed entry.
|
||||
//
|
||||
// Squashed continuity is the union of continuities of all versions in the entry,
|
||||
// and in particular it includes empty dummy rows resulting in the logical merge
|
||||
// of version.
|
||||
// The process of actually squashing an entry is allowed to
|
||||
// remove those empty dummies, so the squashed entry can have slightly
|
||||
// smaller continuity.
|
||||
//
|
||||
// Since a cursor isn't allowed to remove dummy rows, the strongest test
|
||||
// we can do here is to compare the continuity of the cursor-read mutation
|
||||
// with the squashed continuity of the entry.
|
||||
assert_has_same_squashed_continuity(actual, e);
|
||||
assert_that(s, actual).is_equal_to_compacted(expected);
|
||||
|
||||
// Reversed iteration
|
||||
|
||||
@@ -242,7 +242,8 @@ SEASTAR_THREAD_TEST_CASE(test_result_map_reduce) {
|
||||
auto bar_exc = [] () { return result<sstring>(bo::failure(bar_exception())); };
|
||||
auto foo_throw = [] () { return make_exception_future<result<sstring>>(foo_exception()); };
|
||||
|
||||
BOOST_REQUIRE_EQUAL(reduce(sstring("brown"), sstring("fox")).value(), "the brown fox");
|
||||
auto res = reduce(sstring("brown"), sstring("fox")).value();
|
||||
BOOST_REQUIRE(res == "the brown fox" || res == "the fox brown");
|
||||
BOOST_REQUIRE_EQUAL(reduce(foo_exc(), sstring("fox")).error(), exc_container(foo_exception()));
|
||||
BOOST_REQUIRE_EQUAL(reduce(sstring("brown"), foo_exc()).error(), exc_container(foo_exception()));
|
||||
BOOST_REQUIRE_EQUAL(reduce(foo_exc(), bar_exc()).error(), exc_container(foo_exception()));
|
||||
|
||||
@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
|
||||
auto ts_in_ms = std::chrono::milliseconds(ts);
|
||||
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
|
||||
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
|
||||
|
||||
BOOST_REQUIRE(ret.second == expected);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2319,6 +2318,7 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
uint32_t _seed;
|
||||
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
|
||||
@@ -2336,7 +2336,7 @@ public:
|
||||
compress))
|
||||
, _random_schema(_seed, *_random_schema_spec)
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
testlog.info("random_schema: {}", _random_schema.cql());
|
||||
}
|
||||
|
||||
@@ -2402,12 +2402,14 @@ public:
|
||||
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
|
||||
|
||||
private:
|
||||
|
||||
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
sharded<test_env> _env;
|
||||
|
||||
public:
|
||||
scrub_test_framework()
|
||||
{
|
||||
_env.start().get();
|
||||
_env.start(test_env_config(), std::ref(*scf)).get();
|
||||
}
|
||||
|
||||
~scrub_test_framework() {
|
||||
|
||||
137
test/boost/sstable_compressor_factory_test.cc
Normal file
137
test/boost/sstable_compressor_factory_test.cc
Normal file
@@ -0,0 +1,137 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(sstable_compressor_factory_test)
|
||||
|
||||
// 1. Create a random message.
|
||||
// 2. Set this random message as the recommended dict.
|
||||
// 3. On all shards, create compressors.
|
||||
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
|
||||
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
|
||||
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
|
||||
// 7. Repeat this a few times for both lz4 and zstd.
|
||||
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
|
||||
|
||||
// Create a compressor factory.
|
||||
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
|
||||
auto config = default_sstable_compressor_factory::config{
|
||||
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
|
||||
};
|
||||
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
|
||||
sstable_compressor_factory.start(std::cref(config)).get();
|
||||
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
|
||||
|
||||
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
|
||||
auto table = table_id::create_random_id();
|
||||
|
||||
// Retry a few times just to check that it works more than once.
|
||||
for (int retry = 0; retry < 3; ++retry) {
|
||||
// Generate a random (and hence uhcompressible without a dict) message.
|
||||
auto message = tests::random::get_sstring(4096);
|
||||
auto dict_view = std::as_bytes(std::span(message));
|
||||
// Set the message as the dict to make the message perfectly compressible.
|
||||
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
|
||||
|
||||
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
|
||||
std::vector<unsigned> compressor_numa_nodes(smp::count);
|
||||
std::vector<unsigned> decompressor_numa_nodes(smp::count);
|
||||
|
||||
// Try for both algorithms, just in case there are some differences in how dictionary
|
||||
// distribution over shards is implemented between them.
|
||||
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
|
||||
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
|
||||
// Validate that the dictionaries work as intended,
|
||||
// and check that their owner is as expected.
|
||||
|
||||
auto params = compression_parameters(algo);
|
||||
|
||||
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
|
||||
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
|
||||
|
||||
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
|
||||
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
|
||||
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
|
||||
|
||||
// Check that the dictionary used by this shard lies on the same NUMA node.
|
||||
// This is important to avoid cross-node memory accesses on the hot path.
|
||||
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
|
||||
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
|
||||
|
||||
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
|
||||
auto output_max_size = compressor->compress_max_size(message.size());
|
||||
auto compressed = std::vector<char>(output_max_size);
|
||||
auto compressed_size = compressor->compress(
|
||||
reinterpret_cast<const char*>(message.data()), message.size(),
|
||||
reinterpret_cast<char*>(compressed.data()), compressed.size());
|
||||
BOOST_REQUIRE_GE(compressed_size, 0);
|
||||
compressed.resize(compressed_size);
|
||||
|
||||
// Validate that the recommeded dict was actually used.
|
||||
BOOST_CHECK(compressed.size() < message.size() / 10);
|
||||
|
||||
auto decompressed = std::vector<char>(message.size());
|
||||
auto decompressed_size = decompressor->uncompress(
|
||||
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
|
||||
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
|
||||
BOOST_REQUIRE_GE(decompressed_size, 0);
|
||||
decompressed.resize(decompressed_size);
|
||||
|
||||
// Validate that the roundtrip through compressor and decompressor
|
||||
// resulted in the original message.
|
||||
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
|
||||
})).get();
|
||||
}
|
||||
|
||||
// Check that the number of owners (and hence, copies) is equal to the number
|
||||
// of NUMA nodes.
|
||||
// This isn't that important, but we don't want to duplicate dictionaries
|
||||
// within a NUMA node unnecessarily.
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
BOOST_CHECK_EQUAL(
|
||||
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
|
||||
{
|
||||
std::vector<unsigned> one_numa_node(smp::count);
|
||||
test_one_numa_topology(one_numa_node);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> two_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
|
||||
two_numa_nodes[i] = i % 2;
|
||||
}
|
||||
test_one_numa_topology(two_numa_nodes);
|
||||
}
|
||||
{
|
||||
std::vector<unsigned> n_numa_nodes(smp::count);
|
||||
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
|
||||
n_numa_nodes[i] = i;
|
||||
}
|
||||
test_one_numa_topology(n_numa_nodes);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto builder = schema_builder("test", "summary_test")
|
||||
.with_column("a", int32_type, column_kind::partition_key);
|
||||
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
|
||||
BOOST_REQUIRE(list.size() == 130);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
// d int,
|
||||
// e blob
|
||||
//);
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
|
||||
return seastar::async([&env, version] {
|
||||
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
|
||||
auto builder = schema_builder("test", "test_multi_schema")
|
||||
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
|
||||
BOOST_REQUIRE(!m);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
|
||||
BOOST_REQUIRE(insert(2, 2) == true);
|
||||
BOOST_REQUIRE(insert(5, 5) == true);
|
||||
BOOST_REQUIRE(run.all().size() == 5);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
|
||||
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
sstables::sstable_generation_generator gen_generator{0};
|
||||
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
|
||||
tmpdir tmp;
|
||||
auto env = test_env();
|
||||
|
||||
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
||||
auto env = test_env({}, *scf);
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
// please note, the SSTables used by this test are stored under
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "readers/from_mutations.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(sstable_set_test)
|
||||
|
||||
@@ -203,4 +204,224 @@ SEASTAR_TEST_CASE(test_tablet_sstable_set_copy_ctor) {
|
||||
}, std::move(cfg));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_set_fast_forward_by_cache_reader_simulation) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto pks = tests::generate_partition_keys(6, s);
|
||||
std::vector<mutation> muts;
|
||||
for (auto pk : pks) {
|
||||
auto mut = mutation(s, pk);
|
||||
ss.add_row(mut, ss.make_ckey(0), "val");
|
||||
muts.push_back(std::move(mut));
|
||||
}
|
||||
|
||||
sstable_writer_config cfg = env.manager().configure_writer("");
|
||||
|
||||
std::vector<sstables::shared_sstable> ssts;
|
||||
|
||||
{
|
||||
auto mr = make_mutation_reader_from_mutations(s, env.make_reader_permit(), {muts[0], muts[1], muts[2]});
|
||||
auto sst = make_sstable_easy(env, std::move(mr), cfg);
|
||||
testlog.info("sstable [{}, {}]", sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
|
||||
ssts.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
{
|
||||
auto mr = make_mutation_reader_from_mutations(s, env.make_reader_permit(), {muts[4], muts[5]});
|
||||
auto sst = make_sstable_easy(env, std::move(mr), cfg);
|
||||
testlog.info("sstable [{}, {}]", sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
|
||||
ssts.push_back(std::move(sst));
|
||||
}
|
||||
auto token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
auto set = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), token_range));
|
||||
for (auto& sst : ssts) {
|
||||
set->insert(sst);
|
||||
}
|
||||
|
||||
// simulation of full scan on range [0, 5]
|
||||
// cache reader fetches [0, 1] -> next [4]
|
||||
// [2] consumed from cache
|
||||
// fast forward to [3, 5]
|
||||
|
||||
auto first_range = dht::partition_range::make({pks[0]}, {pks[1]});
|
||||
auto reader = set->make_range_sstable_reader(s, env.make_reader_permit(),
|
||||
first_range,
|
||||
s->full_slice(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::yes);
|
||||
auto close_r = deferred_close(reader);
|
||||
|
||||
auto mopt = read_mutation_from_mutation_reader(reader).get();
|
||||
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[0]));
|
||||
|
||||
mopt = read_mutation_from_mutation_reader(reader).get();
|
||||
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[1]));
|
||||
|
||||
auto second_range = dht::partition_range::make({pks[3]}, {pks[5]});
|
||||
|
||||
reader.fast_forward_to(second_range).get();
|
||||
|
||||
mopt = read_mutation_from_mutation_reader(reader).get();
|
||||
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[4]));
|
||||
|
||||
mopt = read_mutation_from_mutation_reader(reader).get();
|
||||
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[5]));
|
||||
// EOS
|
||||
BOOST_REQUIRE(!read_mutation_from_mutation_reader(reader).get());
|
||||
});
|
||||
}
|
||||
|
||||
static future<> guarantee_all_tablet_replicas_on_shard0(cql_test_env& env) {
|
||||
auto& ss = env.get_storage_service().local();
|
||||
auto& stm = env.get_shared_token_metadata().local();
|
||||
auto my_host_id = ss.get_token_metadata_ptr()->get_topology().my_host_id();
|
||||
|
||||
co_await ss.set_tablet_balancing_enabled(false);
|
||||
co_await stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> {
|
||||
tm.update_topology(my_host_id, locator::endpoint_dc_rack::default_location, locator::node::state::normal, 1);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_tablet_sstable_set_fast_forward_across_tablet_ranges) {
|
||||
// enable tablets, to get access to tablet_storage_group_manager
|
||||
cql_test_config cfg;
|
||||
cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
|
||||
|
||||
return do_with_cql_env_thread([&](cql_test_env& env) {
|
||||
guarantee_all_tablet_replicas_on_shard0(env).get();
|
||||
|
||||
env.execute_cql("CREATE KEYSPACE test_tablet_sstable_set"
|
||||
" WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1} AND TABLETS = {'enabled': true, 'initial': 2};").get();
|
||||
env.execute_cql("CREATE TABLE test_tablet_sstable_set.test (pk int PRIMARY KEY)").get();
|
||||
|
||||
auto& table = env.local_db().find_column_family("test_tablet_sstable_set", "test");
|
||||
auto s = table.schema();
|
||||
auto& sgm = column_family_test::get_storage_group_manager(table);
|
||||
auto erm = table.get_effective_replication_map();
|
||||
auto& tmap = erm->get_token_metadata().tablets().get_tablet_map(s->id());
|
||||
|
||||
std::unordered_map<locator::tablet_id, std::vector<dht::decorated_key>> keys_per_tablet;
|
||||
|
||||
table.disable_auto_compaction().get();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
env.execute_cql(fmt::format("INSERT INTO test_tablet_sstable_set.test (pk) VALUES ({})", i)).get();
|
||||
auto key = dht::decorate_key(*s, partition_key::from_singular(*s, i));
|
||||
keys_per_tablet[tmap.get_tablet_id(key.token())].push_back(key);
|
||||
// produces single-partition sstables, to stress incremental selector.
|
||||
table.flush().get();
|
||||
}
|
||||
|
||||
for (auto& [_, keys] : keys_per_tablet) {
|
||||
auto cmp = dht::decorated_key::less_comparator(s);
|
||||
std::ranges::sort(keys, cmp);
|
||||
}
|
||||
|
||||
auto set = replica::make_tablet_sstable_set(s, *sgm.get(), tmap);
|
||||
|
||||
utils::get_local_injector().enable("enable_read_debug_log");
|
||||
testlog.info("first tablet range: {}", tmap.get_token_range(locator::tablet_id(0)));
|
||||
testlog.info("second tablet range: {}", tmap.get_token_range(locator::tablet_id(1)));
|
||||
|
||||
auto& keys_for_first_tablet = keys_per_tablet.at(locator::tablet_id(0));
|
||||
auto& keys_for_second_tablet = keys_per_tablet.at(locator::tablet_id(1));
|
||||
|
||||
auto create_reader = [&] (const dht::partition_range& range) {
|
||||
return set->make_range_sstable_reader(s, make_reader_permit(env),
|
||||
range,
|
||||
s->full_slice(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::yes);
|
||||
};
|
||||
|
||||
auto read_and_check = [&] (auto& reader, const dht::decorated_key& expected) {
|
||||
auto mopt = read_mutation_from_mutation_reader(reader).get();
|
||||
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, expected));
|
||||
};
|
||||
auto end_of_stream_check = [&] (auto& reader) {
|
||||
BOOST_REQUIRE(!read_mutation_from_mutation_reader(reader).get());
|
||||
};
|
||||
|
||||
// simulation of full scan on tablet ranges
|
||||
// cache reader fetches range of first tablet
|
||||
// fast forward to range of second tablet
|
||||
{
|
||||
auto first_range = dht::partition_range::make({keys_for_first_tablet.front()}, {keys_for_first_tablet.back()});
|
||||
auto reader = create_reader(first_range);
|
||||
auto close_r = deferred_close(reader);
|
||||
|
||||
for (auto& k : keys_for_first_tablet) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
|
||||
auto second_range = dht::partition_range::make({keys_for_second_tablet.front()}, {keys_for_second_tablet.back()});
|
||||
reader.fast_forward_to(second_range).get();
|
||||
|
||||
for (auto& k: keys_for_second_tablet) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
end_of_stream_check(reader);
|
||||
}
|
||||
|
||||
// verify that fast forward will be able to create reader when the new range goes across tablet boundaries.
|
||||
{
|
||||
auto first_range = dht::partition_range::make({keys_for_first_tablet[0]}, {keys_for_first_tablet[0]});
|
||||
auto reader = create_reader(first_range);
|
||||
auto close_r = deferred_close(reader);
|
||||
|
||||
for (auto& k : std::span{keys_for_first_tablet.begin(), 1}) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
|
||||
auto second_range = dht::partition_range::make({keys_for_first_tablet[1]}, {keys_for_second_tablet.back()});
|
||||
reader.fast_forward_to(second_range).get();
|
||||
|
||||
for (auto& k : std::span{keys_for_first_tablet.begin() + 1, keys_for_first_tablet.size() - 1}) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
for (auto& k: keys_for_second_tablet) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
end_of_stream_check(reader);
|
||||
}
|
||||
|
||||
// Reproduces a scenario of range scan where fast forward will overlap with next position returned by selector
|
||||
// full scan: [0, 20]
|
||||
// 1) cache reader emits [0, 10) (position 10 is cached)
|
||||
// 2) incremental selector returns 0 sstables, next position of 16 (the start of a sstable)
|
||||
// 3) fast forward to range [14, 20]
|
||||
// fast forward might expect new range to be after next position (16), but [14, 20] is before and overlaps with next position.
|
||||
// the incremental selector must be called also when new range overlaps with next position. otherwise, there's chance of
|
||||
// missing data.
|
||||
{
|
||||
auto first_token = tmap.get_first_token(locator::tablet_id(0));
|
||||
auto first_range = dht::partition_range::make({dht::ring_position::starting_at(first_token)},
|
||||
{dht::ring_position::ending_at(first_token)});
|
||||
auto reader = create_reader(first_range);
|
||||
auto close_r = deferred_close(reader);
|
||||
|
||||
end_of_stream_check(reader);
|
||||
|
||||
auto& keys_for_second_tablet = keys_per_tablet.at(locator::tablet_id(1));
|
||||
auto second_range = dht::partition_range::make({dht::ring_position::starting_at(dht::next_token(first_token))},
|
||||
{keys_for_second_tablet.back()});
|
||||
|
||||
reader.fast_forward_to(second_range).get();
|
||||
|
||||
for (auto& k : keys_for_first_tablet) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
for (auto& k: keys_for_second_tablet) {
|
||||
read_and_check(reader, k);
|
||||
}
|
||||
end_of_stream_check(reader);
|
||||
}
|
||||
|
||||
}, std::move(cfg));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include <source_location>
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/random_utils.hh"
|
||||
@@ -1687,6 +1688,12 @@ void check_no_rack_overload(const token_metadata& tm) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
|
||||
cql_test_config cfg{};
|
||||
// This test relies on the fact that we use an RF strictly smaller than the number of racks.
|
||||
// Because of that, we cannot enable `rf_rack_valid_keyspaces` in this test because we won't
|
||||
// be able to create a keyspace.
|
||||
cfg.db_config->rf_rack_valid_keyspaces.set(false);
|
||||
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
|
||||
@@ -1733,7 +1740,7 @@ SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count());
|
||||
}).get();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) {
|
||||
@@ -2123,9 +2130,10 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
|
||||
// which is a proof that it doesn't stop due to active migrations.
|
||||
|
||||
topology_builder topo(e);
|
||||
auto host1 = topo.add_node(node_state::normal, 1);
|
||||
auto host1 = topo.add_node(node_state::normal, 2);
|
||||
topo.start_new_rack();
|
||||
auto host2 = topo.add_node(node_state::normal, 1);
|
||||
auto host3 = topo.add_node(node_state::normal, 2);
|
||||
auto host3 = topo.add_node(node_state::normal, 1);
|
||||
|
||||
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4);
|
||||
auto table1 = add_table(e, ks_name).get();
|
||||
@@ -2146,8 +2154,8 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
|
||||
tablet_transition_stage::allow_write_both_read_old,
|
||||
tablet_transition_kind::migration,
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host3, 0},
|
||||
tablet_replica {host2, 0},
|
||||
},
|
||||
tablet_replica {host3, 0}
|
||||
});
|
||||
@@ -2183,6 +2191,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
|
||||
topology_builder topo(e);
|
||||
|
||||
auto host1 = topo.add_node(node_state::normal, 1);
|
||||
topo.start_new_rack();
|
||||
auto host2 = topo.add_node(node_state::normal, 1);
|
||||
topo.add_node(node_state::normal, 2);
|
||||
|
||||
@@ -2225,10 +2234,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
|
||||
topology_builder topo(e);
|
||||
|
||||
const auto shard_count = 2;
|
||||
auto host1 = topo.add_node(node_state::normal, shard_count);
|
||||
auto host2 = topo.add_node(node_state::normal, shard_count);
|
||||
auto host3 = topo.add_node(node_state::normal, shard_count);
|
||||
auto host4 = topo.add_node(node_state::normal, shard_count);
|
||||
auto rack1 = topo.rack();
|
||||
auto rack2 = topo.start_new_rack();
|
||||
|
||||
auto host1 = topo.add_node(node_state::normal, shard_count, rack1);
|
||||
auto host2 = topo.add_node(node_state::normal, shard_count, rack2);
|
||||
auto host3 = topo.add_node(node_state::normal, shard_count, rack1);
|
||||
auto host4 = topo.add_node(node_state::normal, shard_count, rack2);
|
||||
|
||||
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16);
|
||||
auto table1 = add_table(e, ks_name).get();
|
||||
@@ -2533,35 +2545,39 @@ allocate_replicas_in_racks(const std::vector<endpoint_dc_rack>& racks, int rf,
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
const int n_hosts = 6;
|
||||
auto shard_count = 2;
|
||||
auto do_test_case = [] (const shard_id rf) {
|
||||
return do_with_cql_env_thread([rf] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
const int n_hosts = 6;
|
||||
auto shard_count = 2;
|
||||
|
||||
std::vector<host_id> hosts;
|
||||
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
|
||||
// Sanity check just in case someone modifies the caller of this lambda
|
||||
// and starts providing RF > n_hosts. In that case, we wouldn't be able
|
||||
// to create an RF-rack-valid keyspace.
|
||||
assert(rf <= n_hosts);
|
||||
|
||||
std::vector<endpoint_dc_rack> racks {
|
||||
topo.rack(),
|
||||
topo.start_new_rack(),
|
||||
};
|
||||
std::vector<host_id> hosts;
|
||||
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
|
||||
|
||||
for (int i = 0; i < n_hosts; ++i) {
|
||||
auto rack = racks[(i + 1) % racks.size()];
|
||||
auto h = topo.add_node(node_state::normal, shard_count, rack);
|
||||
if (i) {
|
||||
// Leave the first host empty by making it invisible to allocation algorithm.
|
||||
hosts_by_rack[rack.rack].push_back(h);
|
||||
std::vector<endpoint_dc_rack> racks{topo.rack()};
|
||||
for (shard_id i = 1; i < rf; ++i) {
|
||||
racks.push_back(topo.start_new_rack());
|
||||
}
|
||||
}
|
||||
|
||||
auto& stm = e.shared_token_metadata().local();
|
||||
for (int i = 0; i < n_hosts; ++i) {
|
||||
auto rack = racks[(i + 1) % racks.size()];
|
||||
auto h = topo.add_node(node_state::normal, shard_count, rack);
|
||||
if (i) {
|
||||
// Leave the first host empty by making it invisible to allocation algorithm.
|
||||
hosts_by_rack[rack.rack].push_back(h);
|
||||
}
|
||||
}
|
||||
|
||||
auto& stm = e.shared_token_metadata().local();
|
||||
|
||||
for (int i = 0; i < 13; ++i) {
|
||||
size_t total_tablet_count = 0;
|
||||
std::vector<sstring> keyspaces;
|
||||
size_t tablet_count_bits = 8;
|
||||
int rf = tests::random::get_int<shard_id>(2, 4);
|
||||
for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) {
|
||||
if (tests::random::get_bool()) {
|
||||
continue;
|
||||
@@ -2622,8 +2638,15 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
|
||||
seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) {
|
||||
return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result();
|
||||
}).get();
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
};
|
||||
|
||||
const int test_case_number = 13;
|
||||
for (int i = 0; i < test_case_number; ++i) {
|
||||
const shard_id rf = tests::random::get_int<shard_id>(2, 4);
|
||||
testlog.info("{}: Starting test case {} for RF={}", std::source_location::current().function_name(), i + 1, rf);
|
||||
do_test_case(rf).get();
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_balancing_heterogeneous_cluster) {
|
||||
@@ -2820,6 +2843,13 @@ SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables_imbala
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
|
||||
cql_test_config cfg = tablet_cql_test_config();
|
||||
// FIXME: This test creates two keyspaces with two different replication factors.
|
||||
// What's more, we distribute the nodes across only two racks. Because of that,
|
||||
// we won't be able to enable `rf_rack_valid_keyspaces`. That would require
|
||||
// increasing the number of racks to three, as well as implementing scylladb/scylladb#23426.
|
||||
cfg.db_config->rf_rack_valid_keyspaces.set(false);
|
||||
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal();
|
||||
|
||||
@@ -2871,7 +2901,7 @@ SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
|
||||
BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal);
|
||||
}
|
||||
}
|
||||
}, tablet_cql_test_config()).get();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// This test verifies that per-table tablet count is adjusted
|
||||
@@ -3174,6 +3204,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load)
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) {
|
||||
cql_test_config cfg{};
|
||||
// This test purposefully uses just one rack, which means that we cannot enable
|
||||
// the `rf_rack_valid_keyspaces` configuration option because we won't be able to create
|
||||
// a keyspace with RF > 1.
|
||||
cfg.db_config->rf_rack_valid_keyspaces.set(false);
|
||||
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
const int rf = 2;
|
||||
const int n_racks = 1;
|
||||
@@ -3200,7 +3236,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack)
|
||||
};
|
||||
|
||||
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
|
||||
}).get();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Verify merge can proceed with multiple racks and RF=#racks
|
||||
@@ -3243,6 +3279,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_multiple_rack
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) {
|
||||
cql_test_config cfg{};
|
||||
// The scenario this test addresses cannot happen with `rf_rack_valid_keyspaces` set to true.
|
||||
//
|
||||
// Among the tablet replicas for a given tablet, there CANNOT be two nodes from the same rack.
|
||||
// After the decommission of B, both tablets will reside on ALL other nodes, which implies that
|
||||
// they're on pairwise distinct racks. However, since B was taking part in replication of the
|
||||
// tablets, it must've been among the replicas of at least one of the tablets and, for the very
|
||||
// same reason, it must be on a separate rack. Hence, all nodes must reside on pairwise distinct racks.
|
||||
//
|
||||
// So, we if want to keep the current number of nodes and RF, we must have 4 racks. But we cannot
|
||||
// do that until we've implemented scylladb/scylladb#23737. Besides, the test seems to rely on
|
||||
// using just one rack, which makes it incompatible with `rf_rack_valid_keyspaces: true` anyway.
|
||||
cfg.db_config->rf_rack_valid_keyspaces.set(false);
|
||||
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
const int rf = 3;
|
||||
const int n_racks = 1;
|
||||
@@ -3292,7 +3342,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission)
|
||||
};
|
||||
|
||||
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
|
||||
}).get();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
|
||||
@@ -3300,6 +3350,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
|
||||
topology_builder topo(e);
|
||||
|
||||
topo.add_node(node_state::normal, 2);
|
||||
topo.start_new_rack();
|
||||
topo.add_node(node_state::normal, 2);
|
||||
|
||||
const size_t initial_tablets = 2;
|
||||
|
||||
@@ -14,11 +14,12 @@ from test.pylib.internal_types import HostID
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id
|
||||
from test.cluster.mv.tablets.test_mv_tablets import get_tablet_replicas
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.cluster.util import new_test_keyspace, wait_for
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -44,6 +45,14 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
|
||||
|
||||
async def replicas_balanced():
|
||||
base_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "test", 0)]
|
||||
view_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "tv", 0)]
|
||||
return len(set(base_replicas) & set(view_replicas)) == 0 or None
|
||||
# There's 4 nodes and 4 tablets, so even if the initial placement is not balanced,
|
||||
# each node should get 1 replica after some time.
|
||||
await wait_for(replicas_balanced, time.time() + 60)
|
||||
|
||||
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
@@ -400,24 +400,18 @@ class topo:
|
||||
self.racks = racks
|
||||
self.dcs = dcs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 3, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
])
|
||||
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology):
|
||||
'''Check that restoring of a cluster with stream scopes works'''
|
||||
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, s3_server=None):
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
|
||||
|
||||
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
|
||||
if s3_server:
|
||||
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
|
||||
cfg['object_storage_endpoints'] = objconf
|
||||
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks')
|
||||
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
|
||||
cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300 }
|
||||
cmd = [ '--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug' ]
|
||||
servers = []
|
||||
host_ids = {}
|
||||
|
||||
for s in range(topology.nodes):
|
||||
dc = f'dc{s % topology.dcs}'
|
||||
rack = f'rack{s % topology.racks}'
|
||||
@@ -426,13 +420,12 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
servers.append(s)
|
||||
host_ids[s.server_id] = await manager.get_host_id(s.server_id)
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
return servers,host_ids
|
||||
|
||||
def create_dataset(manager, ks, cf, topology, logger):
|
||||
cql = manager.get_cql()
|
||||
logger.info(f'Create keyspace, rf={topology.rf}')
|
||||
keys = range(256)
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy', 'replication_factor': f'{topology.rf}'})
|
||||
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
|
||||
|
||||
@@ -441,9 +434,11 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
for k in keys:
|
||||
cql.execute(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES ({k}, '{k}');")
|
||||
|
||||
snap_name = unique_name('backup_')
|
||||
return schema, keys, replication_opts
|
||||
|
||||
async def take_snapshot(ks, servers, manager, logger):
|
||||
logger.info(f'Take snapshot and collect sstables lists')
|
||||
snap_name = unique_name('backup_')
|
||||
sstables = []
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
@@ -454,27 +449,9 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
|
||||
sstables += tocs
|
||||
|
||||
logger.info(f'Backup to {snap_name}')
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
async def do_backup(s):
|
||||
tid = await manager.api.backup(s.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(s.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
|
||||
await asyncio.gather(*(do_backup(s) for s in servers))
|
||||
|
||||
logger.info(f'Re-initialize keyspace')
|
||||
cql.execute(f'DROP KEYSPACE {ks}')
|
||||
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
|
||||
cql.execute(schema)
|
||||
|
||||
logger.info(f'Restore')
|
||||
async def do_restore(s, toc_names, scope):
|
||||
logger.info(f'Restore {s.ip_addr} with {toc_names}, scope={scope}')
|
||||
tid = await manager.api.restore(s.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names, scope)
|
||||
status = await manager.api.wait_task(s.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
return snap_name,sstables
|
||||
|
||||
def compute_scope(topology, servers):
|
||||
if topology.dcs > 1:
|
||||
scope = 'dc'
|
||||
r_servers = servers[:topology.dcs]
|
||||
@@ -485,10 +462,11 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
scope = 'node'
|
||||
r_servers = servers
|
||||
|
||||
await asyncio.gather(*(do_restore(s, sstables, scope) for s in r_servers))
|
||||
return scope,r_servers
|
||||
|
||||
async def check_data_is_back(manager, logger, cql, ks, cf, keys, servers, topology, r_servers, host_ids, scope):
|
||||
logger.info(f'Check the data is back')
|
||||
async def collect_mutations(server, key):
|
||||
async def collect_mutations(server):
|
||||
host = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 30)
|
||||
await read_barrier(manager.api, server.ip_addr) # scylladb/scylladb#18199
|
||||
ret = {}
|
||||
@@ -498,7 +476,7 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
ret[frag.pk].append({'mutation_source': frag.mutation_source, 'partition_region': frag.partition_region, 'node': server.ip_addr})
|
||||
return ret
|
||||
|
||||
by_node = await asyncio.gather(*(collect_mutations(s, k) for s in servers))
|
||||
by_node = await asyncio.gather(*(collect_mutations(s) for s in servers))
|
||||
mutations = {}
|
||||
for node_frags in by_node:
|
||||
for pk in node_frags:
|
||||
@@ -507,7 +485,6 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
mutations[pk].append(node_frags[pk])
|
||||
|
||||
for k in random.sample(keys, 17):
|
||||
real_rf = 0
|
||||
if not k in mutations:
|
||||
logger.info(f'{k} not found in mutations')
|
||||
logger.info(f'Mutations: {mutations}')
|
||||
@@ -531,6 +508,58 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
|
||||
logger.info(f'{s.ip_addr} streamed to {streamed_to}, expected {scope_nodes}')
|
||||
assert streamed_to == scope_nodes
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 3, nodes = 6, racks = 3, dcs = 1), True),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
])
|
||||
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology_rf_validity):
|
||||
'''Check that restoring of a cluster with stream scopes works'''
|
||||
|
||||
topology, rf_rack_valid_keyspaces = topology_rf_validity
|
||||
|
||||
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, s3_server)
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
|
||||
logger.info(f'Backup to {snap_name}')
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
async def do_backup(s):
|
||||
tid = await manager.api.backup(s.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(s.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
|
||||
await asyncio.gather(*(do_backup(s) for s in servers))
|
||||
|
||||
logger.info(f'Re-initialize keyspace')
|
||||
cql.execute(f'DROP KEYSPACE {ks}')
|
||||
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
|
||||
cql.execute(schema)
|
||||
|
||||
logger.info(f'Restore')
|
||||
async def do_restore(s, toc_names, scope):
|
||||
logger.info(f'Restore {s.ip_addr} with {toc_names}, scope={scope}')
|
||||
tid = await manager.api.restore(s.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names, scope)
|
||||
status = await manager.api.wait_task(s.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
|
||||
scope,r_servers = compute_scope(topology, servers)
|
||||
|
||||
await asyncio.gather(*(do_restore(s, sstables, scope) for s in r_servers))
|
||||
|
||||
await check_data_is_back(manager, logger, cql, ks, cf, keys, servers, topology, r_servers, host_ids, scope)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_restore_with_non_existing_sstable(manager: ManagerClient, s3_server):
|
||||
|
||||
@@ -474,7 +474,7 @@ async def add_new_node(manager: ManagerClient,
|
||||
yield
|
||||
|
||||
LOGGER.info("Add a new node to the cluster")
|
||||
await manager.server_add(timeout=TOPOLOGY_TIMEOUT)
|
||||
await manager.server_add(config={"rf_rack_valid_keyspaces": False}, timeout=TOPOLOGY_TIMEOUT)
|
||||
|
||||
yield
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
|
||||
async def four_nodes_cluster(manager: ManagerClient) -> None:
|
||||
LOGGER.info("Booting initial 4-node cluster.")
|
||||
for _ in range(4):
|
||||
server = await manager.server_add()
|
||||
server = await manager.server_add(config={"rf_rack_valid_keyspaces": False})
|
||||
await manager.api.enable_injection(
|
||||
node_ip=server.ip_addr,
|
||||
injection="raft_server_set_snapshot_thresholds",
|
||||
@@ -93,6 +93,8 @@ async def test_random_failures(manager: ManagerClient,
|
||||
TESTS_COUNT, TESTS_SHUFFLE_SEED, ERROR_INJECTIONS_COUNT, CLUSTER_EVENTS_COUNT,
|
||||
)
|
||||
|
||||
rf_rack_cfg = {"rf_rack_valid_keyspaces": False}
|
||||
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
await table.insert_seq()
|
||||
|
||||
@@ -116,7 +118,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
)
|
||||
coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
|
||||
coordinator_log_mark = await coordinator_log.mark()
|
||||
s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED)
|
||||
s_info = await manager.server_add(config=rf_rack_cfg, expected_server_up_state=ServerUpState.PROCESS_STARTED)
|
||||
await coordinator_log.wait_for(
|
||||
pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting",
|
||||
from_mark=coordinator_log_mark,
|
||||
@@ -128,7 +130,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
)
|
||||
else:
|
||||
s_info = await manager.server_add(
|
||||
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]},
|
||||
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]} | rf_rack_cfg,
|
||||
expected_server_up_state=ServerUpState.PROCESS_STARTED,
|
||||
)
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ extra_scylla_config_options:
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
enable_user_defined_functions: False
|
||||
rf_rack_valid_keyspaces: True
|
||||
tablets_mode_for_new_keyspaces: enabled
|
||||
run_first:
|
||||
- test_raft_recovery_stuck
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.rest_client import inject_error, read_barrier
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import wait_for_cdc_generations_publishing, \
|
||||
@@ -14,6 +14,7 @@ from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
@@ -64,6 +65,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after second clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 2 and first_gen_id in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
second_gen_id = max(gen_ids)
|
||||
|
||||
@@ -75,6 +77,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after third clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 1 and first_gen_id not in gen_ids and second_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
third_gen_id = max(gen_ids)
|
||||
|
||||
@@ -85,6 +88,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after fourth clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 1 and third_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
|
||||
|
||||
@@ -140,6 +144,7 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
|
||||
mark = await log_file1.mark()
|
||||
gen_ids = await get_gen_ids()
|
||||
assert len(gen_ids) == 2 and first_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
|
||||
|
||||
# Allow the CDC generation publisher to finish its job. One generation should remain.
|
||||
@@ -147,4 +152,5 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
|
||||
await log_file1.wait_for(f"CDC generation publisher fiber has nothing to do. Sleeping.", mark)
|
||||
gen_ids = await get_gen_ids()
|
||||
assert len(gen_ids) == 1
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
|
||||
|
||||
@@ -82,7 +82,7 @@ async def test_multiple_unpublished_cdc_generations(request, manager: ManagerCli
|
||||
"""Test that the CDC generation publisher works correctly when there is more than one unpublished CDC generation."""
|
||||
query_gen_timestamps = SimpleStatement(
|
||||
"select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
|
||||
consistency_level = ConsistencyLevel.ONE)
|
||||
consistency_level = ConsistencyLevel.ALL)
|
||||
|
||||
logger.info("Bootstrapping first node")
|
||||
servers = [await manager.server_add()]
|
||||
|
||||
@@ -20,7 +20,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
|
||||
from test.cluster.util import reconnect_driver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.prepare_3_nodes_cluster
|
||||
pytestmark = pytest.mark.prepare_3_racks_cluster
|
||||
|
||||
|
||||
|
||||
@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
|
||||
# IP-s before they are send back to servers[1] and servers[2],
|
||||
# and the mentioned above code is not exercised by this test.
|
||||
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
|
||||
# sleep_before_start_gossiping injections are needed to reproduce #22777
|
||||
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_start(servers[1].server_id)
|
||||
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
|
||||
if build_mode != 'release':
|
||||
|
||||
@@ -32,7 +32,7 @@ N_SERVERS = 2
|
||||
@pytest.fixture
|
||||
async def two_nodes_cluster(manager: ManagerClient) -> list[ServerNum]:
|
||||
logger.info(f"Booting initial 2-nodes cluster")
|
||||
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS)]
|
||||
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS, auto_rack_dc="dc1")]
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
return servers
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureReq
|
||||
|
||||
"""
|
||||
logger.info("Creating a new cluster")
|
||||
srvs = await manager.servers_add(3)
|
||||
srvs = await manager.servers_add(3, auto_rack_dc="dc1")
|
||||
cql, _ = await manager.get_ready_cql(srvs)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") as ks:
|
||||
|
||||
@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -39,7 +39,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
"""
|
||||
cmdline = ["--hinted-handoff-enabled", "0", "--cache-hit-rate-read-balancing", "0", "--logger-log-level", "debug_error_injection=trace"]
|
||||
|
||||
nodes = await manager.servers_add(3, cmdline=cmdline)
|
||||
nodes = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
|
||||
node1, node2, node3 = nodes
|
||||
|
||||
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
|
||||
host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
|
||||
|
||||
def execute_with_tracing(cql, statement, *args, **kwargs):
|
||||
kwargs['trace'] = True
|
||||
query_result = cql.execute(statement, *args, **kwargs)
|
||||
|
||||
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
|
||||
page_traces = []
|
||||
for trace in tracing:
|
||||
trace_events = []
|
||||
for event in trace.events:
|
||||
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
|
||||
page_traces.append("\n".join(trace_events))
|
||||
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
|
||||
cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
|
||||
" WITH speculative_retry = 'NONE'"
|
||||
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
|
||||
" AND compaction = {'class': 'NullCompactionStrategy'}")
|
||||
|
||||
for write_statement, delete_statement in statement_pairs:
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
|
||||
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks))
|
||||
execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
|
||||
await manager.api.disable_injection(node3.ip_addr, "database_apply")
|
||||
|
||||
def check_data(host, data):
|
||||
|
||||
@@ -64,12 +64,15 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
cfg = {'tablets_mode_for_new_keyspaces' : 'enabled' if tablets_enabled else 'disabled'}
|
||||
|
||||
logger.info("Bootstrapping first two nodes")
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
servers = await manager.servers_add(2, config=cfg, property_file=[
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"}
|
||||
])
|
||||
|
||||
# The third node is started as the last one, so we can be sure that is has
|
||||
# the latest topology version
|
||||
logger.info("Bootstrapping the last node")
|
||||
servers += [await manager.server_add(config=cfg)]
|
||||
servers += [await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r3"})]
|
||||
|
||||
# Disable load balancer as it might bump topology version, undoing the decrement below.
|
||||
# This should be done before adding the last two servers,
|
||||
@@ -123,9 +126,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_fence_hints(request, manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster with three nodes")
|
||||
s0 = await manager.server_add(config={
|
||||
'error_injections_at_startup': ['decrease_hints_flush_period']
|
||||
}, cmdline=['--logger-log-level', 'hints_manager=trace'])
|
||||
s0 = await manager.server_add(
|
||||
config={'error_injections_at_startup': ['decrease_hints_flush_period']},
|
||||
cmdline=['--logger-log-level', 'hints_manager=trace'],
|
||||
property_file={"dc": "dc1", "rack": "r1"})
|
||||
|
||||
# Disable load balancer as it might bump topology version, potentially creating a race condition
|
||||
# with read modify write below.
|
||||
@@ -134,7 +138,10 @@ async def test_fence_hints(request, manager: ManagerClient):
|
||||
# which the test relies on.
|
||||
await manager.api.disable_tablet_balancing(s0.ip_addr)
|
||||
|
||||
[s1, s2] = await manager.servers_add(2)
|
||||
[s1, s2] = await manager.servers_add(2, property_file=[
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"}
|
||||
])
|
||||
|
||||
logger.info(f'Creating test table')
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
|
||||
|
||||
@@ -10,7 +10,7 @@ import uuid
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.prepare_3_nodes_cluster
|
||||
pytestmark = pytest.mark.prepare_3_racks_cluster
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -21,8 +21,8 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
|
||||
since ignore node is permanent now and B is removed from the quorum early so it is enough to
|
||||
have two live nodes for the quorum.
|
||||
"""
|
||||
await manager.servers_add(2)
|
||||
servers = await manager.running_servers()
|
||||
servers += await manager.servers_add(2, property_file=[servers[1].property_file(), servers[2].property_file()])
|
||||
await manager.server_stop_gracefully(servers[3].server_id)
|
||||
await manager.server_stop_gracefully(servers[4].server_id)
|
||||
# test that non existing uuid is rejected
|
||||
@@ -37,6 +37,6 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
|
||||
# is 2
|
||||
await manager.server_stop_gracefully(servers[2].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(start=False, replace_cfg=replace_cfg)
|
||||
await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[2].property_file())
|
||||
|
||||
|
||||
|
||||
@@ -58,10 +58,10 @@ async def test_putget_2dc_with_rf(
|
||||
table_name = "test_table_name"
|
||||
columns = [Column("name", TextType), Column("value", TextType)]
|
||||
logger.info("Create two servers in different DC's")
|
||||
for i in nodes_list:
|
||||
for rack_idx, dc_idx in enumerate(nodes_list):
|
||||
s_info = await manager.server_add(
|
||||
config=CONFIG,
|
||||
property_file={"dc": f"dc{i}", "rack": "myrack"},
|
||||
property_file={"dc": f"dc{dc_idx}", "rack": f"rack{rack_idx}"},
|
||||
)
|
||||
logger.info(s_info)
|
||||
conn = manager.get_cql()
|
||||
|
||||
@@ -23,14 +23,15 @@ async def test_not_enough_token_owners(manager: ManagerClient):
|
||||
"""
|
||||
logging.info('Trying to add a zero-token server as the first server in the cluster')
|
||||
await manager.server_add(config={'join_ring': False},
|
||||
property_file={"dc": "dc1", "rack": "rz"},
|
||||
expected_error='Cannot start the first node in the cluster as zero-token')
|
||||
|
||||
logging.info('Adding the first server')
|
||||
server_a = await manager.server_add()
|
||||
server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
|
||||
|
||||
logging.info('Adding two zero-token servers')
|
||||
# The second server is needed only to preserve the Raft majority.
|
||||
server_b = (await manager.servers_add(2, config={'join_ring': False}))[0]
|
||||
server_b = (await manager.servers_add(2, config={'join_ring': False}, property_file={"dc": "dc1", "rack": "rz"}))[0]
|
||||
|
||||
logging.info(f'Trying to decommission the only token owner {server_a}')
|
||||
await manager.decommission_node(server_a.server_id,
|
||||
@@ -47,7 +48,7 @@ async def test_not_enough_token_owners(manager: ManagerClient):
|
||||
await manager.server_start(server_a.server_id)
|
||||
|
||||
logging.info('Adding a normal server')
|
||||
await manager.server_add()
|
||||
await manager.server_add(property_file={"dc": "dc1", "rack": "r2"})
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user