Compare commits
2 Commits
copilot/ad
...
copilot/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e806cb3f7 | ||
|
|
f267af38bd |
2
.github/workflows/trigger-scylla-ci.yaml
vendored
2
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
|
||||
JENKINS_URL: "https://jenkins.scylladb.com"
|
||||
run: |
|
||||
PR_NUMBER=${{ github.event.issue.number || github.event.pull_request.number }}
|
||||
PR_NUMBER=${{ github.event.issue.number }}
|
||||
PR_REPO_NAME=${{ github.event.repository.full_name }}
|
||||
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
|
||||
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v
|
||||
|
||||
@@ -244,7 +244,10 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
|
||||
|
||||
// Check if two JSON-encoded values match with the CONTAINS relation
|
||||
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
|
||||
if (!v1) {
|
||||
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
if (!v2.IsObject() || v2.MemberCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
const auto& kv1 = *v1->MemberBegin();
|
||||
@@ -618,7 +621,7 @@ conditional_operator_type get_conditional_operator(const rjson::value& req) {
|
||||
// Check if the existing values of the item (previous_item) match the
|
||||
// conditions given by the Expected and ConditionalOperator parameters
|
||||
// (if they exist) in the request (an UpdateItem, PutItem or DeleteItem).
|
||||
// This function can throw a ValidationException API error if there
|
||||
// This function can throw an ValidationException API error if there
|
||||
// are errors in the format of the condition itself.
|
||||
bool verify_expected(const rjson::value& req, const rjson::value* previous_item) {
|
||||
const rjson::value* expected = rjson::find(req, "Expected");
|
||||
|
||||
@@ -53,7 +53,9 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
|
||||
}
|
||||
|
||||
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
|
||||
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
|
||||
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
|
||||
// by using division with modulo instead of addition before division
|
||||
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
|
||||
|
||||
if (is_quorum) {
|
||||
half_units *= 2;
|
||||
|
||||
@@ -237,7 +237,7 @@ static void validate_is_object(const rjson::value& value, const char* caller) {
|
||||
}
|
||||
|
||||
// This function assumes the given value is an object and returns requested member value.
|
||||
// If it is not possible, an api_error::validation is thrown.
|
||||
// If it is not possible an api_error::validation is thrown.
|
||||
static const rjson::value& get_member(const rjson::value& obj, const char* member_name, const char* caller) {
|
||||
validate_is_object(obj, caller);
|
||||
const rjson::value* ret = rjson::find(obj, member_name);
|
||||
@@ -249,7 +249,7 @@ static const rjson::value& get_member(const rjson::value& obj, const char* membe
|
||||
|
||||
|
||||
// This function assumes the given value is an object with a single member, and returns this member.
|
||||
// In case the requirements are not met, an api_error::validation is thrown.
|
||||
// In case the requirements are not met an api_error::validation is thrown.
|
||||
static const rjson::value::Member& get_single_member(const rjson::value& v, const char* caller) {
|
||||
if (!v.IsObject() || v.MemberCount() != 1) {
|
||||
throw api_error::validation(format("{}: expected an object with a single member.", caller));
|
||||
@@ -682,7 +682,7 @@ static std::optional<int> get_int_attribute(const rjson::value& value, std::stri
|
||||
}
|
||||
|
||||
// Sets a KeySchema object inside the given JSON parent describing the key
|
||||
// attributes of the given schema as being either HASH or RANGE keys.
|
||||
// attributes of the the given schema as being either HASH or RANGE keys.
|
||||
// Additionally, adds to a given map mappings between the key attribute
|
||||
// names and their type (as a DynamoDB type string).
|
||||
void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>* attribute_types, const std::map<sstring, sstring> *tags) {
|
||||
@@ -834,11 +834,13 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
|
||||
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
// A race condition is possible: if a DescribeTable request arrives on a different shard before
|
||||
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
|
||||
// 1. Both calculations will cache their results with an expiry time
|
||||
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
|
||||
// 3. Even if expiry times match, different shards may briefly return different table sizes
|
||||
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
|
||||
// exact precision for DescribeTable size information
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
@@ -916,7 +918,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
sstring index_name = cf_name.substr(delim_it + 1);
|
||||
rjson::add(view_entry, "IndexName", rjson::from_string(index_name));
|
||||
rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name));
|
||||
// Add index's KeySchema and collect types for AttributeDefinitions:
|
||||
// Add indexes's KeySchema and collect types for AttributeDefinitions:
|
||||
executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr));
|
||||
// Add projection type
|
||||
rjson::value projection = rjson::empty_object();
|
||||
@@ -2435,7 +2437,7 @@ std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table
|
||||
// case, this function simply won't be called for this attribute.)
|
||||
//
|
||||
// This function checks if the given attribute update is an update to some
|
||||
// GSI's key, and if the value is unsuitable, an api_error::validation is
|
||||
// GSI's key, and if the value is unsuitable, a api_error::validation is
|
||||
// thrown. The checking here is similar to the checking done in
|
||||
// get_key_from_typed_value() for the base table's key columns.
|
||||
//
|
||||
@@ -3548,7 +3550,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
return true;
|
||||
}
|
||||
|
||||
// Add a path to an attribute_path_map. Throws a validation error if the path
|
||||
// Add a path to a attribute_path_map. Throws a validation error if the path
|
||||
// "overlaps" with one already in the filter (one is a sub-path of the other)
|
||||
// or "conflicts" with it (both a member and index is requested).
|
||||
template<typename T>
|
||||
|
||||
@@ -50,7 +50,7 @@ public:
|
||||
_operators.emplace_back(i);
|
||||
check_depth_limit();
|
||||
}
|
||||
void add_dot(std::string name) {
|
||||
void add_dot(std::string(name)) {
|
||||
_operators.emplace_back(std::move(name));
|
||||
check_depth_limit();
|
||||
}
|
||||
@@ -85,7 +85,7 @@ struct constant {
|
||||
}
|
||||
};
|
||||
|
||||
// "value" is a value used in the right hand side of an assignment
|
||||
// "value" is is a value used in the right hand side of an assignment
|
||||
// expression, "SET a = ...". It can be a constant (a reference to a value
|
||||
// included in the request, e.g., ":val"), a path to an attribute from the
|
||||
// existing item (e.g., "a.b[3].c"), or a function of other such values.
|
||||
@@ -205,7 +205,7 @@ public:
|
||||
// The supported primitive conditions are:
|
||||
// 1. Binary operators - v1 OP v2, where OP is =, <>, <, <=, >, or >= and
|
||||
// v1 and v2 are values - from the item (an attribute path), the query
|
||||
// (a ":val" reference), or a function of the above (only the size()
|
||||
// (a ":val" reference), or a function of the the above (only the size()
|
||||
// function is supported).
|
||||
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
|
||||
// 3. N-ary operator - v1 IN ( v2, v3, ... )
|
||||
|
||||
@@ -55,7 +55,7 @@ partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
|
||||
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
|
||||
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
|
||||
|
||||
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
|
||||
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
|
||||
// raises ValidationException with diagnostic.
|
||||
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic);
|
||||
|
||||
|
||||
@@ -141,7 +141,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
|
||||
|
||||
// expiration_service is a sharded service responsible for cleaning up expired
|
||||
// items in all tables with per-item expiration enabled. Currently, this means
|
||||
// Alternator tables with TTL configured via an UpdateTimeToLive request.
|
||||
// Alternator tables with TTL configured via a UpdateTimeToLive request.
|
||||
//
|
||||
// Here is a brief overview of how the expiration service works:
|
||||
//
|
||||
@@ -593,7 +593,7 @@ static future<> scan_table_ranges(
|
||||
if (retries >= 10) {
|
||||
// Don't get stuck forever asking the same page, maybe there's
|
||||
// a bug or a real problem in several replicas. Give up on
|
||||
// this scan and retry the scan from a random position later,
|
||||
// this scan an retry the scan from a random position later,
|
||||
// in the next scan period.
|
||||
throw runtime_exception("scanner thread failed after too many timeouts for the same page");
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ namespace alternator {
|
||||
|
||||
// expiration_service is a sharded service responsible for cleaning up expired
|
||||
// items in all tables with per-item expiration enabled. Currently, this means
|
||||
// Alternator tables with TTL configured via an UpdateTimeToLive request.
|
||||
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
|
||||
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
|
||||
public:
|
||||
// Object holding per-shard statistics related to the expiration service.
|
||||
@@ -52,7 +52,7 @@ private:
|
||||
data_dictionary::database _db;
|
||||
service::storage_proxy& _proxy;
|
||||
gms::gossiper& _gossiper;
|
||||
// _end is set by start(), and resolves when the background service
|
||||
// _end is set by start(), and resolves when the the background service
|
||||
// started by it ends. To ask the background service to end, _abort_source
|
||||
// should be triggered. stop() below uses both _abort_source and _end.
|
||||
std::optional<future<>> _end;
|
||||
|
||||
@@ -814,7 +814,8 @@ generation_service::generation_service(
|
||||
config cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f,
|
||||
replica::database& db)
|
||||
replica::database& db,
|
||||
std::function<bool()> raft_topology_change_enabled)
|
||||
: _cfg(std::move(cfg))
|
||||
, _gossiper(g)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
@@ -823,6 +824,7 @@ generation_service::generation_service(
|
||||
, _token_metadata(stm)
|
||||
, _feature_service(f)
|
||||
, _db(db)
|
||||
, _raft_topology_change_enabled(std::move(raft_topology_change_enabled))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -876,7 +878,16 @@ future<> generation_service::on_join(gms::inet_address ep, locator::host_id id,
|
||||
future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
|
||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||
|
||||
return make_ready_future<>();
|
||||
if (_raft_topology_change_enabled()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) {
|
||||
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
|
||||
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
|
||||
|
||||
return legacy_handle_cdc_generation(gen_id);
|
||||
});
|
||||
}
|
||||
|
||||
future<> generation_service::check_and_repair_cdc_streams() {
|
||||
|
||||
@@ -79,12 +79,17 @@ private:
|
||||
std::optional<cdc::generation_id> _gen_id;
|
||||
future<> _cdc_streams_rewrite_complete = make_ready_future<>();
|
||||
|
||||
/* Returns true if raft topology changes are enabled.
|
||||
* Can only be called from shard 0.
|
||||
*/
|
||||
std::function<bool()> _raft_topology_change_enabled;
|
||||
public:
|
||||
generation_service(config cfg, gms::gossiper&,
|
||||
sharded<db::system_distributed_keyspace>&,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
abort_source&, const locator::shared_token_metadata&,
|
||||
gms::feature_service&, replica::database& db);
|
||||
gms::feature_service&, replica::database& db,
|
||||
std::function<bool()> raft_topology_change_enabled);
|
||||
|
||||
future<> stop();
|
||||
~generation_service();
|
||||
|
||||
47
configure.py
47
configure.py
@@ -730,6 +730,28 @@ vector_search_tests = set([
|
||||
'test/vector_search/rescoring_test'
|
||||
])
|
||||
|
||||
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
|
||||
vector_search_validator_deps = set([
|
||||
'test/vector_search_validator/build-validator',
|
||||
'test/vector_search_validator/Cargo.toml',
|
||||
'test/vector_search_validator/crates/validator/Cargo.toml',
|
||||
'test/vector_search_validator/crates/validator/src/main.rs',
|
||||
'test/vector_search_validator/crates/validator-scylla/Cargo.toml',
|
||||
'test/vector_search_validator/crates/validator-scylla/src/lib.rs',
|
||||
'test/vector_search_validator/crates/validator-scylla/src/cql.rs',
|
||||
])
|
||||
|
||||
vector_store_bin = 'vector-search-validator/bin/vector-store'
|
||||
vector_store_deps = set([
|
||||
'test/vector_search_validator/build-env',
|
||||
'test/vector_search_validator/build-vector-store',
|
||||
])
|
||||
|
||||
vector_search_validator_bins = set([
|
||||
vector_search_validator_bin,
|
||||
vector_store_bin,
|
||||
])
|
||||
|
||||
wasms = set([
|
||||
'wasm/return_input.wat',
|
||||
'wasm/test_complex_null_values.wat',
|
||||
@@ -763,7 +785,7 @@ other = set([
|
||||
'iotune',
|
||||
])
|
||||
|
||||
all_artifacts = apps | cpp_apps | tests | other | wasms
|
||||
all_artifacts = apps | cpp_apps | tests | other | wasms | vector_search_validator_bins
|
||||
|
||||
arg_parser = argparse.ArgumentParser('Configure scylla', add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
arg_parser.add_argument('--out', dest='buildfile', action='store', default='build.ninja',
|
||||
@@ -2563,10 +2585,11 @@ def write_build_file(f,
|
||||
description = RUST_LIB $out
|
||||
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
|
||||
f.write(
|
||||
'build {mode}-build: phony {artifacts} {wasms}\n'.format(
|
||||
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
|
||||
mode=mode,
|
||||
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms)]),
|
||||
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms - vector_search_validator_bins)]),
|
||||
wasms = str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & wasms)]),
|
||||
vector_search_validator_bins=str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & vector_search_validator_bins)]),
|
||||
)
|
||||
)
|
||||
if profile_recipe := modes[mode].get('profile_recipe'):
|
||||
@@ -2596,7 +2619,7 @@ def write_build_file(f,
|
||||
continue
|
||||
profile_dep = modes[mode].get('profile_target', "")
|
||||
|
||||
if binary in other or binary in wasms:
|
||||
if binary in other or binary in wasms or binary in vector_search_validator_bins:
|
||||
continue
|
||||
srcs = deps[binary]
|
||||
# 'scylla'
|
||||
@@ -2707,10 +2730,11 @@ def write_build_file(f,
|
||||
)
|
||||
|
||||
f.write(
|
||||
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms}\n'.format(
|
||||
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms} {vector_search_validator_bins} \n'.format(
|
||||
mode=mode,
|
||||
test_executables=' '.join(['$builddir/{}/{}'.format(mode, binary) for binary in sorted(tests)]),
|
||||
wasms=' '.join([f'$builddir/{binary}' for binary in sorted(wasms)]),
|
||||
vector_search_validator_bins=' '.join([f'$builddir/{binary}' for binary in sorted(vector_search_validator_bins)]),
|
||||
)
|
||||
)
|
||||
f.write(
|
||||
@@ -2878,6 +2902,19 @@ def write_build_file(f,
|
||||
'build compiler-training: phony {}\n'.format(' '.join(['{mode}-compiler-training'.format(mode=mode) for mode in default_modes]))
|
||||
)
|
||||
|
||||
f.write(textwrap.dedent(f'''\
|
||||
rule build-vector-search-validator
|
||||
command = test/vector_search_validator/build-validator $builddir
|
||||
rule build-vector-store
|
||||
command = test/vector_search_validator/build-vector-store $builddir
|
||||
'''))
|
||||
f.write(
|
||||
'build $builddir/{vector_search_validator_bin}: build-vector-search-validator {}\n'.format(' '.join([dep for dep in sorted(vector_search_validator_deps)]), vector_search_validator_bin=vector_search_validator_bin)
|
||||
)
|
||||
f.write(
|
||||
'build $builddir/{vector_store_bin}: build-vector-store {}\n'.format(' '.join([dep for dep in sorted(vector_store_deps)]), vector_store_bin=vector_store_bin)
|
||||
)
|
||||
|
||||
f.write(textwrap.dedent(f'''\
|
||||
build dist-unified-tar: phony {' '.join([f'$builddir/{mode}/dist/tar/{scylla_product}-unified-{scylla_version}-{scylla_release}.{arch}.tar.gz' for mode in default_modes])}
|
||||
build dist-unified: phony dist-unified-tar
|
||||
|
||||
12
cql3/Cql.g
12
cql3/Cql.g
@@ -389,10 +389,8 @@ selectStatement returns [std::unique_ptr<raw::select_statement> expr]
|
||||
bool is_ann_ordering = false;
|
||||
}
|
||||
: K_SELECT (
|
||||
( (K_JSON K_DISTINCT)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
|
||||
| (K_JSON selectClause K_FROM)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
|
||||
)?
|
||||
( (K_DISTINCT selectClause K_FROM)=> K_DISTINCT { is_distinct = true; } )?
|
||||
( K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; } )?
|
||||
( K_DISTINCT { is_distinct = true; } )?
|
||||
sclause=selectClause
|
||||
)
|
||||
K_FROM (
|
||||
@@ -427,7 +425,6 @@ selector returns [shared_ptr<raw_selector> s]
|
||||
|
||||
unaliasedSelector returns [uexpression tmp]
|
||||
: ( c=cident { tmp = unresolved_identifier{std::move(c)}; }
|
||||
| v=value { tmp = std::move(v); }
|
||||
| K_COUNT '(' countArgument ')' { tmp = make_count_rows_function_expression(); }
|
||||
| K_WRITETIME '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::writetime,
|
||||
unresolved_identifier{std::move(c)}}; }
|
||||
@@ -458,11 +455,14 @@ vectorSimilarityArgs returns [std::vector<expression> a]
|
||||
|
||||
vectorSimilarityArg returns [uexpression a]
|
||||
: s=unaliasedSelector { a = std::move(s); }
|
||||
| v=value { a = std::move(v); }
|
||||
;
|
||||
|
||||
countArgument
|
||||
: '*'
|
||||
/* COUNT(1) is also allowed, it is recognized via the general function(args) path */
|
||||
| i=INTEGER { if (i->getText() != "1") {
|
||||
add_recognition_error("Only COUNT(1) is supported, got COUNT(" + i->getText() + ")");
|
||||
} }
|
||||
;
|
||||
|
||||
whereClause returns [uexpression clause]
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#include "expr-utils.hh"
|
||||
#include "evaluate.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/functions/aggregate_fcts.hh"
|
||||
#include "cql3/functions/castas_fcts.hh"
|
||||
#include "cql3/functions/scalar_function.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
@@ -1048,47 +1047,8 @@ prepare_function_args_for_type_inference(std::span<const expression> args, data_
|
||||
return partially_prepared_args;
|
||||
}
|
||||
|
||||
// Special case for count(1) - recognize it as the countRows() function. Note it is quite
|
||||
// artificial and we might relax it to the more general count(expression) later.
|
||||
static
|
||||
std::optional<expression>
|
||||
try_prepare_count_rows(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
|
||||
return std::visit(overloaded_functor{
|
||||
[&] (const functions::function_name& name) -> std::optional<expression> {
|
||||
auto native_name = name;
|
||||
if (!native_name.has_keyspace()) {
|
||||
native_name = name.as_native_function();
|
||||
}
|
||||
// Collapse count(1) into countRows()
|
||||
if (native_name == functions::function_name::native_function("count")) {
|
||||
if (fc.args.size() == 1) {
|
||||
if (auto uc_arg = expr::as_if<expr::untyped_constant>(&fc.args[0])) {
|
||||
if (uc_arg->partial_type == expr::untyped_constant::type_class::integer
|
||||
&& uc_arg->raw_text == "1") {
|
||||
return expr::function_call{
|
||||
.func = functions::aggregate_fcts::make_count_rows_function(),
|
||||
.args = {},
|
||||
};
|
||||
} else {
|
||||
throw exceptions::invalid_request_exception(format("count() expects a column or the literal 1 as an argument", fc.args[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
},
|
||||
[] (const shared_ptr<functions::function>&) -> std::optional<expression> {
|
||||
// Already prepared, nothing to do
|
||||
return std::nullopt;
|
||||
},
|
||||
}, fc.func);
|
||||
}
|
||||
|
||||
std::optional<expression>
|
||||
prepare_function_call(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
|
||||
if (auto prepared = try_prepare_count_rows(fc, db, keyspace, schema_opt, receiver)) {
|
||||
return prepared;
|
||||
}
|
||||
// Try to extract a column family name from the available information.
|
||||
// Most functions can be prepared without information about the column family, usually just the keyspace is enough.
|
||||
// One exception is the token() function - in order to prepare system.token() we have to know the partition key of the table,
|
||||
|
||||
@@ -23,7 +23,6 @@
|
||||
#include "index/vector_index.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
#include "types/types.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
@@ -330,19 +329,6 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
|
||||
"*/",
|
||||
*table_desc.create_statement);
|
||||
|
||||
table_desc.create_statement = std::move(os).to_managed_string();
|
||||
} else if (service::paxos::paxos_store::try_get_base_table(name)) {
|
||||
// Paxos state table is internally managed by Scylla and it shouldn't be exposed to the user.
|
||||
// The table is allowed to be described as a comment to ease administrative work but it's hidden from all listings.
|
||||
fragmented_ostringstream os{};
|
||||
|
||||
fmt::format_to(os.to_iter(),
|
||||
"/* Do NOT execute this statement! It's only for informational purposes.\n"
|
||||
" A paxos state table is created automatically when enabling LWT on a base table.\n"
|
||||
"\n{}\n"
|
||||
"*/",
|
||||
*table_desc.create_statement);
|
||||
|
||||
table_desc.create_statement = std::move(os).to_managed_string();
|
||||
}
|
||||
result.push_back(std::move(table_desc));
|
||||
@@ -378,7 +364,7 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
|
||||
future<std::vector<description>> tables(const data_dictionary::database& db, const lw_shared_ptr<keyspace_metadata>& ks, std::optional<bool> with_internals = std::nullopt) {
|
||||
auto& replica_db = db.real_database();
|
||||
auto tables = ks->tables() | std::views::filter([&replica_db] (const schema_ptr& s) {
|
||||
return !cdc::is_log_for_some_table(replica_db, s->ks_name(), s->cf_name()) && !service::paxos::paxos_store::try_get_base_table(s->cf_name());
|
||||
return !cdc::is_log_for_some_table(replica_db, s->ks_name(), s->cf_name());
|
||||
}) | std::ranges::to<std::vector<schema_ptr>>();
|
||||
std::ranges::sort(tables, std::ranges::less(), std::mem_fn(&schema::cf_name));
|
||||
|
||||
|
||||
@@ -259,9 +259,11 @@ uint32_t select_statement::get_bound_terms() const {
|
||||
|
||||
future<> select_statement::check_access(query_processor& qp, const service::client_state& state) const {
|
||||
try {
|
||||
auto cdc = qp.db().get_cdc_base_table(*_schema);
|
||||
auto& cf_name = _schema->is_view()
|
||||
? _schema->view_info()->base_name()
|
||||
const data_dictionary::database db = qp.db();
|
||||
auto&& s = db.find_schema(keyspace(), column_family());
|
||||
auto cdc = db.get_cdc_base_table(*s);
|
||||
auto& cf_name = s->is_view()
|
||||
? s->view_info()->base_name()
|
||||
: (cdc ? cdc->cf_name() : column_family());
|
||||
const schema_ptr& base_schema = cdc ? cdc : _schema;
|
||||
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*base_schema);
|
||||
|
||||
@@ -1498,7 +1498,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, index_cache_fraction(this, "index_cache_fraction", liveness::LiveUpdate, value_status::Used, 0.2,
|
||||
"The maximum fraction of cache memory permitted for use by index cache. Clamped to the [0.0; 1.0] range. Must be small enough to not deprive the row cache of memory, but should be big enough to fit a large fraction of the index. The default value 0.2 means that at least 80\% of cache memory is reserved for the row cache, while at most 20\% is usable by the index cache.")
|
||||
, consistent_cluster_management(this, "consistent_cluster_management", value_status::Deprecated, true, "Use RAFT for cluster management and DDL.")
|
||||
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Deprecated, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
|
||||
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Used, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
|
||||
, recovery_leader(this, "recovery_leader", liveness::LiveUpdate, value_status::Used, utils::null_uuid(), "Host ID of the node restarted first while performing the Manual Raft-based Recovery Procedure. Warning: this option disables some guardrails for the needs of the Manual Raft-based Recovery Procedure. Make sure you unset it at the end of the procedure.")
|
||||
, wasm_cache_memory_fraction(this, "wasm_cache_memory_fraction", value_status::Used, 0.01, "Maximum total size of all WASM instances stored in the cache as fraction of total shard memory.")
|
||||
, wasm_cache_timeout_in_ms(this, "wasm_cache_timeout_in_ms", value_status::Used, 5000, "Time after which an instance is evicted from the cache.")
|
||||
|
||||
@@ -215,8 +215,6 @@ public:
|
||||
static constexpr auto BUILT_VIEWS = "built_views";
|
||||
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
|
||||
static constexpr auto CDC_LOCAL = "cdc_local";
|
||||
static constexpr auto CDC_TIMESTAMPS = "cdc_timestamps";
|
||||
static constexpr auto CDC_STREAMS = "cdc_streams";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
|
||||
@@ -588,7 +588,11 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
|
||||
utils::get_local_injector().inject("do_build_range_fail",
|
||||
[] { throw std::runtime_error("do_build_range failed due to error injection"); });
|
||||
|
||||
return seastar::async([this, base_id, views_ids = std::move(views_ids), last_token, &as] {
|
||||
// Run the view building in the streaming scheduling group
|
||||
// so that it doesn't impact other tasks with higher priority.
|
||||
seastar::thread_attributes attr;
|
||||
attr.sched_group = _db.get_streaming_scheduling_group();
|
||||
return seastar::async(std::move(attr), [this, base_id, views_ids = std::move(views_ids), last_token, &as] {
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto base_cf = _db.find_column_family(base_id).shared_from_this();
|
||||
reader_permit permit = _db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "build_views_range", db::no_timeout, {});
|
||||
|
||||
@@ -67,7 +67,6 @@ public:
|
||||
return schema_builder(system_keyspace::NAME, "cluster_status", std::make_optional(id))
|
||||
.with_column("peer", inet_addr_type, column_kind::partition_key)
|
||||
.with_column("dc", utf8_type)
|
||||
.with_column("rack", utf8_type)
|
||||
.with_column("up", boolean_type)
|
||||
.with_column("draining", boolean_type)
|
||||
.with_column("excluded", boolean_type)
|
||||
@@ -112,9 +111,7 @@ public:
|
||||
// Not all entries in gossiper are present in the topology
|
||||
auto& node = tm.get_topology().get_node(hostid);
|
||||
sstring dc = node.dc_rack().dc;
|
||||
sstring rack = node.dc_rack().rack;
|
||||
set_cell(cr, "dc", dc);
|
||||
set_cell(cr, "rack", rack);
|
||||
set_cell(cr, "draining", node.is_draining());
|
||||
set_cell(cr, "excluded", node.is_excluded());
|
||||
}
|
||||
@@ -1348,8 +1345,8 @@ public:
|
||||
|
||||
private:
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS, std::make_optional(id))
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, "cdc_timestamps");
|
||||
return schema_builder(system_keyspace::NAME, "cdc_timestamps", std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("timestamp", reversed_type_impl::get_instance(timestamp_type), column_kind::clustering_key)
|
||||
@@ -1431,8 +1428,8 @@ public:
|
||||
}
|
||||
private:
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_STREAMS);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::CDC_STREAMS, std::make_optional(id))
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, "cdc_streams");
|
||||
return schema_builder(system_keyspace::NAME, "cdc_streams", std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("timestamp", timestamp_type, column_kind::clustering_key)
|
||||
|
||||
1
debug.cc
1
debug.cc
@@ -12,6 +12,5 @@ namespace debug {
|
||||
|
||||
seastar::sharded<replica::database>* volatile the_database = nullptr;
|
||||
seastar::scheduling_group streaming_scheduling_group;
|
||||
seastar::scheduling_group gossip_scheduling_group;
|
||||
|
||||
}
|
||||
|
||||
1
debug.hh
1
debug.hh
@@ -18,7 +18,6 @@ namespace debug {
|
||||
|
||||
extern seastar::sharded<replica::database>* volatile the_database;
|
||||
extern seastar::scheduling_group streaming_scheduling_group;
|
||||
extern seastar::scheduling_group gossip_scheduling_group;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
# Move the OS Support page
|
||||
|
||||
/stable/getting-started/os-support.html: https://docs.scylladb.com/stable/versioning/os-support-per-version.html
|
||||
|
||||
# Remove an outdated KB
|
||||
|
||||
/stable/kb/perftune-modes-sync.html: /stable/kb/index.html
|
||||
|
||||
@@ -25,8 +25,6 @@ Querying data from data is done using a ``SELECT`` statement:
|
||||
: | CAST '(' `selector` AS `cql_type` ')'
|
||||
: | `function_name` '(' [ `selector` ( ',' `selector` )* ] ')'
|
||||
: | COUNT '(' '*' ')'
|
||||
: | literal
|
||||
: | bind_marker
|
||||
: )
|
||||
: ( '.' `field_name` | '[' `term` ']' )*
|
||||
where_clause: `relation` ( AND `relation` )*
|
||||
@@ -37,8 +35,6 @@ Querying data from data is done using a ``SELECT`` statement:
|
||||
operator: '=' | '<' | '>' | '<=' | '>=' | IN | NOT IN | CONTAINS | CONTAINS KEY
|
||||
ordering_clause: `column_name` [ ASC | DESC ] ( ',' `column_name` [ ASC | DESC ] )*
|
||||
timeout: `duration`
|
||||
literal: number | 'string' | boolean | NULL | tuple_literal | list_literal | map_literal
|
||||
bind_marker: '?' | ':' `identifier`
|
||||
|
||||
For instance::
|
||||
|
||||
@@ -85,13 +81,6 @@ A :token:`selector` can be one of the following:
|
||||
- A casting, which allows you to convert a nested selector to a (compatible) type.
|
||||
- A function call, where the arguments are selectors themselves.
|
||||
- A call to the :ref:`COUNT function <count-function>`, which counts all non-null results.
|
||||
- A literal value (constant).
|
||||
- A bind variable (`?` or `:name`).
|
||||
|
||||
Note that due to a quirk of the type system, literals and bind markers cannot be
|
||||
used as top-level selectors, as the parser cannot infer their type. However, they can be used
|
||||
when nested inside functions, as the function formal parameter types provide the
|
||||
necessary context.
|
||||
|
||||
Aliases
|
||||
```````
|
||||
@@ -292,8 +281,7 @@ For example::
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
|
||||
or columns provided in a definition of the index.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
|
||||
For example::
|
||||
|
||||
|
||||
@@ -140,83 +140,17 @@ Vector Index :label-note:`ScyllaDB Cloud`
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
|
||||
similarity search on vector data. Vector indexes can be a global index for indexing vectors per table or a local
|
||||
index for indexing vectors per partition.
|
||||
similarity search on vector data.
|
||||
|
||||
The vector index is the only custom type index supported in ScyllaDB. It is created using
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
|
||||
add additional columns to the index for filtering the search results. The partition column
|
||||
specified in the global vector index definition must be the vector column, and any subsequent
|
||||
columns are treated as filtering columns. The local vector index requires that the partition key
|
||||
of the base table is also the partition key of the index and the vector column is the first one
|
||||
from the following columns.
|
||||
|
||||
Example of a simple index:
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. Example:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global vector index. Additional filtering can be performed on the primary key
|
||||
columns of the base table.
|
||||
|
||||
Example of a global vector index with additional filtering:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global index. Additional columns are added for filtering the search results.
|
||||
The filtering is possible on ``category``, ``info`` and all primary key columns
|
||||
of the base table.
|
||||
|
||||
Example of a local vector index:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed for similarity search (a local
|
||||
index) and additional columns are added for filtering the search results. The
|
||||
filtering is possible on ``category``, ``info`` and all primary key columns of
|
||||
the base table. The columns ``id`` and ``created_at`` must be the partition key
|
||||
of the base table.
|
||||
|
||||
Vector indexes support additional filtering columns of native data types
|
||||
(excluding counter and duration). The indexed column itself must be a vector
|
||||
column, while the extra columns can be used to filter search results.
|
||||
|
||||
The supported types are:
|
||||
|
||||
* ``ascii``
|
||||
* ``bigint``
|
||||
* ``blob``
|
||||
* ``boolean``
|
||||
* ``date``
|
||||
* ``decimal``
|
||||
* ``double``
|
||||
* ``float``
|
||||
* ``inet``
|
||||
* ``int``
|
||||
* ``smallint``
|
||||
* ``text``
|
||||
* ``varchar``
|
||||
* ``time``
|
||||
* ``timestamp``
|
||||
* ``timeuuid``
|
||||
* ``tinyint``
|
||||
* ``uuid``
|
||||
* ``varint``
|
||||
|
||||
|
||||
The following options are supported for vector indexes. All of them are optional.
|
||||
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
|
||||
@@ -156,7 +156,7 @@ How do I check the current version of ScyllaDB that I am running?
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
* On a regular system or VM (running Ubuntu, CentOS, or Red Hat Enterprise Linux): :code:`$ scylla --version`
|
||||
|
||||
Check the `Operating System Support Guide <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_ for a list of supported operating systems and versions.
|
||||
Check the :doc:`Operating System Support Guide </getting-started/os-support>` for a list of supported operating systems and versions.
|
||||
|
||||
* On a docker node: :code:`$ docker exec -it Node_Z scylla --version`
|
||||
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
Automatic Repair
|
||||
================
|
||||
|
||||
Traditionally, launching :doc:`repairs </operating-scylla/procedures/maintenance/repair>` in a ScyllaDB cluster is left to an external process, typically done via `Scylla Manager <https://manager.docs.scylladb.com/stable/repair/index.html>`_.
|
||||
Traditionally, launching `repairs </operating-scylla/procedures/maintenance/repair>`_ in a ScyllaDB cluster is left to an external process, typically done via `Scylla Manager <https://manager.docs.scylladb.com/stable/repair/index.html>`_.
|
||||
|
||||
Automatic repair offers built-in scheduling in ScyllaDB itself. If the time since the last repair is greater than the configured repair interval, ScyllaDB will start a repair for the :doc:`tablet table </architecture/tablets>` automatically.
|
||||
Automatic repair offers built-in scheduling in ScyllaDB itself. If the time since the last repair is greater than the configured repair interval, ScyllaDB will start a repair for the `tablet </architecture/tablets>`_ table automatically.
|
||||
Repairs are spread over time and among nodes and shards, to avoid load spikes or any adverse effects on user workloads.
|
||||
|
||||
To enable automatic repair, add this to the configuration (``scylla.yaml``):
|
||||
@@ -20,4 +20,4 @@ More featureful configuration methods will be implemented in the future.
|
||||
|
||||
To disable, set ``auto_repair_enabled_default: false``.
|
||||
|
||||
Automatic repair relies on :doc:`Incremental Repair </features/incremental-repair>` and as such it only works with :doc:`tablet </architecture/tablets>` tables.
|
||||
Automatic repair relies on `Incremental Repair </features/incremental-repair>`_ and as such it only works with `tablet </architecture/tablets>`_ tables.
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
Incremental Repair
|
||||
==================
|
||||
|
||||
ScyllaDB's standard :doc:`repair </operating-scylla/procedures/maintenance/repair>` process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
|
||||
ScyllaDB's standard `repair </operating-scylla/procedures/maintenance/repair>`_ process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
|
||||
|
||||
The core idea of incremental repair is to repair only the data that has been written or changed since the last repair was run. It intelligently skips data that has already been verified, dramatically reducing the time, I/O, and CPU resources required for the repair operation.
|
||||
|
||||
@@ -51,7 +51,7 @@ Benefits of Incremental Repair
|
||||
* **Reduced Resource Usage:** Consumes significantly less CPU, I/O, and network bandwidth compared to a full repair.
|
||||
* **More Frequent Repairs:** The efficiency of incremental repair allows you to run it more frequently, ensuring a higher level of data consistency across your cluster at all times.
|
||||
|
||||
Tables using Incremental Repair can schedule repairs in ScyllaDB itself, with :doc:`Automatic Repair </features/automatic-repair>`.
|
||||
Tables using Incremental Repair can schedule repairs in ScyllaDB itself, with `Automatic Repair </features/automatic-repair>`_.
|
||||
|
||||
Notes
|
||||
-----
|
||||
|
||||
@@ -18,7 +18,7 @@ Getting Started
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`ScyllaDB System Requirements Guide</getting-started/system-requirements/>`
|
||||
* `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
* :doc:`OS Support by Platform and Version</getting-started/os-support/>`
|
||||
|
||||
.. panel-box::
|
||||
:title: Install and Configure ScyllaDB
|
||||
|
||||
@@ -17,7 +17,7 @@ This article will help you install ScyllaDB on Linux using platform-specific pac
|
||||
Prerequisites
|
||||
----------------
|
||||
|
||||
* Ubuntu, Debian, CentOS, or RHEL (see `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
* Ubuntu, Debian, CentOS, or RHEL (see :doc:`OS Support by Platform and Version </getting-started/os-support>`
|
||||
for details about supported versions and architecture)
|
||||
* Root or ``sudo`` access to the system
|
||||
* Open :ref:`ports used by ScyllaDB <networking-ports>`
|
||||
|
||||
@@ -10,7 +10,7 @@ Prerequisites
|
||||
--------------
|
||||
|
||||
Ensure that your platform is supported by the ScyllaDB version you want to install.
|
||||
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_.
|
||||
See :doc:`OS Support by Platform and Version </getting-started/os-support/>`.
|
||||
|
||||
Install ScyllaDB with Web Installer
|
||||
---------------------------------------
|
||||
|
||||
@@ -12,8 +12,7 @@ the package manager (dnf and apt).
|
||||
Prerequisites
|
||||
---------------
|
||||
Ensure your platform is supported by the ScyllaDB version you want to install.
|
||||
See `OS Support <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported Linux distributions and versions.
|
||||
See :doc:`OS Support </getting-started/os-support>` for information about supported Linux distributions and versions.
|
||||
|
||||
Note that if you're on CentOS 7, only root offline installation is supported.
|
||||
|
||||
|
||||
26
docs/getting-started/os-support.rst
Normal file
26
docs/getting-started/os-support.rst
Normal file
@@ -0,0 +1,26 @@
|
||||
OS Support by Linux Distributions and Version
|
||||
==============================================
|
||||
|
||||
The following matrix shows which Linux distributions, containers, and images
|
||||
are :ref:`supported <os-support-definition>` with which versions of ScyllaDB.
|
||||
|
||||
.. datatemplate:json:: /_static/data/os-support.json
|
||||
:template: platforms.tmpl
|
||||
|
||||
``*`` 2024.1.9 and later
|
||||
|
||||
All releases are available as a Docker container, EC2 AMI, GCP, and Azure images.
|
||||
|
||||
.. _os-support-definition:
|
||||
|
||||
*Supported* means that:
|
||||
|
||||
- A binary installation package is available.
|
||||
- The download and install procedures are tested as part of the ScyllaDB release process for each version.
|
||||
- An automated installation is available through the :doc:`ScyllaDB Web Installer for Linux tool </getting-started/installation-common/scylla-web-installer>` (for the latest versions).
|
||||
|
||||
You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-prerequisites>`_
|
||||
on other x86_64 or aarch64 platforms, without any guarantees.
|
||||
|
||||
|
||||
|
||||
@@ -8,12 +8,12 @@ ScyllaDB Requirements
|
||||
:hidden:
|
||||
|
||||
system-requirements
|
||||
OS Support <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>
|
||||
OS Support <os-support>
|
||||
Cloud Instance Recommendations <cloud-instance-recommendations>
|
||||
scylla-in-a-shared-environment
|
||||
|
||||
* :doc:`System Requirements</getting-started/system-requirements/>`
|
||||
* `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
* :doc:`OS Support by Platform and Version</getting-started/os-support/>`
|
||||
* :doc:`Cloud Instance Recommendations AWS, GCP, and Azure </getting-started/cloud-instance-recommendations>`
|
||||
* :doc:`Running ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ Supported Platforms
|
||||
===================
|
||||
ScyllaDB runs on 64-bit Linux. The x86_64 and AArch64 architectures are supported (AArch64 support includes AWS EC2 Graviton).
|
||||
|
||||
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_ for information about
|
||||
See :doc:`OS Support by Platform and Version </getting-started/os-support>` for information about
|
||||
supported operating systems, distros, and versions.
|
||||
|
||||
See :doc:`Cloud Instance Recommendations for AWS, GCP, and Azure </getting-started/cloud-instance-recommendations>` for information
|
||||
|
||||
@@ -52,14 +52,18 @@ Row-level repair improves ScyllaDB in two ways:
|
||||
* keeping the data in a temporary buffer.
|
||||
* using the cached data to calculate the checksum and send it to the replicas.
|
||||
|
||||
See also the `ScyllaDB Manager documentation <https://manager.docs.scylladb.com/>`_.
|
||||
See also
|
||||
|
||||
* `ScyllaDB Manager documentation <https://manager.docs.scylladb.com/>`_
|
||||
|
||||
* `Blog: ScyllaDB Open Source 3.1: Efficiently Maintaining Consistency with Row-Level Repair <https://www.scylladb.com/2019/08/13/scylla-open-source-3-1-efficiently-maintaining-consistency-with-row-level-repair/>`_
|
||||
|
||||
Incremental Repair
|
||||
------------------
|
||||
|
||||
Built on top of :ref:`Row-level Repair <row-level-repair>` and :doc:`Tablets </architecture/tablets>`, Incremental Repair enables frequent and quick repairs. For more details, see :doc:`Incremental Repair </features/incremental-repair>`.
|
||||
Built on top of `Row-level Repair <row-level-repair_>`_ and `Tablets </architecture/tablets>`_, Incremental Repair enables frequent and quick repairs. For more details, see `Incremental Repair </features/incremental-repair>`_.
|
||||
|
||||
Automatic Repair
|
||||
----------------
|
||||
|
||||
Built on top of :doc:`Incremental Repair </features/incremental-repair>`, :doc:`Automatic Repair </features/automatic-repair>` offers repair scheduling and execution directly in ScyllaDB, without external processes.
|
||||
Built on top of `Incremental Repair </features/incremental-repair>`_, `Automatic Repair </features/automatic-repair>`_ offers repair scheduling and execution directly in ScyllaDB, without external processes.
|
||||
|
||||
@@ -14,7 +14,7 @@ if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL),
|
||||
CentOS, Debian, and Ubuntu.
|
||||
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
See :doc:`OS Support by Platform and Version </getting-started/os-support>`
|
||||
for information about supported versions.
|
||||
|
||||
It also applies to the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
@@ -17,7 +17,7 @@ This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME
|
||||
to |SCYLLA_NAME| |NEW_VERSION| and rollback to version |SRC_VERSION| if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
|
||||
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
and Ubuntu. See :doc:`OS Support by Platform and Version </getting-started/os-support>`
|
||||
for information about supported versions.
|
||||
|
||||
It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
@@ -2424,8 +2424,8 @@ bool gossiper::is_enabled() const {
|
||||
void gossiper::add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time) {
|
||||
auto now_ = now();
|
||||
auto diff = std::chrono::duration_cast<std::chrono::seconds>(expire_time - now_).count();
|
||||
logger.info("Node {} will be removed from gossip at [{:%Y-%m-%d %T %z}]: (expire = {}, now = {}, diff = {} seconds)",
|
||||
endpoint, fmt::gmtime(clk::to_time_t(expire_time)), expire_time.time_since_epoch().count(),
|
||||
logger.info("Node {} will be removed from gossip at [{:%Y-%m-%d %T}]: (expire = {}, now = {}, diff = {} seconds)",
|
||||
endpoint, fmt::localtime(clk::to_time_t(expire_time)), expire_time.time_since_epoch().count(),
|
||||
now_.time_since_epoch().count(), diff);
|
||||
_expire_time_endpoint_map[endpoint] = expire_time;
|
||||
}
|
||||
|
||||
@@ -153,8 +153,6 @@ public:
|
||||
}
|
||||
const std::set<inet_address>& get_seeds() const noexcept;
|
||||
|
||||
seastar::scheduling_group get_scheduling_group() const noexcept { return _gcfg.gossip_scheduling_group; }
|
||||
|
||||
public:
|
||||
static clk::time_point inline now() noexcept { return clk::now(); }
|
||||
public:
|
||||
|
||||
@@ -17,11 +17,11 @@
|
||||
#include "index/secondary_index.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "types/concrete_types.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
namespace secondary_index {
|
||||
|
||||
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
|
||||
@@ -147,88 +147,17 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
|
||||
}
|
||||
|
||||
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
|
||||
|
||||
struct validate_visitor {
|
||||
const class schema& schema;
|
||||
bool& is_vector;
|
||||
|
||||
/// Vector indexes support filtering on native types that can be used as primary key columns.
|
||||
/// There is no counter (it cannot be used with vector columns)
|
||||
/// and no duration (it cannot be used as a primary key or in secondary indexes).
|
||||
static bool is_supported_filtering_column(abstract_type const & kind_type) {
|
||||
switch (kind_type.get_kind()) {
|
||||
case abstract_type::kind::ascii:
|
||||
case abstract_type::kind::boolean:
|
||||
case abstract_type::kind::byte:
|
||||
case abstract_type::kind::bytes:
|
||||
case abstract_type::kind::date:
|
||||
case abstract_type::kind::decimal:
|
||||
case abstract_type::kind::double_kind:
|
||||
case abstract_type::kind::float_kind:
|
||||
case abstract_type::kind::inet:
|
||||
case abstract_type::kind::int32:
|
||||
case abstract_type::kind::long_kind:
|
||||
case abstract_type::kind::short_kind:
|
||||
case abstract_type::kind::simple_date:
|
||||
case abstract_type::kind::time:
|
||||
case abstract_type::kind::timestamp:
|
||||
case abstract_type::kind::timeuuid:
|
||||
case abstract_type::kind::utf8:
|
||||
case abstract_type::kind::uuid:
|
||||
case abstract_type::kind::varint:
|
||||
return true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void validate(cql3::column_identifier const& column, bool is_vector) const {
|
||||
auto const& c_name = column.to_string();
|
||||
auto const* c_def = schema.get_column_definition(column.name());
|
||||
if (c_def == nullptr) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
|
||||
}
|
||||
|
||||
auto type = c_def->type;
|
||||
|
||||
if (is_vector) {
|
||||
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
|
||||
if (vector_type == nullptr) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
|
||||
auto elements_type = vector_type->get_elements_type();
|
||||
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!is_supported_filtering_column(*type)) {
|
||||
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
|
||||
for (const auto& column : columns) {
|
||||
// CQL restricts the secondary local index to have multiple columns with partition key only.
|
||||
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
|
||||
// so we can assume here that these are non-vectors filtering columns.
|
||||
validate(*column, false);
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
|
||||
validate(*column, is_vector);
|
||||
// The first column is the vector column, the rest mustn't be vectors.
|
||||
is_vector = false;
|
||||
}
|
||||
};
|
||||
|
||||
bool is_vector = true;
|
||||
for (const auto& target : targets) {
|
||||
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
|
||||
if (targets.size() != 1) {
|
||||
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
|
||||
}
|
||||
auto target = targets[0];
|
||||
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
|
||||
if (!c_def) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
|
||||
}
|
||||
auto type = c_def->type;
|
||||
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception(format("Vector indexes are only supported on columns of vectors of floats", target->column_name()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
7
init.cc
7
init.cc
@@ -102,6 +102,13 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
|
||||
disabled.insert("STRONGLY_CONSISTENT_TABLES"s);
|
||||
}
|
||||
if (cfg.force_gossip_topology_changes()) {
|
||||
if (cfg.enable_tablets_by_default()) {
|
||||
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");
|
||||
}
|
||||
startlog.warn("The tablets feature is disabled due to forced gossip topology changes");
|
||||
disabled.insert("TABLETS"s);
|
||||
}
|
||||
if (!cfg.table_digest_insensitive_to_expiry()) {
|
||||
disabled.insert("TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"s);
|
||||
}
|
||||
|
||||
@@ -150,6 +150,7 @@ fedora_packages=(
|
||||
llvm
|
||||
openldap-servers
|
||||
openldap-devel
|
||||
toxiproxy
|
||||
cyrus-sasl
|
||||
fipscheck
|
||||
cpp-jwt-devel
|
||||
@@ -294,7 +295,6 @@ print_usage() {
|
||||
echo " --print-pip-runtime-packages Print required pip packages for Scylla"
|
||||
echo " --print-pip-symlinks Print list of pip provided commands which need to install to /usr/bin"
|
||||
echo " --print-node-exporter-filename Print node_exporter filename"
|
||||
echo " --future Install dependencies for future toolchain (Fedora rawhide based)"
|
||||
exit 1
|
||||
}
|
||||
|
||||
@@ -302,7 +302,6 @@ PRINT_PYTHON3=false
|
||||
PRINT_PIP=false
|
||||
PRINT_PIP_SYMLINK=false
|
||||
PRINT_NODE_EXPORTER=false
|
||||
FUTURE=false
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--print-python3-runtime-packages")
|
||||
@@ -321,10 +320,6 @@ while [ $# -gt 0 ]; do
|
||||
PRINT_NODE_EXPORTER=true
|
||||
shift 1
|
||||
;;
|
||||
"--future")
|
||||
FUTURE=true
|
||||
shift 1
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -355,10 +350,6 @@ if $PRINT_NODE_EXPORTER; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if ! $FUTURE; then
|
||||
fedora_packages+=(toxiproxy)
|
||||
fi
|
||||
|
||||
umask 0022
|
||||
|
||||
./seastar/install-dependencies.sh
|
||||
@@ -456,11 +447,3 @@ if [ ! -z "${CURL_ARGS}" ]; then
|
||||
else
|
||||
echo "Minio server and client are up-to-date, skipping download"
|
||||
fi
|
||||
|
||||
if $FUTURE ; then
|
||||
toxyproxy_version="v2.12.0"
|
||||
for bin in toxiproxy-cli toxiproxy-server; do
|
||||
curl -fSL -o "/usr/local/bin/${bin}" "https://github.com/Shopify/toxiproxy/releases/download/${toxyproxy_version}/${bin}-linux-$(go_arch)"
|
||||
chmod +x "/usr/local/bin/${bin}"
|
||||
done
|
||||
fi
|
||||
|
||||
11
main.cc
11
main.cc
@@ -571,7 +571,7 @@ sharded<service::storage_proxy> *the_storage_proxy;
|
||||
// This is used by perf-alternator to allow running scylla together with the tool
|
||||
// in a single process. So that it's easier to measure internals. It's not added
|
||||
// to main_func_type to not complicate common flow as no other tool needs such logic.
|
||||
std::function<void(lw_shared_ptr<db::config>)> after_init_func;
|
||||
std::function<future<>(lw_shared_ptr<db::config>, sharded<abort_source>&)> after_init_func;
|
||||
|
||||
static locator::host_id initialize_local_info_thread(sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<locator::snitch_ptr>& snitch,
|
||||
@@ -1150,7 +1150,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
dbcfg.memtable_scheduling_group = create_scheduling_group("memtable", "mt", 1000).get();
|
||||
dbcfg.memtable_to_cache_scheduling_group = create_scheduling_group("memtable_to_cache", "mt2c", 200).get();
|
||||
dbcfg.gossip_scheduling_group = create_scheduling_group("gossip", "gms", 1000).get();
|
||||
debug::gossip_scheduling_group = dbcfg.gossip_scheduling_group;
|
||||
dbcfg.commitlog_scheduling_group = create_scheduling_group("commitlog", "clog", 1000).get();
|
||||
dbcfg.schema_commitlog_scheduling_group = create_scheduling_group("schema_commitlog", "sclg", 1000).get();
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
@@ -2042,7 +2041,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
cdc_config.ring_delay = std::chrono::milliseconds(cfg->ring_delay_ms());
|
||||
cdc_config.dont_rewrite_streams = cfg->cdc_dont_rewrite_streams();
|
||||
cdc_generation_service.start(std::move(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
|
||||
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
|
||||
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db),
|
||||
[&ss] () -> bool { return ss.local().raft_topology_change_enabled(); }).get();
|
||||
auto stop_cdc_generation_service = defer_verbose_shutdown("CDC Generation Management service", [] {
|
||||
cdc_generation_service.stop().get();
|
||||
});
|
||||
@@ -2077,6 +2077,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
perm_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
|
||||
|
||||
auto start_auth_service = [&mm] (sharded<auth::service>& auth_service, std::any& stop_auth_service, const char* what) {
|
||||
supervisor::notify(fmt::format("starting {}", what));
|
||||
auth_service.invoke_on_all(&auth::service::start, std::ref(mm), std::ref(sys_ks)).get();
|
||||
|
||||
stop_auth_service = defer_verbose_shutdown(what, [&auth_service] {
|
||||
@@ -2581,11 +2582,13 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
supervisor::notify("serving");
|
||||
|
||||
startlog.info("Scylla version {} initialization completed.", scylla_version());
|
||||
future<> after_init_fut = make_ready_future<>();
|
||||
if (after_init_func) {
|
||||
after_init_func(cfg);
|
||||
after_init_fut = after_init_func(cfg, stop_signal.as_sharded_abort_source());
|
||||
}
|
||||
stop_signal.wait().get();
|
||||
startlog.info("Signal received; shutting down");
|
||||
std::move(after_init_fut).get();
|
||||
// At this point, all objects destructors and all shutdown hooks registered with defer() are executed
|
||||
} catch (const sleep_aborted&) {
|
||||
startlog.info("Startup interrupted");
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:cb48c6afc5bf2a62234e069c8dfc6ae491645f7fb200072bb73dac148349c472
|
||||
size 6543556
|
||||
oid sha256:a4710f1f0b0bb329721c21d133618e811e820f2e70553b0aca28fb278bff89c9
|
||||
size 6492280
|
||||
|
||||
@@ -103,8 +103,8 @@ thread_local dirty_memory_manager default_dirty_memory_manager;
|
||||
|
||||
inline
|
||||
flush_controller
|
||||
make_flush_controller(const db::config& cfg, const database_config& dbcfg, std::function<double()> fn) {
|
||||
return flush_controller(dbcfg.memtable_scheduling_group, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn));
|
||||
make_flush_controller(const db::config& cfg, backlog_controller::scheduling_group& sg, std::function<double()> fn) {
|
||||
return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn));
|
||||
}
|
||||
|
||||
keyspace::keyspace(config cfg, locator::effective_replication_map_factory& erm_factory)
|
||||
@@ -394,7 +394,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _system_dirty_memory_manager(*this, 10 << 20, cfg.unspooled_dirty_soft_limit(), default_scheduling_group())
|
||||
, _dirty_memory_manager(*this, dbcfg.available_memory * 0.50, cfg.unspooled_dirty_soft_limit(), dbcfg.statement_scheduling_group)
|
||||
, _dbcfg(dbcfg)
|
||||
, _memtable_controller(make_flush_controller(_cfg, _dbcfg, [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
, _flush_sg(dbcfg.memtable_scheduling_group)
|
||||
, _memtable_controller(make_flush_controller(_cfg, _flush_sg, [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
auto backlog = (_dirty_memory_manager.unspooled_dirty_memory()) / limit;
|
||||
if (_dirty_memory_manager.has_extraneous_flushes_requested()) {
|
||||
backlog = std::max(backlog, _memtable_controller.backlog_of_shares(200));
|
||||
|
||||
@@ -1617,6 +1617,7 @@ private:
|
||||
dirty_memory_manager _dirty_memory_manager;
|
||||
|
||||
database_config _dbcfg;
|
||||
backlog_controller::scheduling_group _flush_sg;
|
||||
flush_controller _memtable_controller;
|
||||
drain_progress _drain_progress {};
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ set -e
|
||||
trap 'echo "error $? in $0 line $LINENO"' ERR
|
||||
|
||||
SCRIPT_NAME=$(basename $0)
|
||||
SCYLLA_S3_RELOC_SERVER_DEFAULT_URL=https://api.backtrace.scylladb.com
|
||||
SCYLLA_S3_RELOC_SERVER_DEFAULT_URL=http://backtrace.scylladb.com
|
||||
|
||||
function print_usage {
|
||||
cat << EOF
|
||||
@@ -284,8 +284,7 @@ then
|
||||
|
||||
log "Build id: ${BUILD_ID}"
|
||||
|
||||
# https://api.backtrace.scylladb.com/api/docs#/default/search_by_build_id_search_build_id_get
|
||||
BUILD=$(curl "${SCYLLA_S3_RELOC_SERVER_URL}/api/search/build_id?build_id=${BUILD_ID}" -H 'accept: application/json')
|
||||
BUILD=$(curl -s -X GET "${SCYLLA_S3_RELOC_SERVER_URL}/build.json?build_id=${BUILD_ID}")
|
||||
|
||||
if [[ -z "$BUILD" ]]
|
||||
then
|
||||
@@ -294,16 +293,12 @@ then
|
||||
fi
|
||||
|
||||
RESPONSE_BUILD_ID=$(get_json_field "$BUILD" "build_id")
|
||||
BUILD_MODE=$(get_json_field "$BUILD" "build_type")
|
||||
PACKAGE_URL=$(get_json_field "$BUILD" "unstripped_url")
|
||||
BUILD_DATA=$(get_json_field "$BUILD" "build_data")
|
||||
|
||||
VERSION=$(get_json_field "$BUILD_DATA" "version")
|
||||
PRODUCT=$(get_json_field "$BUILD_DATA" "product")
|
||||
RELEASE=$(get_json_field "$BUILD_DATA" "release")
|
||||
ARCH=$(get_json_field "$BUILD_DATA" "platform")
|
||||
TIMESTAMP=$(get_json_field "$BUILD_DATA" "timestamp")
|
||||
|
||||
VERSION=$(get_json_field "$BUILD" "version")
|
||||
PRODUCT=$(get_json_field "$BUILD" "product")
|
||||
RELEASE=$(get_json_field "$BUILD" "release")
|
||||
ARCH=$(get_json_field "$BUILD" "arch")
|
||||
BUILD_MODE=$(get_json_field "$BUILD" "build_mode")
|
||||
PACKAGE_URL=$(get_json_field "$BUILD" "package_url" 1)
|
||||
|
||||
if [[ "$RESPONSE_BUILD_ID" != "$BUILD_ID" ]]
|
||||
then
|
||||
@@ -311,7 +306,7 @@ then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
log "Matching build is ${PRODUCT}-${VERSION} ${RELEASE} ${BUILD_MODE}-${ARCH} from ${TIMESTAMP}"
|
||||
log "Matching build is ${PRODUCT}-${VERSION} ${RELEASE} ${BUILD_MODE}-${ARCH}"
|
||||
fi
|
||||
|
||||
if ! [[ -d ${ARTIFACT_DIR}/scylla.package ]]
|
||||
|
||||
@@ -217,8 +217,6 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
|
||||
static const std::unordered_set<auth::resource> vector_search_system_resources = {
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY),
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::VERSIONS),
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::CDC_STREAMS),
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::CDC_TIMESTAMPS),
|
||||
};
|
||||
|
||||
if ((cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) ||
|
||||
|
||||
@@ -56,9 +56,6 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, locator:
|
||||
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
|
||||
service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
|
||||
_notifier(notifier)
|
||||
, _background_tasks("migration_manager::background_tasks")
|
||||
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss("migration_manager::storage_service"), _gossiper(gossiper), _group0_client(group0_client)
|
||||
, _sys_ks(sysks)
|
||||
, _group0_barrier(this_shard_id() == 0 ?
|
||||
std::function<future<>()>([this] () -> future<> {
|
||||
if ((co_await _group0_client.get_group0_upgrade_state()).second == group0_upgrade_state::use_pre_raft_procedures) {
|
||||
@@ -66,7 +63,7 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
|
||||
}
|
||||
|
||||
// This will run raft barrier and will sync schema with the leader
|
||||
co_await with_scheduling_group(_gossiper.get_scheduling_group(), [this] {
|
||||
co_await with_scheduling_group(_storage_proxy.get_db().local().get_gossip_scheduling_group(), [this] {
|
||||
return start_group0_operation().discard_result();
|
||||
});
|
||||
}) :
|
||||
@@ -77,6 +74,9 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
|
||||
});
|
||||
})
|
||||
)
|
||||
, _background_tasks("migration_manager::background_tasks")
|
||||
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss("migration_manager::storage_service"), _gossiper(gossiper), _group0_client(group0_client)
|
||||
, _sys_ks(sysks)
|
||||
, _schema_push([this] { return passive_announce(); })
|
||||
, _concurrent_ddl_retries{10}
|
||||
{
|
||||
|
||||
@@ -57,6 +57,7 @@ private:
|
||||
migration_notifier& _notifier;
|
||||
|
||||
std::unordered_map<locator::host_id, serialized_action> _schema_pulls;
|
||||
serialized_action _group0_barrier;
|
||||
std::vector<gms::feature::listener_registration> _feature_listeners;
|
||||
seastar::named_gate _background_tasks;
|
||||
static const std::chrono::milliseconds migration_delay;
|
||||
@@ -68,7 +69,6 @@ private:
|
||||
seastar::abort_source _as;
|
||||
service::raft_group0_client& _group0_client;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
serialized_action _group0_barrier;
|
||||
serialized_action _schema_push;
|
||||
table_schema_version _schema_version_to_publish;
|
||||
|
||||
|
||||
@@ -123,7 +123,12 @@ utils::small_vector<locator::host_id, N> addr_vector_to_id(const gms::gossiper&
|
||||
// Check the effective replication map consistency:
|
||||
// we have an inconsistent effective replication map in case we the number of
|
||||
// read replicas is higher than the replication factor.
|
||||
[[maybe_unused]] void validate_read_replicas(const locator::effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) {
|
||||
void validate_read_replicas(const locator::effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) {
|
||||
// Skip for non-debug builds.
|
||||
if constexpr (!tools::build_info::is_debug_build()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const sstring error = erm.get_replication_strategy().sanity_check_read_replicas(erm, read_replicas);
|
||||
if (!error.empty()) {
|
||||
on_internal_error(slogger, error);
|
||||
@@ -6967,12 +6972,7 @@ host_id_vector_replica_set storage_proxy::get_endpoints_for_reading(const schema
|
||||
return host_id_vector_replica_set{my_host_id(erm)};
|
||||
}
|
||||
auto endpoints = erm.get_replicas_for_reading(token);
|
||||
// Skip for non-debug builds and maintenance mode.
|
||||
if constexpr (tools::build_info::is_debug_build()) {
|
||||
if (!_db.local().get_config().maintenance_mode()) {
|
||||
validate_read_replicas(erm, endpoints);
|
||||
}
|
||||
}
|
||||
validate_read_replicas(erm, endpoints);
|
||||
auto it = std::ranges::remove_if(endpoints, std::not_fn(std::bind_front(&storage_proxy::is_alive, this, std::cref(erm)))).begin();
|
||||
endpoints.erase(it, endpoints.end());
|
||||
sort_endpoints_by_proximity(erm, endpoints);
|
||||
|
||||
@@ -125,7 +125,6 @@
|
||||
#include "utils/labels.hh"
|
||||
#include "view_info.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "debug.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
@@ -174,10 +173,11 @@ void check_raft_rpc_scheduling_group(const replica::database& db, const gms::fea
|
||||
return;
|
||||
}
|
||||
|
||||
if (current_scheduling_group() != debug::gossip_scheduling_group) {
|
||||
const auto gossip_scheduling_group = db.get_gossip_scheduling_group();
|
||||
if (current_scheduling_group() != gossip_scheduling_group) {
|
||||
on_internal_error_noexcept(
|
||||
slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group, current group is [{}], operation [{}].",
|
||||
current_scheduling_group().name(), rpc_name));
|
||||
slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group [{}], current group is [{}], operation [{}].",
|
||||
gossip_scheduling_group.name(), current_scheduling_group().name(), rpc_name));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,16 +532,9 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
|
||||
}
|
||||
|
||||
static std::unordered_set<locator::host_id> get_released_nodes(const service::topology& topology, const locator::token_metadata& tm) {
|
||||
return boost::join(topology.left_nodes, topology.ignored_nodes)
|
||||
| std::views::transform([] (const auto& raft_id) { return locator::host_id(raft_id.uuid()); })
|
||||
| std::views::filter([&] (const auto& h) { return !tm.get_topology().has_node(h); })
|
||||
| std::ranges::to<std::unordered_set<locator::host_id>>();
|
||||
}
|
||||
|
||||
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal, std::optional<std::unordered_set<locator::host_id>> prev_released) {
|
||||
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal) {
|
||||
nodes_to_notify_after_sync nodes_to_notify;
|
||||
|
||||
rtlogger.trace("Start sync_raft_topology_nodes");
|
||||
@@ -695,10 +688,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
}
|
||||
}
|
||||
|
||||
if (prev_released) {
|
||||
auto nodes_to_release = get_released_nodes(t, *tmptr);
|
||||
std::erase_if(nodes_to_release, [&] (const auto& host_id) { return prev_released->contains(host_id); });
|
||||
std::copy(nodes_to_release.begin(), nodes_to_release.end(), std::back_inserter(nodes_to_notify.released));
|
||||
auto nodes_to_release = t.left_nodes;
|
||||
nodes_to_release.insert(t.ignored_nodes.begin(), t.ignored_nodes.end());
|
||||
for (const auto& id: nodes_to_release) {
|
||||
auto host_id = locator::host_id(id.uuid());
|
||||
if (!tmptr->get_topology().find_node(host_id)) {
|
||||
nodes_to_notify.released.push_back(host_id);
|
||||
}
|
||||
}
|
||||
|
||||
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
|
||||
@@ -736,10 +732,6 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
|
||||
rtlogger.debug("reload raft topology state");
|
||||
std::unordered_set<raft::server_id> prev_normal = _topology_state_machine._topology.normal_nodes | std::views::keys | std::ranges::to<std::unordered_set>();
|
||||
std::optional<std::unordered_set<locator::host_id>> prev_released;
|
||||
if (!_topology_state_machine._topology.is_empty()) {
|
||||
prev_released = get_released_nodes(_topology_state_machine._topology, get_token_metadata());
|
||||
}
|
||||
|
||||
std::unordered_set<locator::host_id> tablet_hosts = co_await replica::read_required_hosts(_qp);
|
||||
|
||||
@@ -840,7 +832,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
}, topology.tstate);
|
||||
tmptr->set_read_new(read_new);
|
||||
|
||||
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal), std::move(prev_released));
|
||||
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));
|
||||
|
||||
std::optional<locator::tablet_metadata> tablets;
|
||||
if (hint.tablets_hint) {
|
||||
@@ -3197,6 +3189,9 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
|
||||
throw std::runtime_error(
|
||||
"Cannot start in the Raft-based recovery procedure - Raft-based topology has not been enabled");
|
||||
}
|
||||
if (_db.local().get_config().force_gossip_topology_changes()) {
|
||||
throw std::runtime_error("Cannot force gossip topology changes in the Raft-based recovery procedure");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3220,6 +3215,9 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
|
||||
} else if (_group0->joined_group0()) {
|
||||
// We are a part of group 0.
|
||||
set_topology_change_kind(upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state));
|
||||
if (_db.local().get_config().force_gossip_topology_changes() && raft_topology_change_enabled()) {
|
||||
throw std::runtime_error("Cannot force gossip topology changes - the cluster is using raft-based topology");
|
||||
}
|
||||
slogger.info("The node is already in group 0 and will restart in {} mode", raft_topology_change_enabled() ? "raft" : "legacy");
|
||||
} else if (_sys_ks.local().bootstrap_complete()) {
|
||||
if (co_await _sys_ks.local().load_topology_features_state()) {
|
||||
@@ -3240,8 +3238,13 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
|
||||
|
||||
if (_group0->load_my_id() == g0_info.id) {
|
||||
// We're creating the group 0.
|
||||
slogger.info("We are creating the group 0. Start in raft topology operations mode");
|
||||
set_topology_change_kind(topology_change_kind::raft);
|
||||
if (_db.local().get_config().force_gossip_topology_changes()) {
|
||||
slogger.info("We are creating the group 0. Start in legacy topology operations mode by force");
|
||||
set_topology_change_kind(topology_change_kind::legacy);
|
||||
} else {
|
||||
slogger.info("We are creating the group 0. Start in raft topology operations mode");
|
||||
set_topology_change_kind(topology_change_kind::raft);
|
||||
}
|
||||
} else {
|
||||
// Ask the current member of the raft group about which mode to use
|
||||
auto params = join_node_query_params {};
|
||||
@@ -3249,6 +3252,9 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
|
||||
&_messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, std::move(params));
|
||||
switch (result.topo_mode) {
|
||||
case join_node_query_result::topology_mode::raft:
|
||||
if (_db.local().get_config().force_gossip_topology_changes()) {
|
||||
throw std::runtime_error("Cannot force gossip topology changes - joining the cluster that is using raft-based topology");
|
||||
}
|
||||
slogger.info("Will join existing cluster in raft topology operations mode");
|
||||
set_topology_change_kind(topology_change_kind::raft);
|
||||
break;
|
||||
@@ -6269,7 +6275,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
break;
|
||||
case raft_topology_cmd::command::stream_ranges: {
|
||||
co_await with_scheduling_group(_stream_manager.local().get_scheduling_group(), coroutine::lambda([&] () -> future<> {
|
||||
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
|
||||
const auto rs = _topology_state_machine._topology.find(id)->second;
|
||||
auto tstate = _topology_state_machine._topology.tstate;
|
||||
auto session = _topology_state_machine._topology.session;
|
||||
@@ -8425,7 +8431,6 @@ future<> storage_service::start_maintenance_mode() {
|
||||
set_mode(mode::MAINTENANCE);
|
||||
|
||||
return mutate_token_metadata([this] (mutable_token_metadata_ptr token_metadata) -> future<> {
|
||||
token_metadata->update_topology(my_host_id(), _snitch.local()->get_location(), locator::node::state::normal, smp::count);
|
||||
return token_metadata->update_normal_tokens({ dht::token{} }, my_host_id());
|
||||
}, acquire_merge_lock::yes);
|
||||
}
|
||||
|
||||
@@ -1115,7 +1115,7 @@ private:
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
// Optional target_node can be provided to restrict the synchronization to the specified node.
|
||||
// Returns a structure that describes which notifications to trigger after token metadata is updated.
|
||||
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal, std::optional<std::unordered_set<locator::host_id>> prev_released);
|
||||
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
|
||||
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
|
||||
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
|
||||
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
|
||||
|
||||
@@ -1441,43 +1441,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
}
|
||||
}
|
||||
|
||||
void log_active_transitions(size_t max_count) {
|
||||
auto tm = get_token_metadata_ptr();
|
||||
size_t logged_count = 0;
|
||||
size_t total_count = 0;
|
||||
bool should_break = false;
|
||||
for (auto&& [base_table, tables [[maybe_unused]]] : tm->tablets().all_table_groups()) {
|
||||
const auto& tmap = tm->tablets().get_tablet_map(base_table);
|
||||
if (should_break) {
|
||||
total_count += tmap.transitions().size();
|
||||
continue;
|
||||
}
|
||||
for (auto&& [tablet, trinfo]: tmap.transitions()) {
|
||||
total_count++;
|
||||
if (logged_count < max_count) {
|
||||
locator::global_tablet_id gid { base_table, tablet };
|
||||
const auto& tinfo = tmap.get_tablet_info(tablet);
|
||||
// Log only the replicas involved in the transition (leaving/pending)
|
||||
// rather than all replicas, to focus on what's actually changing
|
||||
auto leaving = locator::get_leaving_replica(tinfo, trinfo);
|
||||
auto pending = trinfo.pending_replica;
|
||||
rtlogger.info("Active {} transition: tablet={}, stage={}{}{}",
|
||||
trinfo.transition, gid, trinfo.stage,
|
||||
leaving ? fmt::format(", leaving={}", *leaving) : "",
|
||||
pending ? fmt::format(", pending={}", *pending) : "");
|
||||
logged_count++;
|
||||
if (logged_count >= max_count) {
|
||||
should_break = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (total_count > max_count) {
|
||||
rtlogger.info("... and {} more active transitions", total_count - max_count);
|
||||
}
|
||||
}
|
||||
|
||||
// When "drain" is true, we migrate tablets only as long as there are nodes to drain
|
||||
// and then change the transition state to write_both_read_old. Also, while draining,
|
||||
// we ignore pending topology requests which normally interrupt load balancing.
|
||||
@@ -1902,8 +1865,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
|
||||
.table_uuid = gid.table,
|
||||
};
|
||||
auto request_type = tinfo.repair_task_info.request_type;
|
||||
rtlogger.info("Initiating tablet repair host={} tablet={} request_type={}", dst, gid, request_type);
|
||||
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
|
||||
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
|
||||
service::session_id::create_random_id() : trinfo->session_id;
|
||||
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
|
||||
@@ -1915,8 +1877,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
entry.timestamp = db_clock::now();
|
||||
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
|
||||
}
|
||||
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={} request_type={}",
|
||||
dst, tablet, duration, res.repair_time, request_type);
|
||||
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
|
||||
dst, tablet, duration, res.repair_time);
|
||||
})) {
|
||||
if (utils::get_local_injector().enter("delay_end_repair_update")) {
|
||||
break;
|
||||
@@ -2063,7 +2025,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
// to check atomically with event.wait()
|
||||
if (!_tablets_ready) {
|
||||
rtlogger.debug("Going to sleep with active tablet transitions");
|
||||
log_active_transitions(5);
|
||||
release_guard(std::move(guard));
|
||||
co_await await_event();
|
||||
}
|
||||
@@ -3735,7 +3696,7 @@ public:
|
||||
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
|
||||
, _cdc_gens(cdc_gens)
|
||||
, _tablet_load_stats_refresh([this] {
|
||||
return with_scheduling_group(_gossiper.get_scheduling_group(), [this] {
|
||||
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
|
||||
return refresh_tablet_load_stats();
|
||||
});
|
||||
})
|
||||
@@ -3915,9 +3876,6 @@ future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
for (auto& [table_id, table_stats] : dc_stats.tables) {
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
if (!_db.column_family_exists(table_id)) {
|
||||
continue;
|
||||
}
|
||||
auto& t = _db.find_column_family(table_id);
|
||||
auto& rs = t.get_effective_replication_map()->get_replication_strategy();
|
||||
if (!rs.uses_tablets()) {
|
||||
@@ -3941,9 +3899,6 @@ future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
}
|
||||
|
||||
for (auto& [table_id, table_load_stats] : stats.tables) {
|
||||
if (!total_replicas.contains(table_id)) {
|
||||
continue;
|
||||
}
|
||||
auto table_total_replicas = total_replicas.at(table_id);
|
||||
if (table_total_replicas == 0) {
|
||||
continue;
|
||||
|
||||
@@ -196,8 +196,6 @@ public:
|
||||
}
|
||||
|
||||
future<> fail_stream_plan(streaming::plan_id plan_id);
|
||||
|
||||
scheduling_group get_scheduling_group() const noexcept { return _streaming_group; }
|
||||
};
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
1
test.py
1
test.py
@@ -61,6 +61,7 @@ PYTEST_RUNNER_DIRECTORIES = [
|
||||
TEST_DIR / 'raft',
|
||||
TEST_DIR / 'unit',
|
||||
TEST_DIR / 'vector_search',
|
||||
TEST_DIR / 'vector_search_validator',
|
||||
TEST_DIR / 'alternator',
|
||||
TEST_DIR / 'broadcast_tables',
|
||||
TEST_DIR / 'cql',
|
||||
|
||||
@@ -103,6 +103,7 @@ if(BUILD_TESTING)
|
||||
add_subdirectory(raft)
|
||||
add_subdirectory(resource/wasm)
|
||||
add_subdirectory(vector_search)
|
||||
add_subdirectory(vector_search_validator)
|
||||
|
||||
if(CMAKE_CONFIGURATION_TYPES)
|
||||
foreach(config ${CMAKE_CONFIGURATION_TYPES})
|
||||
|
||||
@@ -581,7 +581,8 @@ def test_update_item_many_items_fall_into_appropriate_buckets(dynamodb, test_tab
|
||||
# Verify that only the new item size is counted in the histogram if RBW is
|
||||
# disabled, and both sizes if it is enabled. The WCU is calculated as the
|
||||
# maximum of the old and new item sizes.
|
||||
@pytest.mark.parametrize("force_rbw", [pytest.param(True, marks=pytest.mark.xfail(reason="Updates don't consider the larger of the old item size and the new item size.")), False])
|
||||
@pytest.mark.xfail(reason="Updates don't consider the larger of the old item size and the new item size. This will be fixed in a next PR.")
|
||||
@pytest.mark.parametrize("force_rbw", [True, False])
|
||||
def test_update_item_increases_metrics_for_new_item_size_only(dynamodb, test_table_s, metrics, force_rbw):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_force_read_before_write', str(force_rbw).lower()):
|
||||
if force_rbw:
|
||||
|
||||
@@ -482,7 +482,6 @@ def test_get_records_nonexistent_iterator(dynamodbstreams):
|
||||
# and if in the future we can work around the DynamoDB problem, we can return
|
||||
# these fixtures to module scope.
|
||||
|
||||
@contextmanager
|
||||
def create_table_ss(dynamodb, dynamodbstreams, type):
|
||||
table = create_test_table(dynamodb,
|
||||
Tags=TAGS,
|
||||
@@ -530,23 +529,19 @@ def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams):
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_keys_only(dynamodb, dynamodbstreams):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as stream:
|
||||
yield stream
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY')
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_new_image(dynamodb, dynamodbstreams):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as stream:
|
||||
yield stream
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE')
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_old_image(dynamodb, dynamodbstreams):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as stream:
|
||||
yield stream
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE')
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
|
||||
yield stream
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
|
||||
@@ -659,17 +654,6 @@ def fetch_more(dynamodbstreams, iterators, output):
|
||||
assert len(set(new_iterators)) == len(new_iterators)
|
||||
return new_iterators
|
||||
|
||||
def print_events(expected_events, output, failed_at=None):
|
||||
if failed_at is None:
|
||||
print(f'compare_events: timeouted')
|
||||
else:
|
||||
print(f'compare_events: failed at output event {failed_at}')
|
||||
for index, event in enumerate(expected_events):
|
||||
expected_type, expected_key, expected_old_image, expected_new_image = event
|
||||
print(f'expected event {index}: type={expected_type}, key={expected_key}, old_image={expected_old_image}, new_image={expected_new_image}')
|
||||
for index, event in enumerate(output):
|
||||
print(f'output event {index}: type={event["eventName"]}, key={event["dynamodb"]["Keys"]}, old_image={event["dynamodb"].get("OldImage")}, new_image={event["dynamodb"].get("NewImage")}')
|
||||
|
||||
# Utility function for comparing "output" as fetched by fetch_more(), to a list
|
||||
# expected_events, each of which looks like:
|
||||
# [type, keys, old_image, new_image]
|
||||
@@ -702,75 +686,70 @@ def compare_events(expected_events, output, mode, expected_region):
|
||||
# Iterate over the events in output. An event for a certain key needs to
|
||||
# be the *first* remaining event for this key in expected_events_map (and
|
||||
# then we remove this matched even from expected_events_map)
|
||||
for e, event in enumerate(output):
|
||||
try:
|
||||
# In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
|
||||
# a *different* value - 'scylladb:alternator'. Issue #6931.
|
||||
assert 'eventSource' in event
|
||||
# For lack of a direct equivalent of a region, Alternator provides the
|
||||
# DC name instead. Reproduces #6931.
|
||||
assert 'awsRegion' in event
|
||||
assert event['awsRegion'] == expected_region
|
||||
# Reproduces #6931.
|
||||
assert 'eventVersion' in event
|
||||
assert event['eventVersion'] in ['1.0', '1.1']
|
||||
# Check that eventID appears, but can't check much on what it is.
|
||||
assert 'eventID' in event
|
||||
op = event['eventName']
|
||||
record = event['dynamodb']
|
||||
# record['Keys'] is "serialized" JSON, ({'S', 'thestring'}), so we
|
||||
# want to deserialize it to match our expected_events content.
|
||||
deserializer = TypeDeserializer()
|
||||
key = {x:deserializer.deserialize(y) for (x,y) in record['Keys'].items()}
|
||||
expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
|
||||
assert op == expected_type
|
||||
assert record['StreamViewType'] == mode
|
||||
# We don't know what ApproximateCreationDateTime should be, but we do
|
||||
# know it needs to be a timestamp - there is conflicting documentation
|
||||
# in what format (ISO 8601?). In any case, boto3 parses this timestamp
|
||||
# for us, so we can't check it here, beyond checking it exists.
|
||||
assert 'ApproximateCreationDateTime' in record
|
||||
# We don't know what SequenceNumber is supposed to be, but the DynamoDB
|
||||
# documentation requires that it contains only numeric characters and
|
||||
# some libraries rely on this. This reproduces issue #7158:
|
||||
assert 'SequenceNumber' in record
|
||||
assert record['SequenceNumber'].isdecimal()
|
||||
# Alternator doesn't set the SizeBytes member. Issue #6931.
|
||||
#assert 'SizeBytes' in record
|
||||
if mode == 'KEYS_ONLY':
|
||||
for event in output:
|
||||
# In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
|
||||
# a *different* value - 'scylladb:alternator'. Issue #6931.
|
||||
assert 'eventSource' in event
|
||||
# For lack of a direct equivalent of a region, Alternator provides the
|
||||
# DC name instead. Reproduces #6931.
|
||||
assert 'awsRegion' in event
|
||||
assert event['awsRegion'] == expected_region
|
||||
# Reproduces #6931.
|
||||
assert 'eventVersion' in event
|
||||
assert event['eventVersion'] in ['1.0', '1.1']
|
||||
# Check that eventID appears, but can't check much on what it is.
|
||||
assert 'eventID' in event
|
||||
op = event['eventName']
|
||||
record = event['dynamodb']
|
||||
# record['Keys'] is "serialized" JSON, ({'S': 'thestring'}), so we
|
||||
# want to deserialize it to match our expected_events content.
|
||||
deserializer = TypeDeserializer()
|
||||
key = {x:deserializer.deserialize(y) for (x,y) in record['Keys'].items()}
|
||||
expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
|
||||
assert op == expected_type
|
||||
assert record['StreamViewType'] == mode
|
||||
# We don't know what ApproximateCreationDateTime should be, but we do
|
||||
# know it needs to be a timestamp - there is conflicting documentation
|
||||
# in what format (ISO 8601?). In any case, boto3 parses this timestamp
|
||||
# for us, so we can't check it here, beyond checking it exists.
|
||||
assert 'ApproximateCreationDateTime' in record
|
||||
# We don't know what SequenceNumber is supposed to be, but the DynamoDB
|
||||
# documentation requires that it contains only numeric characters and
|
||||
# some libraries rely on this. This reproduces issue #7158:
|
||||
assert 'SequenceNumber' in record
|
||||
assert record['SequenceNumber'].isdecimal()
|
||||
# Alternator doesn't set the SizeBytes member. Issue #6931.
|
||||
#assert 'SizeBytes' in record
|
||||
if mode == 'KEYS_ONLY':
|
||||
assert not 'NewImage' in record
|
||||
assert not 'OldImage' in record
|
||||
elif mode == 'NEW_IMAGE':
|
||||
assert not 'OldImage' in record
|
||||
if expected_new_image == None:
|
||||
assert not 'NewImage' in record
|
||||
assert not 'OldImage' in record
|
||||
elif mode == 'NEW_IMAGE':
|
||||
assert not 'OldImage' in record
|
||||
if expected_new_image == None:
|
||||
assert not 'NewImage' in record
|
||||
else:
|
||||
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
|
||||
assert expected_new_image == new_image
|
||||
elif mode == 'OLD_IMAGE':
|
||||
assert not 'NewImage' in record
|
||||
if expected_old_image == None:
|
||||
assert not 'OldImage' in record
|
||||
else:
|
||||
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
|
||||
assert expected_old_image == old_image
|
||||
elif mode == 'NEW_AND_OLD_IMAGES':
|
||||
if expected_new_image == None:
|
||||
assert not 'NewImage' in record
|
||||
else:
|
||||
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
|
||||
assert expected_new_image == new_image
|
||||
if expected_old_image == None:
|
||||
assert not 'OldImage' in record
|
||||
else:
|
||||
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
|
||||
assert expected_old_image == old_image
|
||||
else:
|
||||
pytest.fail('cannot happen')
|
||||
except AssertionError:
|
||||
print_events(expected_events, output, failed_at=e)
|
||||
raise
|
||||
|
||||
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
|
||||
assert expected_new_image == new_image
|
||||
elif mode == 'OLD_IMAGE':
|
||||
assert not 'NewImage' in record
|
||||
if expected_old_image == None:
|
||||
assert not 'OldImage' in record
|
||||
else:
|
||||
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
|
||||
assert expected_old_image == old_image
|
||||
elif mode == 'NEW_AND_OLD_IMAGES':
|
||||
if expected_new_image == None:
|
||||
assert not 'NewImage' in record
|
||||
else:
|
||||
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
|
||||
assert expected_new_image == new_image
|
||||
if expected_old_image == None:
|
||||
assert not 'OldImage' in record
|
||||
else:
|
||||
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
|
||||
assert expected_old_image == old_image
|
||||
else:
|
||||
pytest.fail('cannot happen')
|
||||
# After the above loop, expected_events_map should remain empty arrays.
|
||||
# If it isn't, one of the expected events did not yet happen. Return False.
|
||||
for entry in expected_events_map.values():
|
||||
@@ -799,7 +778,6 @@ def fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_even
|
||||
return
|
||||
time.sleep(0.5)
|
||||
# If we're still here, the last compare_events returned false.
|
||||
print_events(expected_events, output)
|
||||
pytest.fail('missing events in output: {}'.format(output))
|
||||
|
||||
# Convenience function used to implement several tests below. It runs a given
|
||||
@@ -2016,33 +1994,6 @@ def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams):
|
||||
# is in the process of being added
|
||||
wait_for_active_stream(dynamodbstreams, table)
|
||||
|
||||
# In earlier tests, we tested the stream events logged for BatchWriteItem,
|
||||
# but it was usually a single item in the batch or in do_batch_test(),
|
||||
# it was multiple items in different partitions. This test checks the
|
||||
# remaining case, of a batch writing multiple items in one partition -
|
||||
# and checks that the correct events appear for them on the stream.
|
||||
# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt
|
||||
# write isolation mode, which writes all the items in the batch with the
|
||||
# same timestamp. The test is parameterized to try all write isolation
|
||||
# modes, and reproduces #28439 when it failed only in always_use_lwt mode.
|
||||
# This is a Scylla-only test because it checks write isolation modes, which
|
||||
# don't exist in DynamoDB.
|
||||
@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw'])
|
||||
def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
|
||||
table, stream_arn = stream
|
||||
# Set write isolation mode on the table to the chosen "mode":
|
||||
table_arn = table.meta.client.describe_table(TableName=table.name)['Table']['TableArn']
|
||||
table.meta.client.tag_resource(ResourceArn=table_arn, Tags=[{'Key': 'system:write_isolation', 'Value': mode}])
|
||||
# Now try the test, a single BatchWriteItem writing three different
|
||||
# items in the same partition p:
|
||||
def do_updates(table, p, c):
|
||||
cs = [c + '1', c + '2', c + '3']
|
||||
table.meta.client.batch_write_item(RequestItems = {
|
||||
table.name: [{'PutRequest': {'Item': {'p': p, 'c': cc, 'x': cc}}} for cc in cs]})
|
||||
return [['INSERT', {'p': p, 'c': cc}, None, {'p': p, 'c': cc, 'x': cc}] for cc in cs]
|
||||
do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
||||
|
||||
# TODO: tests on multiple partitions
|
||||
# TODO: write a test that disabling the stream and re-enabling it works, but
|
||||
# requires the user to wait for the first stream to become DISABLED before
|
||||
|
||||
@@ -679,48 +679,3 @@ def test_create_table_spurious_attribute_definitions(dynamodb):
|
||||
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' }]) as table:
|
||||
pass
|
||||
|
||||
# DynamoDB supports many different types, but the documentation claims that
|
||||
# for keys, "The only data types allowed for primary key attributes are
|
||||
# string, number, or binary.". We have many tests for these types (and
|
||||
# shared test tables with those key types defined in conftest.py) - in this
|
||||
# test we verify that indeed all other types are NOT allowed - for neither
|
||||
# partition key nor sort key.
|
||||
# See also test_gsi.py::test_gsi_invalid_key_types which checks that the
|
||||
# same types are also forbidden as GSI keys.
|
||||
def test_forbidden_key_types(dynamodb):
|
||||
for t in ['BOOL', 'BS', 'L', 'M', 'NS', 'NULL', 'SS']:
|
||||
# Check that partition key of type t is forbidden.
|
||||
# The specific error message is different in DynamoDB and Alternator,
|
||||
# but both mention the requested type in the message in single quotes.
|
||||
with pytest.raises(ClientError, match=f"ValidationException.*'{t}'"):
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': t}]):
|
||||
pass
|
||||
# Check that sort key of type t is forbidden.
|
||||
with pytest.raises(ClientError, match=f"ValidationException.*'{t}'"):
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'},
|
||||
{'AttributeName': 'c', 'KeyType': 'RANGE'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'},
|
||||
{'AttributeName': 'c', 'AttributeType': t}]):
|
||||
pass
|
||||
|
||||
# Although as we tested in the previous test (test_forbidden_key_types) most
|
||||
# DynamoDB types are not allowed as key types (only S, B and N are allowed),
|
||||
# strangely the GetItem documentation claims that the Key parameter can
|
||||
# actually allow any type. This is a mistake in the documentation - this
|
||||
# test shows that when you try to GetItem with one of the forbidden types,
|
||||
# it fails. Note that actually what both DynamoDB and Alternator test is
|
||||
# whether the Key type is the same as the one in the table's schema - so
|
||||
# because we can't create a table with these types, GetItem with those
|
||||
# types is bound to fail.
|
||||
def test_forbidden_key_types_getitem(test_table_s):
|
||||
for p in [False, {b'hi', b'there'}, ['hi',3], {'hi': 3}, {1,2}, None, {'hi', 'there'}]:
|
||||
# Unfortunately the error message in DynamoDB ("The provided key
|
||||
# element does not match the schema") and Alternator ("Type mismatch:
|
||||
# expected type S for key column p, got type "BOOL") doesn't have
|
||||
# anything in common except the word "match".
|
||||
with pytest.raises(ClientError, match='ValidationException.*match'):
|
||||
test_table_s.get_item(Key={'p': p})
|
||||
|
||||
@@ -51,7 +51,7 @@
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from .util import create_test_table, random_string, new_test_table
|
||||
from .util import create_test_table, random_string
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def all_tests_are_scylla_only(scylla_only):
|
||||
@@ -430,53 +430,3 @@ def test_isolation_updateitem_returnvalues(table_forbid_rmw, tables_permit_rmw):
|
||||
UpdateExpression='SET a = :val',
|
||||
ExpressionAttributeValues={':val': 1},
|
||||
ReturnValues=returnvalues)
|
||||
|
||||
#############################################################################
|
||||
# BatchWriteItem tests.
|
||||
# BatchWriteItem writes are always pure write - never RMW (read-modify-write)
|
||||
# operations - because none of the RMW options are supported: Batch writes
|
||||
# don't support an UpdateExpression, a ConditionExpression or ReturnValues.
|
||||
# Still, even in the pure write case, the write code paths are different for
|
||||
# the different write isolation modes, and we need to exercise them.
|
||||
|
||||
# For completeness, this test exercises a single batch with more than one
|
||||
# partition, more than one clustering key in the same partition, and a
|
||||
# combination of PutRequest and DeleteRequest.
|
||||
def test_isolation_batchwriteitem(dynamodb):
|
||||
# Unfortunately we can't use the four table fixtures that all other tests
|
||||
# use, because those fixtures only have a partition key and we also want
|
||||
# a sort key (so we can test the case of multiple items in the same
|
||||
# partition). So we have to create four new tables just for this test.
|
||||
for mode in ['only_rmw_uses_lwt', 'always_use_lwt', 'unsafe_rmw', 'forbid_rmw']:
|
||||
with new_test_table(dynamodb,
|
||||
Tags=[{'Key': 'system:write_isolation', 'Value': mode}],
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
||||
AttributeDefinitions=[
|
||||
{ 'AttributeName': 'p', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table:
|
||||
p1 = random_string()
|
||||
p2 = random_string()
|
||||
# Set up two items in p1, only one of them will be deleted later
|
||||
table.put_item(Item={'p': p1, 'c': 'item1', 'x': 'hello'})
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item1', 'x': 'hello'}
|
||||
table.put_item(Item={'p': p1, 'c': 'item2', 'x': 'hi'})
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'}
|
||||
# Perform the batch write, writing to two different partitions
|
||||
# (p1 and p2), multiple items in one partition (p1), and
|
||||
# one of the writes is a DeleteRequest (of item1 that we wrote
|
||||
# above).
|
||||
table.meta.client.batch_write_item(RequestItems = {
|
||||
table.name: [
|
||||
{'PutRequest': {'Item': {'p': p1, 'c': 'item3', 'x': 'dog'}}},
|
||||
{'PutRequest': {'Item': {'p': p1, 'c': 'item4', 'x': 'cat'}}},
|
||||
{'DeleteRequest': {'Key': {'p': p1, 'c': 'item1'}}},
|
||||
{'PutRequest': {'Item': {'p': p2, 'c': 'item5', 'x': 'mouse'}}}
|
||||
]})
|
||||
# After the batch write, item1 will be gone, item2..item5 should
|
||||
# exist with the right content.
|
||||
assert 'Item' not in table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'}
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item3'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item3', 'x': 'dog'}
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item4'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item4', 'x': 'cat'}
|
||||
assert table.get_item(Key={'p': p2, 'c': 'item5'}, ConsistentRead=True)['Item'] == {'p': p2, 'c': 'item5', 'x': 'mouse'}
|
||||
|
||||
@@ -391,31 +391,21 @@ SEASTAR_TEST_CASE(select_from_vector_search_system_table) {
|
||||
return do_with_cql_env_thread(
|
||||
[](auto&& env) {
|
||||
create_user_if_not_exists(env, bob);
|
||||
|
||||
// All tables in vector_search_system_resources from client_state.cc
|
||||
const std::vector<sstring> vector_search_system_tables = {
|
||||
"system.group0_history",
|
||||
"system.versions",
|
||||
"system.cdc_streams",
|
||||
"system.cdc_timestamps",
|
||||
};
|
||||
|
||||
// Without VECTOR_SEARCH_INDEXING permission, bob cannot select from these tables
|
||||
for (const auto& table : vector_search_system_tables) {
|
||||
with_user(env, bob, [&env, &table] {
|
||||
BOOST_REQUIRE_EXCEPTION(env.execute_cql(format("SELECT * FROM {}", table)).get(), exceptions::unauthorized_exception,
|
||||
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
|
||||
});
|
||||
}
|
||||
|
||||
with_user(env, bob, [&env] {
|
||||
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.group0_history").get(), exceptions::unauthorized_exception,
|
||||
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
|
||||
});
|
||||
with_user(env, bob, [&env] {
|
||||
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.versions").get(), exceptions::unauthorized_exception,
|
||||
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
|
||||
});
|
||||
cquery_nofail(env, "GRANT VECTOR_SEARCH_INDEXING ON ALL KEYSPACES TO bob");
|
||||
|
||||
// With VECTOR_SEARCH_INDEXING permission, bob can select from these tables
|
||||
for (const auto& table : vector_search_system_tables) {
|
||||
with_user(env, bob, [&env, &table] {
|
||||
cquery_nofail(env, format("SELECT * FROM {}", table));
|
||||
});
|
||||
}
|
||||
with_user(env, bob, [&env] {
|
||||
cquery_nofail(env, "SELECT * FROM system.group0_history");
|
||||
});
|
||||
with_user(env, bob, [&env] {
|
||||
cquery_nofail(env, "SELECT * FROM system.versions");
|
||||
});
|
||||
},
|
||||
db_config_with_auth());
|
||||
}
|
||||
|
||||
@@ -42,7 +42,6 @@
|
||||
#include "test/lib/key_utils.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include "dht/sharder.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "replica/cell_locking.hh"
|
||||
@@ -70,8 +69,6 @@
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(mutation_reader_test)
|
||||
|
||||
namespace test_label = boost::unit_test;
|
||||
|
||||
static schema_ptr make_schema() {
|
||||
return schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
@@ -1242,7 +1239,7 @@ SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) {
|
||||
}
|
||||
|
||||
// Best run with SMP >= 2
|
||||
SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source, *test_label::label("nightly")) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
|
||||
if (smp::count < 2) {
|
||||
std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
|
||||
return;
|
||||
|
||||
@@ -14,11 +14,15 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_service_level_to_user(request, manager: ManagerClient):
|
||||
async def __test_attach_service_level_to_user(request, manager: ManagerClient, is_raft: bool):
|
||||
user = f"test_user_{unique_name()}"
|
||||
|
||||
# Start nodes with correct topology
|
||||
servers = await manager.servers_add(3, config=auth_config)
|
||||
if is_raft:
|
||||
servers = await manager.servers_add(3, config=auth_config)
|
||||
else:
|
||||
conf = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
servers = [await manager.server_add(config=conf) for _ in range(3)]
|
||||
|
||||
cql = manager.get_cql()
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
@@ -42,9 +46,28 @@ async def test_attach_service_level_to_user(request, manager: ManagerClient):
|
||||
for sl in sls:
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL {sl} TO {user}")
|
||||
|
||||
# If we are not using raft, we have to switch the tenant and wait for it to take effect
|
||||
if not is_raft:
|
||||
for ip in ips:
|
||||
await manager.api.client.post('/service_levels/switch_tenants', host=ip)
|
||||
# Switching tenants may be blocked if a connection is waiting for a request (see 'generic_server::connection::process_until_tenant_switch()').
|
||||
# Execute enough cheap statements, so that the connection on each shard will process at least one statement and update its tenant.
|
||||
for _ in range(100):
|
||||
read_barrier(manager.api, ip)
|
||||
|
||||
assert verify_service_level(sl), f"All connections should be in {sl} service level"
|
||||
await cql.run_async(f"DETACH SERVICE LEVEL FROM {user}")
|
||||
|
||||
await cql.run_async(f"DROP ROLE {user}")
|
||||
for sl in sls:
|
||||
await cql.run_async(f"DROP SERVICE LEVEL {sl}")
|
||||
await cql.run_async(f"DROP SERVICE LEVEL {sl}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_service_level_with_raft(request, manager: ManagerClient):
|
||||
await __test_attach_service_level_to_user(request, manager, is_raft=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_service_level_with_gossip(request, manager: ManagerClient):
|
||||
await __test_attach_service_level_to_user(request, manager, is_raft=False)
|
||||
@@ -146,6 +146,47 @@ async def check_auth_v2_works(manager: ManagerClient, hosts):
|
||||
await asyncio.gather(*(cql.run_async(f"LIST ROLES OF {username}", host=host) for host in hosts))
|
||||
await cql.run_async(f"DROP ROLE {username}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_v2_migration(request, manager: ManagerClient):
|
||||
# First, force the first node to start in legacy mode
|
||||
cfg = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
|
||||
servers = [await manager.server_add(config=cfg)]
|
||||
# Enable raft-based node operations for subsequent nodes - they should fall back to
|
||||
# using gossiper-based node operations
|
||||
cfg.pop('force_gossip_topology_changes')
|
||||
|
||||
servers += [await manager.server_add(config=cfg) for _ in range(2)]
|
||||
cql = manager.cql
|
||||
assert(cql)
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
|
||||
logging.info("Checking the upgrade state on all nodes")
|
||||
for host in hosts:
|
||||
status = await manager.api.raft_topology_upgrade_status(host.address)
|
||||
assert status == "not_upgraded"
|
||||
|
||||
await populate_auth_v1_data(manager)
|
||||
await warmup_v1_static_values(manager, hosts)
|
||||
|
||||
logging.info("Triggering upgrade to raft topology")
|
||||
await manager.api.upgrade_to_raft_topology(hosts[0].address)
|
||||
|
||||
logging.info("Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Checking migrated data in system")
|
||||
await check_auth_v2_data_migration(manager, hosts)
|
||||
|
||||
logging.info("Checking auth statements after migration")
|
||||
await check_auth_v2_works(manager, hosts)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_v2_during_recovery(manager: ManagerClient):
|
||||
# FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.
|
||||
|
||||
@@ -62,6 +62,127 @@ async def test_service_levels_snapshot(manager: ManagerClient):
|
||||
|
||||
assert set([sl.service_level for sl in result]) == set([sl.service_level for sl in new_result])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_service_levels_upgrade(request, manager: ManagerClient, build_mode: str):
|
||||
# First, force the first node to start in legacy mode
|
||||
cfg = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
|
||||
servers = [await manager.server_add(config=cfg)]
|
||||
# Enable raft-based node operations for subsequent nodes - they should fall back to
|
||||
# using gossiper-based node operations
|
||||
cfg.pop('force_gossip_topology_changes')
|
||||
|
||||
servers += [await manager.server_add(config=cfg) for _ in range(2)]
|
||||
cql = manager.get_cql()
|
||||
assert(cql)
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info("Checking the upgrade state on all nodes")
|
||||
for host in hosts:
|
||||
status = await manager.api.raft_topology_upgrade_status(host.address)
|
||||
assert status == "not_upgraded"
|
||||
|
||||
sls = ["sl" + unique_name() for _ in range(5)]
|
||||
for sl in sls:
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl}")
|
||||
|
||||
result = await cql.run_async("SELECT service_level FROM system_distributed.service_levels")
|
||||
assert set([sl.service_level for sl in result]) == set(sls)
|
||||
|
||||
if build_mode in ("debug", "dev"):
|
||||
# See scylladb/scylladb/#24963 for more details
|
||||
logging.info("Enabling an error injection in legacy role manager, to check that we don't query auth in system_auth")
|
||||
await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, "standard_role_manager_fail_legacy_query", one_shot=False) for s in servers))
|
||||
|
||||
logging.info("Triggering upgrade to raft topology")
|
||||
await manager.api.upgrade_to_raft_topology(hosts[0].address)
|
||||
|
||||
logging.info("Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
|
||||
await wait_until_driver_service_level_created(manager, time.time() + 60)
|
||||
|
||||
result_v2 = await cql.run_async("SELECT service_level FROM system.service_levels_v2")
|
||||
assert set([sl.service_level for sl in result_v2]) == set(sls + [DRIVER_SL_NAME])
|
||||
|
||||
sl_v2 = "sl" + unique_name()
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl_v2}")
|
||||
|
||||
await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts))
|
||||
result_with_sl_v2 = await cql.run_async(f"SELECT service_level FROM system.service_levels_v2")
|
||||
assert set([sl.service_level for sl in result_with_sl_v2]) == set(sls + [DRIVER_SL_NAME] + [sl_v2])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_service_levels_work_during_recovery(manager: ManagerClient):
|
||||
# FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.
|
||||
servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info("Creating a bunch of service levels")
|
||||
sls = ["sl" + unique_name() for _ in range(5)]
|
||||
for sl in sls:
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl}")
|
||||
|
||||
# Insert a service level into the old table, as if it had been created before the upgrade to v2 and later removed after the upgrade
|
||||
sl_v1 = "sl" + unique_name()
|
||||
await cql.run_async(f"INSERT INTO system_distributed.service_levels (service_level) VALUES ('{sl_v1}')")
|
||||
|
||||
logging.info("Validating service levels were created in v2 table")
|
||||
result = await cql.run_async("SELECT service_level FROM system.service_levels_v2")
|
||||
for sl in result:
|
||||
assert sl.service_level in sls + [DRIVER_SL_NAME]
|
||||
|
||||
logging.info(f"Restarting hosts {hosts} in recovery mode")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await manager.rolling_restart(servers)
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info("Checking service levels can be read and v2 table is used")
|
||||
recovery_result = await cql.run_async("LIST ALL SERVICE LEVELS")
|
||||
assert sl_v1 not in [sl.service_level for sl in recovery_result]
|
||||
assert set([sl.service_level for sl in recovery_result]) == set(sls + [DRIVER_SL_NAME])
|
||||
|
||||
logging.info("Checking changes to service levels are forbidden during recovery")
|
||||
with pytest.raises(InvalidRequest, match="The cluster is in recovery mode. Changes to service levels are not allowed."):
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL sl_{unique_name()}")
|
||||
with pytest.raises(InvalidRequest, match="The cluster is in recovery mode. Changes to service levels are not allowed."):
|
||||
await cql.run_async(f"ALTER SERVICE LEVEL {sls[0]} WITH timeout = 1h")
|
||||
with pytest.raises(InvalidRequest, match="The cluster is in recovery mode. Changes to service levels are not allowed."):
|
||||
await cql.run_async(f"DROP SERVICE LEVEL {sls[0]}")
|
||||
|
||||
logging.info("Restoring cluster to normal status")
|
||||
await asyncio.gather(*(delete_raft_topology_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
await manager.rolling_restart(servers)
|
||||
cql = await reconnect_driver(manager)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
for host in hosts:
|
||||
status = await manager.api.raft_topology_upgrade_status(host.address)
|
||||
assert status == "not_upgraded"
|
||||
|
||||
await manager.servers_see_each_other(servers)
|
||||
await manager.api.upgrade_to_raft_topology(hosts[0].address)
|
||||
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
|
||||
await wait_until_driver_service_level_created(manager, time.time() + 60)
|
||||
|
||||
logging.info("Validating service levels works in v2 mode after leaving recovery")
|
||||
new_sl = "sl" + unique_name()
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {new_sl}")
|
||||
|
||||
sls_list = await cql.run_async("LIST ALL SERVICE LEVELS")
|
||||
assert sl_v1 not in [sl.service_level for sl in sls_list]
|
||||
assert set([sl.service_level for sl in sls_list]) == set(sls + [new_sl] + [DRIVER_SL_NAME])
|
||||
|
||||
def default_timeout(mode):
|
||||
if mode == "dev":
|
||||
return "30s"
|
||||
@@ -263,6 +384,50 @@ async def test_shares_check(manager: ManagerClient):
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl2} WITH shares=500")
|
||||
await cql.run_async(f"ALTER SERVICE LEVEL {sl1} WITH shares=100")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injection is not supported in release mode')
|
||||
async def test_workload_prioritization_upgrade(manager: ManagerClient):
|
||||
# This test simulates OSS->enterprise upgrade in v1 service levels.
|
||||
# Using error injection, the test disables WORKLOAD_PRIORITIZATION feature
|
||||
# and removes `shares` column from system_distributed.service_levels table.
|
||||
config = {
|
||||
**auth_config,
|
||||
'authenticator': 'AllowAllAuthenticator',
|
||||
'authorizer': 'AllowAllAuthorizer',
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled',
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'suppress_features',
|
||||
'value': 'WORKLOAD_PRIORITIZATION'
|
||||
},
|
||||
{
|
||||
'name': 'service_levels_v1_table_without_shares'
|
||||
}
|
||||
]
|
||||
}
|
||||
servers = [await manager.server_add(config=config) for _ in range(3)]
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Validate that service levels' table has no `shares` column
|
||||
sl_schema = await cql.run_async("DESC TABLE system_distributed.service_levels")
|
||||
assert "shares int" not in sl_schema[0].create_statement
|
||||
with pytest.raises(InvalidRequest):
|
||||
await cql.run_async("CREATE SERVICE LEVEL sl1 WITH shares = 100")
|
||||
|
||||
# Do rolling restart of the cluster and remove error injections
|
||||
for server in servers:
|
||||
await manager.server_update_config(server.server_id, 'error_injections_at_startup', [])
|
||||
await manager.rolling_restart(servers)
|
||||
|
||||
# Validate that `shares` column was added
|
||||
logs = [await manager.server_open_log(server.server_id) for server in servers]
|
||||
await logs[0].wait_for("Workload prioritization v1 started|Workload prioritization v1 is already started", timeout=10)
|
||||
sl_schema_upgraded = await cql.run_async("DESC TABLE system_distributed.service_levels")
|
||||
assert "shares int" in sl_schema_upgraded[0].create_statement
|
||||
await cql.run_async("CREATE SERVICE LEVEL sl2 WITH shares = 100")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injection is disabled in release mode')
|
||||
async def test_service_levels_over_limit(manager: ManagerClient):
|
||||
|
||||
@@ -13,6 +13,7 @@ class DTestConfig:
|
||||
self.num_tokens = -1
|
||||
self.experimental_features = []
|
||||
self.tablets = False
|
||||
self.force_gossip_topology_changes = False
|
||||
self.scylla_features = set()
|
||||
|
||||
def setup(self, request):
|
||||
@@ -20,6 +21,7 @@ class DTestConfig:
|
||||
self.num_tokens = request.config.getoption("--num-tokens")
|
||||
self.experimental_features = request.config.getoption("--experimental-features") or set()
|
||||
self.tablets = request.config.getoption("--tablets", default=False)
|
||||
self.force_gossip_topology_changes = request.config.getoption("--force-gossip-topology-changes", default=False)
|
||||
self.scylla_features = request.config.scylla_features
|
||||
|
||||
@property
|
||||
|
||||
@@ -526,6 +526,10 @@ class DTestSetup:
|
||||
experimental_features.append(f)
|
||||
self.scylla_features |= set(values.get("experimental_features", []))
|
||||
|
||||
if self.dtest_config.force_gossip_topology_changes:
|
||||
logger.debug("Forcing gossip topology changes")
|
||||
values["force_gossip_topology_changes"] = True
|
||||
|
||||
logger.debug("Setting 'enable_tablets' to %s", self.dtest_config.tablets)
|
||||
values["enable_tablets"] = self.dtest_config.tablets
|
||||
values["tablets_mode_for_new_keyspaces"] = "enabled" if self.dtest_config.tablets else "disabled"
|
||||
|
||||
@@ -1,102 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from cassandra.cluster import Session
|
||||
from cassandra.protocol import ConfigurationException, InvalidRequest
|
||||
|
||||
from dtest_class import Tester
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_ks_and_assert_warning(session, query, ks_name, key_warn_msg_words):
    """Run `query`, optionally check its warnings, then switch to `ks_name`.

    When `key_warn_msg_words` is non-empty, at least one warning returned
    for the query must contain every listed word (warnings are lowercased
    before matching). When it is empty, warnings are not inspected.
    The final `USE` confirms that the keyspace exists.
    """
    future = session.execute_async(query)
    future.result()
    if len(key_warn_msg_words) > 0:
        assert len(future.warnings) >= 1, "Expected RF guardrail warning"
        matched = any(
            all(word in warning.lower() for word in key_warn_msg_words)
            for warning in future.warnings
        )
        assert matched, "Didn't match all required keywords"
    session.execute(f"USE {ks_name}")
|
||||
|
||||
|
||||
def assert_creating_ks_fails(session, query, ks_name):
    """Check that `query` is rejected and that `ks_name` was not created.

    The query itself must raise ConfigurationException; the follow-up
    `USE` must raise InvalidRequest.
    """
    use_statement = f"USE {ks_name}"
    with pytest.raises(ConfigurationException):
        session.execute(query)
    with pytest.raises(InvalidRequest):
        session.execute(use_statement)
|
||||
|
||||
|
||||
@pytest.mark.next_gating
class TestGuardrails(Tester):
    """Tests for the replication-factor (RF) guardrails on CREATE KEYSPACE."""

    def test_default_rf(self):
        """
        As of now, the only RF guardrail enabled is a soft limit checking that RF >= 3. Not complying with this soft
        limit results in the CQL being executed, but with a warning. Also, whatever the guardrails' values, RF = 0 is
        always OK.
        """
        cluster = self.cluster

        # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
        # we'll get a different error, so let's disable it for now. For more context, see issues:
        # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
        cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})

        cluster.populate([1, 1, 1]).start(wait_other_notice=True)
        session_dc1: Session = self.patient_cql_connection(cluster.nodelist()[0])

        ks_name = "ks"
        rf = {"dc1": 2, "dc2": 3, "dc3": 0}
        query = "CREATE KEYSPACE %s WITH REPLICATION={%s}"
        options = ", ".join(["'%s':%d" % (dc_value, rf_value) for dc_value, rf_value in rf.items()])
        query = query % (ks_name, "'class':'NetworkTopologyStrategy', %s" % options)
        # Only dc1 (RF=2) is below the soft minimum of 3, so the warning is expected to name dc1 and 2.
        create_ks_and_assert_warning(session_dc1, query, ks_name, ["warn", "min", "replication", "factor", "3", "dc1", "2"])

    def test_all_rf_limits(self):
        """
        There're 4 limits for RF: soft/hard min and soft/hard max limits. Breaking soft limits issues a warning,
        breaking the hard limits prevents the query from being executed.
        """
        cluster = self.cluster

        MIN_FAIL_THRESHOLD = 2
        MIN_WARN_THRESHOLD = 3
        MAX_WARN_THRESHOLD = 4
        MAX_FAIL_THRESHOLD = 5

        # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
        # we'll get a different error, so let's disable it for now. For more context, see issues:
        # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
        cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})

        cluster.set_configuration_options(
            values={
                "minimum_replication_factor_fail_threshold": MIN_FAIL_THRESHOLD,
                "minimum_replication_factor_warn_threshold": MIN_WARN_THRESHOLD,
                "maximum_replication_factor_warn_threshold": MAX_WARN_THRESHOLD,
                "maximum_replication_factor_fail_threshold": MAX_FAIL_THRESHOLD
            }
        )

        query = "CREATE KEYSPACE %s WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1': %s}"
        cluster.populate([1]).start()
        node = cluster.nodelist()[0]
        session = self.patient_cql_connection(node)

        def check_rf(rf):
            # Each RF gets its own keyspace so the cases do not interfere.
            ks_name = f"ks_{rf}"
            statement = query % (ks_name, rf)
            if rf < MIN_FAIL_THRESHOLD or rf > MAX_FAIL_THRESHOLD:
                assert_creating_ks_fails(session, statement, ks_name)
            elif rf < MIN_WARN_THRESHOLD:
                # The warning is expected to mention the offending RF itself.
                create_ks_and_assert_warning(session, statement, ks_name, ["warn", "min", "replication", "factor", str(MIN_WARN_THRESHOLD), "dc1", str(rf)])
            elif rf > MAX_WARN_THRESHOLD:
                create_ks_and_assert_warning(session, statement, ks_name, ["warn", "max", "replication", "factor", str(MAX_WARN_THRESHOLD), "dc1", str(rf)])
            else:
                create_ks_and_assert_warning(session, statement, ks_name, [])

        # Cover one value below the hard minimum through one value above the hard maximum.
        # `range` excludes its stop value, hence `+ 2`; with `+ 1` the hard maximum was never exceeded.
        for rf in range(MIN_FAIL_THRESHOLD - 1, MAX_FAIL_THRESHOLD + 2):
            check_rf(rf)
|
||||
@@ -33,7 +33,8 @@ logger = logging.getLogger(__name__)
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_mv_topology_change(manager: ManagerClient):
|
||||
cfg = {'tablets_mode_for_new_keyspaces': 'disabled',
|
||||
cfg = {'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled',
|
||||
'error_injections_at_startup': ['delay_before_get_view_natural_endpoint']}
|
||||
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
|
||||
@@ -9,6 +9,8 @@ extra_scylla_config_options:
|
||||
rf_rack_valid_keyspaces: True
|
||||
tablets_mode_for_new_keyspaces: enabled
|
||||
run_first:
|
||||
- test_raft_recovery_stuck
|
||||
- test_raft_recovery_basic
|
||||
- test_group0_schema_versioning
|
||||
- test_tablets_migration
|
||||
- test_zero_token_nodes_topology_ops
|
||||
@@ -37,6 +39,7 @@ run_in_dev:
|
||||
- test_raft_ignore_nodes
|
||||
- test_group0_schema_versioning
|
||||
- test_different_group0_ids
|
||||
- test_replace_ignore_nodes
|
||||
- test_zero_token_nodes_no_replication
|
||||
- test_not_enough_token_owners
|
||||
- test_replace_alive_node
|
||||
@@ -48,6 +51,5 @@ run_in_dev:
|
||||
- dtest/commitlog_test
|
||||
- dtest/cfid_test
|
||||
- dtest/rebuild_test
|
||||
- dtest/guardrails_test
|
||||
run_in_debug:
|
||||
- random_failures/test_random_failures
|
||||
|
||||
52
test/cluster/test_boot_after_ip_change.py
Normal file
52
test/cluster/test_boot_after_ip_change.py
Normal file
@@ -0,0 +1,52 @@
|
||||
#
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import wait_for_token_ring_and_group0_consistency
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_boot_after_ip_change(manager: ManagerClient) -> None:
    """Bootstrap a new node after existing one changed its IP.

    Regression test for #14468. Does not apply to Raft-topology mode.
    """
    cfg = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    logger.info("Booting initial cluster")
    servers = []
    for _ in range(2):
        servers.append(await manager.server_add(config=cfg))
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    observer, moved = servers

    logger.info(f"Stopping server {moved}")
    await manager.server_stop_gracefully(moved.server_id)

    logger.info(f"Changing IP of server {moved}")
    new_ip = await manager.server_change_ip(moved.server_id)
    # Keep the local record of the server in line with its new address.
    moved = moved._replace(ip_addr = new_ip)
    logger.info(f"New IP: {new_ip}")

    logger.info(f"Restarting server {moved}")
    await manager.server_start(moved.server_id)

    # We need to do this wait before we boot a new node.
    # Otherwise the newly booting node may contact servers[0] even before servers[0]
    # saw the new IP of servers[1], and then the booting node will try to wait
    # for servers[1] to be alive using its old IP (and eventually time out).
    #
    # Note that this still acts as a regression test for #14468.
    # In #14468, the problem was that a booting node would try to wait for the old IP
    # of servers[0] even after all existing servers saw the IP change.
    logger.info(f"Wait until {observer} sees the new IP of {moved}")
    await manager.server_sees_other_server(observer.ip_addr, moved.ip_addr)

    logger.info("Booting new node")
    await manager.server_add(config=cfg)
|
||||
@@ -13,7 +13,6 @@ from test.pylib.util import wait_for_first_completed
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.xfail(reason="gossiper topology mode is no longer supported, need to rewrite the test using raft topology")
|
||||
async def test_different_group0_ids(manager: ManagerClient):
|
||||
"""
|
||||
The reproducer for #14448.
|
||||
|
||||
21
test/cluster/test_gossip_boot.py
Normal file
21
test/cluster/test_gossip_boot.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import pytest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_gossip_boot(manager: ManagerClient):
    """
    Regression test for scylladb/scylladb#17493.

    Boots three nodes with the `gossiper_replicate_sleep` error injection
    and waits until every node's log reports that handle_state_normal
    finished for each node's IP address.
    """
    cfg = {
        'error_injections_at_startup': ['gossiper_replicate_sleep'],
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }

    servers = []
    for _ in range(3):
        servers.append(await manager.server_add(config=cfg, timeout=60))

    log_files = []
    for srv in servers:
        log_files.append(await manager.server_open_log(srv.server_id))

    # Every node has to log completion for every node address.
    for log_file in log_files:
        for srv in servers:
            await log_file.wait_for(f'handle_state_normal for {srv.ip_addr}.*finished', timeout=60)
|
||||
358
test/cluster/test_group0_schema_versioning.py
Normal file
358
test/cluster/test_group0_schema_versioning.py
Normal file
@@ -0,0 +1,358 @@
|
||||
#
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import asyncio
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
import re
|
||||
from uuid import UUID
|
||||
|
||||
from cassandra.cluster import Session, ConsistencyLevel # type: ignore
|
||||
from cassandra.query import SimpleStatement # type: ignore
|
||||
from cassandra.pool import Host # type: ignore
|
||||
|
||||
from test.pylib.manager_client import ManagerClient, ServerInfo
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.pylib.log_browsing import ScyllaLogFile
|
||||
from test.cluster.util import reconnect_driver, wait_until_upgrade_finishes, \
|
||||
enter_recovery_state, delete_raft_data_and_upgrade_state, new_test_keyspace
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
    """Return the `schema_version` stored in host `h`'s system.local row.

    Asserts that the query returned at least one row.
    """
    rows = await cql.run_async("select schema_version from system.local where key = 'local'", host=h)
    assert rows
    local_row = rows[0]
    return local_row.schema_version
|
||||
|
||||
|
||||
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
    """Return the `group0_schema_version` stored in host `h`'s system.scylla_local.

    Returns None when the key has no row.
    """
    rows = await cql.run_async("select value from system.scylla_local where key = 'group0_schema_version'", host=h)
    return UUID(rows[0].value) if rows else None
|
||||
|
||||
|
||||
async def get_scylla_tables_versions(cql: Session, h: Host) -> list[tuple[str, str, UUID | None]]:
    """Return (keyspace_name, table_name, version) for every row of
    system_schema.scylla_tables as read from host `h`."""
    rows = await cql.run_async("select keyspace_name, table_name, version from system_schema.scylla_tables", host=h)
    result = []
    for row in rows:
        result.append((row.keyspace_name, row.table_name, row.version))
    return result
|
||||
|
||||
|
||||
async def get_scylla_tables_version(cql: Session, h: Host, keyspace_name: str, table_name: str) -> UUID | None:
    """Return the `version` column of the system_schema.scylla_tables row for
    `keyspace_name`.`table_name`, as read from host `h`.

    Fails the test if no such row exists. The returned version may be None.
    """
    query = (
        f"select version from system_schema.scylla_tables"
        f" where keyspace_name = '{keyspace_name}' and table_name = '{table_name}'"
    )
    rows = await cql.run_async(query, host=h)
    if rows:
        return rows[0].version
    pytest.fail(f"No scylla_tables row found for {keyspace_name}.{table_name}")
    # Kept in case pytest.fail() ever returns: this reproduces the original
    # fall-through, which then fails on the empty result.
    return rows[0].version
|
||||
|
||||
|
||||
async def verify_local_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Wait until every host in `hs` reports the same system.local schema_version.

    The check is retried through `wait_for` with a 1 second period and a
    deadline 5 seconds from now.
    """
    async def all_hosts_agree():
        per_host = {}
        for host in hs:
            per_host[host] = await get_local_schema_version(cql, host)
        logger.info(f"system.local schema_versions: {per_host}")
        ref_host, ref_version = next(iter(per_host.items()))
        for host, version in per_host.items():
            if version == ref_version:
                continue
            logger.info(f"{ref_host}'s system.local schema_version {ref_version} is different than {host}'s version {version}; retrying")
            # None asks wait_for to try again.
            return None
        return True

    await wait_for(all_hosts_agree, deadline=time.time() + 5.0, period=1.0)
|
||||
|
||||
|
||||
async def verify_group0_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Fail the test unless every host in `hs` stores the same
    `group0_schema_version` in system.scylla_local.

    Unlike the system.local check, this one is not retried.
    """
    per_host = {}
    for host in hs:
        per_host[host] = await get_group0_schema_version(cql, host)
    logger.info(f"system.scylla_local group0_schema_versions: {per_host}")
    ref_host, ref_version = next(iter(per_host.items()))
    for host, version in per_host.items():
        if version == ref_version:
            continue
        pytest.fail(f"{ref_host}'s system.scylla_local group0_schema_version {ref_version} is different than {host}'s version {version}")
|
||||
|
||||
|
||||
async def verify_scylla_tables_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool) -> None:
    """Fail the test unless system_schema.scylla_tables has the same
    (keyspace, table, version) rows on every host in `hs`.

    With `ignore_system_tables` set, differences in the "system" keyspace
    are not counted.
    """
    per_host = {}
    for host in hs:
        per_host[host] = set(await get_scylla_tables_versions(cql, host))
    logger.info(f"system_schema.scylla_tables: {per_host}")
    ref_host, ref_rows = next(iter(per_host.items()))
    for host, rows in per_host.items():
        diff = rows.symmetric_difference(ref_rows)
        if ignore_system_tables:
            diff = {entry for entry in diff if entry[0] != "system"}
        if not diff:
            continue
        pytest.fail(f"{ref_host}'s system_schema.scylla_tables contents is different than {host}'s, symmetric diff: {diff}")
|
||||
|
||||
|
||||
async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool = False) -> None:
    """Run the three table-stored version checks against hosts `hs`, in order:
    group 0 schema version, system.local schema version, and
    system_schema.scylla_tables contents."""
    logger.info("Verifying that versions stored in tables are in sync")
    for verify in (verify_group0_schema_versions_synced, verify_local_schema_versions_synced):
        await verify(cql, hs)
    await verify_scylla_tables_versions_synced(cql, hs, ignore_system_tables)
|
||||
|
||||
|
||||
async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int], table):
    """
    Assumes that `logs` are log files of servers `srvs`, correspondingly in order.
    Assumes that `marks` are log markers (obtained by `ScyllaLogFile.mark()`) corresponding to `logs` in order.
    Assumes that an 'alter table {table} ...' statement was performed after obtaining `marks`.
    Checks that every server printed the same version in `Altering {table}...' log message.
    """
    logger.info("Verifying that in-memory table schema versions are in sync")
    pattern = f"Altering {table}.*version=(.*)"
    matches = []
    for log, mark in zip(logs, marks):
        matches.append(await log.grep(pattern, from_mark=mark))

    def get_version(srv: ServerInfo, matches: list[tuple[str, re.Match[str]]]):
        # Only the first 'Altering' message after the mark is considered.
        if not matches:
            pytest.fail(f"Server {srv} didn't log 'Altering' message")
        first_match = matches[0][1]
        return UUID(first_match.group(1))

    versions = {}
    for srv, srv_matches in zip(srvs, matches):
        versions[srv] = get_version(srv, srv_matches)
    logger.info(f"In-memory table versions: {versions}")

    ref_srv, ref_version = next(iter(versions.items()))
    for srv, version in versions.items():
        if version == ref_version:
            continue
        pytest.fail(f"{ref_srv}'s in-memory table version {ref_version} is different than {srv}'s version {version}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_schema_versioning_with_recovery(manager: ManagerClient):
    """
    Perform schema changes while mixing nodes in RECOVERY mode with nodes in group 0 mode.
    Schema changes originating from RECOVERY node use digest-based schema versioning.
    Schema changes originating from group 0 nodes use persisted versions committed through group 0.

    Verify that schema versions are in sync after each schema change.
    """
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    # Must bootstrap sequentially because of gossip topology changes
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    # The rest of the test uses the table, so it all runs inside the keyspace context.
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name:
        await verify_table_versions_synced(cql, hosts)
        table_name = "t"
        table = f"{ks_name}.{table_name}"
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)
        ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert ks_t_version

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]
        marks = [await log.mark() for log in logs]

        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # A group 0 schema change must store a new, non-null version.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        assert new_ks_t_version != ks_t_version
        ks_t_version = new_ks_t_version

        # We still have a group 0 majority, don't do this at home.
        srv1 = servers[0]
        logger.info(f"Rebooting {srv1} in RECOVERY mode")
        h1 = next(h for h in hosts if h.address == srv1.ip_addr)
        await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1)
        await manager.server_restart(srv1.server_id)

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode.
        # Don't do this at home.
        #
        # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything:
        # the RECOVERY node is operating using the old schema change procedure, which means
        # that it pushes the schema mutations to other nodes directly with RPC, modifying
        # the group 0 state machine on other two nodes.
        #
        # There is one problem with this however. If the RECOVERY node considers some other node
        # as DOWN, it will silently *not* push the schema change, completing the operation
        # "successfully" nevertheless (it will return to the driver without error).
        # Usually in this case we rely on eventual convergence of schema through gossip,
        # which will not happen here, because the group 0 nodes are not doing schema pulls!
        # So we need to make sure that the RECOVERY node sees the other nodes as UP before
        # we perform the schema change, so it pushes the mutations to them.
        logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP")
        await manager.server_sees_others(srv1.server_id, 2)

        marks = [await log.mark() for log in logs]
        logger.info(f"Altering table on RECOVERY node ({srv1})")
        await cql.run_async(f"alter table {table} with comment = ''", host=h1)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # After a schema change made by the RECOVERY node, the test expects
        # no persisted version for the table.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert not new_ks_t_version
        ks_t_version = new_ks_t_version

        logger.info(f"Stopping {srv1} gracefully")
        await manager.server_stop_gracefully(srv1.server_id)

        srv2 = servers[1]
        logger.info(f"Waiting until {srv2} sees {srv1} as dead")
        await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # Now we modify schema through group 0 while the RECOVERY node is dead.
        # Don't do this at home.
        marks = [await log.mark() for log in logs]
        h2 = next(h for h in hosts if h.address == srv2.ip_addr)
        logger.info(f"Altering table on group 0 node {srv2}")
        await cql.run_async(f"alter table {table} with comment = ''", host=h2)

        await manager.server_start(srv1.server_id)
        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info(f"Waiting until {srv2} sees {srv1} as UP")
        await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # The RECOVERY node will pull schema when it gets a write.
        # The other group 0 node will do a barrier so it will also sync schema before the write returns.
        logger.info("Forcing schema sync through CL=ALL INSERT")
        await cql.run_async(SimpleStatement(f"insert into {table} (pk) values (0)", consistency_level=ConsistencyLevel.ALL),
                            host=h2)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        ks_t_version = new_ks_t_version

        srv3 = servers[2]
        h3 = next(h for h in hosts if h.address == srv3.ip_addr)
        logger.info("Finishing recovery")
        # Put the two remaining nodes into RECOVERY as well, so that Raft data
        # can then be deleted on all three nodes.
        for h in [h2, h3]:
            await cql.run_async(
                "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h)
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3]))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        for h in [h1, h2, h3]:
            await delete_raft_data_and_upgrade_state(cql, h)

        logger.info("Restarting servers")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Logged through the module logger, like every other message in this test.
        logger.info("Waiting until upgrade finishes")
        for h in [h1, h2, h3]:
            await wait_until_upgrade_finishes(cql, h, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        for change in [
                f"alter table {table} with comment = ''",
                f"alter table {table} add v int",
                f"alter table {table} alter v type blob"]:

            marks = [await log.mark() for log in logs]
            logger.info(f"Altering table with \"{change}\"")
            await cql.run_async(change)

            # Each change must store a new, non-null version.
            new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
            assert new_ks_t_version
            assert new_ks_t_version != ks_t_version
            ks_t_version = new_ks_t_version

            await verify_table_versions_synced(cql, hosts)
            await verify_in_memory_table_versions(servers, logs, marks, table)
|
||||
|
||||
@pytest.mark.asyncio
async def test_upgrade(manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    While Raft is disabled, we use digest-based schema versioning.
    Once Raft upgrade is complete, we use persisted versions committed through group 0.
    """
    # Raft upgrade tests had to be replaced with recovery tests (scylladb/scylladb#16192)
    # as prerequisite for getting rid of `consistent_cluster_management` flag.
    # So we do the same here: start a cluster in Raft mode, then enter recovery
    # to simulate a non-Raft cluster.
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()

    logger.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info(f"Setting recovery state on {hosts} and restarting")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
    await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
    cql = await reconnect_driver(manager)

    logger.info("Waiting until driver connects to every server")
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    # The rest of the test uses the table, so it all runs inside the keyspace context.
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks_name:
        table = f"{ks_name}.t"
        await verify_table_versions_synced(cql, hosts)
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info(f"Deleting Raft data and upgrade state on {hosts}")
        await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

        logger.info(f"Restarting {servers}")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
        cql = await reconnect_driver(manager)

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info("Waiting until Raft upgrade procedure finishes")
        await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]

        marks = [await log.mark() for log in logs]
        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # `group0_schema_version` should be present
        # and the version column for `{table}` should be non-null.
        for h in hosts:
            logger.info(f"Checking that `group0_schema_version` is set on {h}")
            assert (await get_group0_schema_version(cql, h)) is not None

        for h in hosts:
            logger.info(f"Checking that `version` column for `{table}` is set on {h}")
            versions = await get_scylla_tables_versions(cql, h)
            # Filter on the keyspace this test created. Comparing against the
            # literal "ks" would match nothing unless the keyspace happened to
            # have that name, leaving the check below vacuous.
            test_ks_versions = [v for ks, _, v in versions if ks == ks_name]
            assert test_ks_versions, f"No scylla_tables rows found for keyspace {ks_name} on {h}"
            for v in test_ks_versions:
                assert v is not None
|
||||
@@ -21,6 +21,13 @@ async def test_create_keyspace_with_default_replication_factor(manager: ManagerC
|
||||
def get_pf(dc: str, rack: str) -> dict:
|
||||
return {'dc': dc, 'rack': rack}
|
||||
|
||||
logging.info("Trying to add a zero-token server in the gossip-based topology")
|
||||
await manager.server_add(config={'join_ring': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'},
|
||||
property_file={'dc': 'dc1', 'rack': 'rz'},
|
||||
expected_error='the raft-based topology is disabled')
|
||||
|
||||
normal_cfg = {
|
||||
'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled',
|
||||
'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces
|
||||
|
||||
@@ -7,19 +7,16 @@
|
||||
from cassandra.protocol import ConfigurationException
|
||||
from cassandra.connection import UnixSocketEndPoint
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.conftest import cluster_con
|
||||
from test.pylib.util import gather_safely, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import create_new_test_keyspace
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import new_test_keyspace
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
from typing import TypeAlias
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -28,166 +25,80 @@ async def test_maintenance_mode(manager: ManagerClient):
|
||||
"""
|
||||
The test checks that in maintenance mode server A is not available for other nodes and for clients.
|
||||
It is possible to connect by the maintenance socket to server A and perform local CQL operations.
|
||||
|
||||
The test is run with multiple keyspaces with different configurations (replication strategy, RF, tablets enabled).
|
||||
It initially used only SimpleStrategy and RF=1, which hid https://github.com/scylladb/scylladb/issues/27988. To keep
|
||||
the test fast, the tasks for different keyspaces are performed concurrently, and server A is started in maintenance
|
||||
mode only once.
|
||||
"""
|
||||
max_rf = 3
|
||||
servers = await manager.servers_add(max_rf, auto_rack_dc='dc1')
|
||||
server_a = servers[0]
|
||||
host_id_a = await manager.get_host_id(server_a.server_id)
|
||||
|
||||
server_a, server_b = await manager.server_add(), await manager.server_add()
|
||||
socket_endpoint = UnixSocketEndPoint(await manager.server_get_maintenance_socket_path(server_a.server_id))
|
||||
|
||||
# For the move_tablet API.
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
# An exclusive connection to server A is needed for requests with LocalStrategy.
|
||||
cluster = cluster_con([server_a.ip_addr], load_balancing_policy=WhiteListRoundRobinPolicy([server_a.ip_addr]))
|
||||
cluster = cluster_con([server_b.ip_addr])
|
||||
cql = cluster.connect()
|
||||
|
||||
# (replication strategy, Optional[replication factor], tablets enabled)
|
||||
KeyspaceOptions: TypeAlias = tuple[str, int | None, bool]
|
||||
keyspace_options: list[KeyspaceOptions] = []
|
||||
keyspace_options.append(('EverywhereStrategy', None, False))
|
||||
keyspace_options.append(('LocalStrategy', None, False))
|
||||
for rf in range(1, max_rf + 1):
|
||||
keyspace_options.append(('SimpleStrategy', rf, False))
|
||||
for tablets_enabled in [True, False]:
|
||||
keyspace_options.append(('NetworkTopologyStrategy', rf, tablets_enabled))
|
||||
|
||||
key_on_server_a_per_table: dict[str, int] = dict()
|
||||
|
||||
async def prepare_table(options: KeyspaceOptions):
|
||||
replication_strategy, rf, tablets_enabled = options
|
||||
rf_string = "" if rf is None else f", 'replication_factor': {rf}"
|
||||
ks = await create_new_test_keyspace(cql,
|
||||
f"""WITH REPLICATION = {{'class': '{replication_strategy}'{rf_string}}}
|
||||
AND tablets = {{'enabled': {str(tablets_enabled).lower()}, 'initial': 1}}""")
|
||||
rf_tag = "" if rf is None else f"rf{rf}"
|
||||
tablets_tag = "tablets" if tablets_enabled else "vnodes"
|
||||
table_suffix = f"{replication_strategy.lower()}_{rf_tag}_{tablets_tag}"
|
||||
table = f"{ks}.{table_suffix}"
|
||||
async with new_test_keyspace(manager, "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}") as ks:
|
||||
table = f"{ks}.t"
|
||||
await cql.run_async(f"CREATE TABLE {table} (k int PRIMARY KEY, v int)")
|
||||
logger.info(f"Created table {table}")
|
||||
|
||||
async def insert_one(cl: ConsistencyLevel):
|
||||
key = 1
|
||||
insert_stmt = SimpleStatement(f"INSERT INTO {table} (k, v) VALUES ({key}, {key})",
|
||||
consistency_level=cl)
|
||||
await cql.run_async(insert_stmt)
|
||||
key_on_server_a_per_table[table] = key
|
||||
|
||||
if replication_strategy == 'LocalStrategy':
|
||||
await insert_one(ConsistencyLevel.ONE)
|
||||
return
|
||||
|
||||
if tablets_enabled:
|
||||
await insert_one(ConsistencyLevel.ALL)
|
||||
|
||||
logger.info(f"Ensuring that a tablet replica is on {server_a} for table {table}")
|
||||
[tablet] = await get_all_tablet_replicas(manager, server_a, ks, table_suffix)
|
||||
if host_id_a not in [r[0] for r in tablet.replicas]:
|
||||
assert rf < max_rf
|
||||
any_replica = tablet.replicas[0]
|
||||
logger.info(f"Moving tablet from {any_replica} to {server_a} for table {table}")
|
||||
await manager.api.move_tablet(server_a.ip_addr, ks, table_suffix,
|
||||
any_replica[0], any_replica[1],
|
||||
host_id_a, 0,
|
||||
tablet.last_token)
|
||||
return
|
||||
|
||||
# This path is executed only for vnodes-based keyspaces.
|
||||
|
||||
# Token ranges of the server A
|
||||
# [(start_token, end_token)]
|
||||
ranges = [(int(row[0]), int(row[1])) for row in await cql.run_async(f"""SELECT start_token, end_token
|
||||
ranges = [(int(row[0]), int(row[1])) for row in await cql.run_async(f"""SELECT start_token, end_token, endpoint
|
||||
FROM system.token_ring WHERE keyspace_name = '{ks}'
|
||||
AND endpoint = '{server_a.ip_addr}' ALLOW FILTERING""")]
|
||||
|
||||
# Insert data to the cluster until a key is stored on server A.
|
||||
new_key = 0
|
||||
while table not in key_on_server_a_per_table:
|
||||
if new_key == 1000:
|
||||
# The probability of reaching this code is (2/3)^1000 for RF=1 and lower for greater RFs. This is much
|
||||
# less than, for example, the probability of a UUID collision, so worrying about this would be silly.
|
||||
# It could still happen due to a bug, and then we want to know about it, so we fail the test.
|
||||
pytest.fail(f"Could not find a key on server {server_a} after inserting 1000 keys")
|
||||
new_key += 1
|
||||
# Insert data to the cluster and find a key that is stored on server A.
|
||||
for i in range(256):
|
||||
await cql.run_async(f"INSERT INTO {table} (k, v) VALUES ({i}, {i})")
|
||||
|
||||
insert_stmt = SimpleStatement(f"INSERT INTO {table} (k, v) VALUES ({new_key}, {new_key})",
|
||||
consistency_level=ConsistencyLevel.ALL)
|
||||
await cql.run_async(insert_stmt)
|
||||
# [(key, token of this key)]
|
||||
keys_with_tokens = [(int(row[0]), int(row[1])) for row in await cql.run_async(f"SELECT k, token(k) FROM {table}")]
|
||||
key_on_server_a = None
|
||||
|
||||
res = await cql.run_async(f"SELECT token(k) FROM {table} WHERE k = {new_key}")
|
||||
assert len(res) == 1
|
||||
token = res[0][0]
|
||||
for key, token in keys_with_tokens:
|
||||
for start, end in ranges:
|
||||
if (start < end and start < token <= end) or (start >= end and (token <= end or start < token)):
|
||||
logger.info(f"Found key {new_key} with token {token} on server {server_a} for table {table}")
|
||||
key_on_server_a_per_table[table] = new_key
|
||||
key_on_server_a = key
|
||||
|
||||
logger.info("Preparing tables")
|
||||
await gather_safely(*(prepare_table(options) for options in keyspace_options))
|
||||
if key_on_server_a is None:
|
||||
# There is only a chance ~(1/2)^256 that all keys are stored on the server B
|
||||
# In this case we skip the test
|
||||
pytest.skip("All keys are stored on the server B")
|
||||
|
||||
# Start server A in maintenance mode
|
||||
await manager.server_stop_gracefully(server_a.server_id)
|
||||
await manager.server_update_config(server_a.server_id, "maintenance_mode", True)
|
||||
await manager.server_start(server_a.server_id)
|
||||
# Start server A in maintenance mode
|
||||
await manager.server_stop_gracefully(server_a.server_id)
|
||||
await manager.server_update_config(server_a.server_id, "maintenance_mode", "true")
|
||||
await manager.server_start(server_a.server_id)
|
||||
|
||||
log = await manager.server_open_log(server_a.server_id)
|
||||
await log.wait_for(r"initialization completed \(maintenance mode\)")
|
||||
log = await manager.server_open_log(server_a.server_id)
|
||||
await log.wait_for(r"initialization completed \(maintenance mode\)")
|
||||
|
||||
# Check that the regular CQL port is not available
|
||||
assert socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect_ex((server_a.ip_addr, 9042)) != 0
|
||||
# Check that the regular CQL port is not available
|
||||
assert socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect_ex((server_a.ip_addr, 9042)) != 0
|
||||
|
||||
maintenance_cluster = cluster_con([socket_endpoint],
|
||||
load_balancing_policy=WhiteListRoundRobinPolicy([socket_endpoint]))
|
||||
maintenance_cql = maintenance_cluster.connect()
|
||||
maintenance_cluster = cluster_con([socket_endpoint],
|
||||
load_balancing_policy=WhiteListRoundRobinPolicy([socket_endpoint]))
|
||||
maintenance_cql = maintenance_cluster.connect()
|
||||
|
||||
async def update_table_in_maintenance_mode(table: str, key: int):
|
||||
# Check that local data is available in maintenance mode
|
||||
select_stm = SimpleStatement(f"SELECT v FROM {table} WHERE k = {key}", consistency_level=ConsistencyLevel.ONE)
|
||||
res = await maintenance_cql.run_async(select_stm)
|
||||
assert len(res) == 1 and res[0][0] == key, f"Expected {key} for table {table}"
|
||||
res = await maintenance_cql.run_async(f"SELECT v FROM {table} WHERE k = {key_on_server_a}")
|
||||
assert res[0][0] == key_on_server_a
|
||||
|
||||
update_stm = SimpleStatement(f"UPDATE {table} SET v = {key + 1} WHERE k = {key}",
|
||||
consistency_level=ConsistencyLevel.ONE)
|
||||
await maintenance_cql.run_async(update_stm)
|
||||
# Check that group0 operations are disabled
|
||||
with pytest.raises(ConfigurationException):
|
||||
await maintenance_cql.run_async(f"CREATE TABLE {ks}.t2 (k int PRIMARY KEY, v int)")
|
||||
|
||||
logger.info("Updating tables in maintenance mode")
|
||||
await gather_safely(*(update_table_in_maintenance_mode(table, key)
|
||||
for table, key in key_on_server_a_per_table.items()))
|
||||
await maintenance_cql.run_async(f"UPDATE {table} SET v = {key_on_server_a + 1} WHERE k = {key_on_server_a}")
|
||||
|
||||
# Check that group0 operations are disabled
|
||||
with pytest.raises(ConfigurationException, match="cannot start group0 operation in the maintenance mode"):
|
||||
await create_new_test_keyspace(
|
||||
maintenance_cql, "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
|
||||
# Ensure that server B recognizes server A as being shutdown, not as being alive.
|
||||
res = await cql.run_async(f"SELECT status FROM system.cluster_status WHERE peer = '{server_a.ip_addr}'")
|
||||
assert res[0][0] == "shutdown"
|
||||
|
||||
# Ensure that another server recognizes server A as being shutdown, not as being alive.
|
||||
cql_b, [host_b] = await manager.get_ready_cql([servers[1]])
|
||||
res = await cql_b.run_async(f"SELECT status FROM system.cluster_status WHERE peer = '{server_a.ip_addr}'",
|
||||
host=host_b)
|
||||
assert len(res) == 1
|
||||
assert res[0][0] == "shutdown"
|
||||
await manager.server_stop_gracefully(server_a.server_id)
|
||||
|
||||
await manager.server_stop_gracefully(server_a.server_id)
|
||||
# Restart in normal mode to see if the changes made in maintenance mode are persisted
|
||||
await manager.server_update_config(server_a.server_id, "maintenance_mode", False)
|
||||
await manager.server_start(server_a.server_id, wait_others=1)
|
||||
await wait_for_cql_and_get_hosts(cql, [server_a], time.time() + 60)
|
||||
await manager.servers_see_each_other([server_a, server_b])
|
||||
|
||||
# Restart in normal mode
|
||||
await manager.server_update_config(server_a.server_id, "maintenance_mode", False)
|
||||
await manager.server_start(server_a.server_id, wait_others=1)
|
||||
await wait_for_cql_and_get_hosts(cql, [server_a], time.time() + 60)
|
||||
await manager.servers_see_each_other(servers)
|
||||
res = await cql.run_async(f"SELECT v FROM {table} WHERE k = {key_on_server_a}")
|
||||
assert res[0][0] == key_on_server_a + 1
|
||||
|
||||
async def check_table_in_normal_mode(table: str, key: int):
|
||||
# Check if the changes made in maintenance mode are persisted
|
||||
select_stm = SimpleStatement(f"SELECT v FROM {table} WHERE k = {key}", consistency_level=ConsistencyLevel.ALL)
|
||||
res = await cql.run_async(select_stm)
|
||||
assert len(res) == 1 and res[0][0] == key + 1, f"Expected {key + 1} for table {table}"
|
||||
|
||||
logger.info("Checking tables in normal mode")
|
||||
await gather_safely(*(check_table_in_normal_mode(table, key) for table, key in key_on_server_a_per_table.items()))
|
||||
|
||||
cluster.shutdown()
|
||||
maintenance_cluster.shutdown()
|
||||
|
||||
@@ -83,4 +83,51 @@ async def test_cannot_disable_cluster_feature_after_all_declare_support(manager:
|
||||
# Nodes should start supporting the feature
|
||||
cql = cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
await asyncio.gather(*(wait_for_feature('TEST_ONLY_FEATURE', cql, h, time.time() + 60) for h in hosts))
|
||||
await asyncio.gather(*(wait_for_feature('TEST_ONLY_FEATURE', cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_simulate_upgrade_legacy_to_raft_listener_registration(manager: ManagerClient):
|
||||
"""
|
||||
We simulate an upgrade from legacy mode to Raft. Our goal is
|
||||
to make sure that the cluster successfully reaches the state
|
||||
where it can start the upgrade procedure.
|
||||
|
||||
This test effectively reproduces the problem described
|
||||
in scylladb/scylladb#18049.
|
||||
"""
|
||||
|
||||
# We need this so that the first logs we wait for appear.
|
||||
cmdline = ["--logger-log-level", "raft_topology=debug"]
|
||||
# Tablets and legacy mode are incompatible with each other.
|
||||
config = {"force_gossip_topology_changes": True,
|
||||
"tablets_mode_for_new_keyspaces": "disabled"}
|
||||
|
||||
error_injection = { "name": "suppress_features", "value": "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"}
|
||||
bad_config = config | {"error_injections_at_startup": [error_injection]}
|
||||
|
||||
# We need to bootstrap the nodes one-by-one.
|
||||
# We can't do it concurrently without Raft.
|
||||
s1 = await manager.server_add(cmdline=cmdline, config=bad_config)
|
||||
s2 = await manager.server_add(cmdline=cmdline, config=bad_config)
|
||||
|
||||
# Simulate upgrading node 1.
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
await manager.server_update_config(s1.server_id, "error_injections_at_startup", [])
|
||||
|
||||
log = await manager.server_open_log(s1.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
await manager.server_start(s1.server_id)
|
||||
|
||||
# The node should block after this.
|
||||
await log.wait_for("Waiting for cluster feature `SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`", from_mark=mark)
|
||||
mark = await log.mark()
|
||||
|
||||
# Simulate upgrading node 2.
|
||||
await manager.server_stop_gracefully(s2.server_id)
|
||||
await manager.server_update_config(s2.server_id, "error_injections_at_startup", [])
|
||||
await manager.server_start(s2.server_id)
|
||||
|
||||
# If everything went smoothly, we'll get to this.
|
||||
await log.wait_for("The cluster is ready to start upgrade to the raft topology")
|
||||
|
||||
81
test/cluster/test_raft_fix_broken_snapshot.py
Normal file
81
test/cluster/test_raft_fix_broken_snapshot.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import pytest
|
||||
import time
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import reconnect_driver, enter_recovery_state, \
|
||||
delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \
|
||||
wait_for_token_ring_and_group0_consistency, new_test_keyspace
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_raft_fix_broken_snapshot(manager: ManagerClient):
|
||||
"""Reproducer for scylladb/scylladb#16683.
|
||||
|
||||
This test uses the gossip-based recovery procedure.
|
||||
|
||||
Simulate upgrade-to-Raft in old cluster (which doesn't have ff386e7a445)
|
||||
using RECOVERY mode and error injection.
|
||||
Then bootstrap a new server.
|
||||
|
||||
Thanks to the new logic we will detect lack of snapshot and create one,
|
||||
which the new server will receive, resulting in correct schema transfer.
|
||||
"""
|
||||
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled',
|
||||
'error_injections_at_startup': ['raft_sys_table_storage::bootstrap/init_index_0']}
|
||||
srv = await manager.server_add(config=cfg)
|
||||
cql = manager.get_cql()
|
||||
h = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
|
||||
|
||||
# Enter RECOVERY mode, create a keyspace, leave RECOVERY to create new group 0
|
||||
# but with error injection that causes the snapshot to have index 0 (as in ScyllaDB 5.2).
|
||||
logger.info(f"Entering recovery state on {srv}")
|
||||
await enter_recovery_state(cql, h)
|
||||
await manager.server_restart(srv.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)
|
||||
|
||||
logger.info(f"Creating keyspace")
|
||||
async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
|
||||
await cql.run_async(f"create table {ks}.t (pk int primary key)")
|
||||
|
||||
logger.info(f"Leaving recovery state")
|
||||
await delete_raft_data_and_upgrade_state(cql, h)
|
||||
await manager.server_stop_gracefully(srv.server_id)
|
||||
await manager.server_start(srv.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)
|
||||
|
||||
logger.info(f"Waiting for group 0 upgrade to finish")
|
||||
await wait_until_upgrade_finishes(cql, h, time.time() + 60)
|
||||
|
||||
# The Raft log will only contain this change,
|
||||
# older schema changes can only be obtained through snapshot transfer.
|
||||
await cql.run_async(f"create table {ks}.t2 (pk int primary key)")
|
||||
|
||||
# Restarting the server should trigger snapshot creation.
|
||||
await manager.server_restart(srv.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)
|
||||
|
||||
await manager.server_add(config=cfg)
|
||||
await manager.server_sees_others(srv.server_id, 1)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
|
||||
|
||||
# This would fail if snapshot creation wasn't triggered,
|
||||
# second node reporting 'Failed to apply mutation ... no_such_column_family'
|
||||
await cql.run_async(f"insert into {ks}.t (pk) values (0)", host=h)
|
||||
68
test/cluster/test_raft_recovery_basic.py
Normal file
68
test/cluster/test_raft_recovery_basic.py
Normal file
@@ -0,0 +1,68 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import reconnect_driver, enter_recovery_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_raft_recovery_basic(request, manager: ManagerClient):
|
||||
# This test uses the gossip-based recovery procedure.
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
cmd = ['--logger-log-level', 'raft=trace']
|
||||
|
||||
servers = [await manager.server_add(config=cfg, cmdline=cmd) for _ in range(3)]
|
||||
cql = manager.cql
|
||||
assert(cql)
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {servers}")
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}. Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Upgrade finished. Creating a new table")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info("Checking group0_history")
|
||||
rs = await cql.run_async("select * from system.group0_history")
|
||||
assert(rs)
|
||||
logging.info(f"group0_history entry description: '{rs[0].description}'")
|
||||
assert(table.full_name in rs[0].description)
|
||||
|
||||
logging.info("Booting new node")
|
||||
await manager.server_add(config=cfg, cmdline=cmd)
|
||||
@@ -8,9 +8,6 @@ import logging
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from cassandra.cluster import Session
|
||||
from cassandra.pool import Host
|
||||
from uuid import UUID
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import get_host_api_address, read_barrier
|
||||
@@ -18,17 +15,8 @@ from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import check_system_topology_and_cdc_generations_v3_consistency, \
|
||||
check_token_ring_and_group0_consistency, delete_discovery_state_and_group0_id, delete_raft_group_data, \
|
||||
reconnect_driver, wait_for_cdc_generations_publishing
|
||||
from test.cluster.test_group0_schema_versioning import get_group0_schema_version, get_local_schema_version
|
||||
|
||||
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
|
||||
rs = await cql.run_async("select value from system.scylla_local where key = 'group0_schema_version'", host=h)
|
||||
if rs:
|
||||
return UUID(rs[0].value)
|
||||
return None
|
||||
|
||||
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
|
||||
rs = await cql.run_async("select schema_version from system.local where key = 'local'", host=h)
|
||||
assert(rs)
|
||||
return rs[0].schema_version
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raft_recovery_entry_loss(manager: ManagerClient):
|
||||
|
||||
84
test/cluster/test_raft_recovery_majority_loss.py
Normal file
84
test/cluster/test_raft_recovery_majority_loss.py
Normal file
@@ -0,0 +1,84 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import reconnect_driver, enter_recovery_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_recovery_after_majority_loss(request, manager: ManagerClient):
|
||||
"""
|
||||
This test uses the gossip-based recovery procedure.
|
||||
|
||||
All initial servers but one fail - group 0 is left without a majority. We create a new group
|
||||
0 by entering RECOVERY, using `removenode` to get rid of the other servers, clearing Raft
|
||||
data and restarting. The Raft upgrade procedure runs to establish a single-node group 0. We
|
||||
also verify that schema changes performed using the old group 0 are still there.
|
||||
Note: in general there's no guarantee that all schema changes will be present; the minority
|
||||
used to recover group 0 might have missed them. However in this test the driver waits
|
||||
for schema agreement to complete before proceeding, so we know that every server learned
|
||||
about the schema changes.
|
||||
"""
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info("Creating a bunch of tables")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
tables = await asyncio.gather(*(random_tables.add_table(ncolumns=5) for _ in range(5)))
|
||||
|
||||
srv1, *others = servers
|
||||
|
||||
logging.info(f"Killing all nodes except {srv1}")
|
||||
await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in others))
|
||||
|
||||
logging.info(f"Entering recovery state on {srv1}")
|
||||
host1 = next(h for h in hosts if h.address == srv1.ip_addr)
|
||||
await enter_recovery_state(cql, host1)
|
||||
await manager.server_restart(srv1.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Node restarted, waiting until driver connects")
|
||||
host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]
|
||||
|
||||
for i in range(len(others)):
|
||||
to_remove = others[i]
|
||||
ignore_dead_ips = [srv.ip_addr for srv in others[i+1:]]
|
||||
logging.info(f"Removing {to_remove} using {srv1} with ignore_dead: {ignore_dead_ips}")
|
||||
await manager.remove_node(srv1.server_id, to_remove.server_id, ignore_dead_ips)
|
||||
|
||||
logging.info(f"Deleting old Raft data and upgrade state on {host1} and restarting")
|
||||
await delete_raft_data_and_upgrade_state(cql, host1)
|
||||
await manager.server_restart(srv1.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Node restarted, waiting until driver connects")
|
||||
host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]
|
||||
|
||||
logging.info(f"Driver reconnected, host: {host1}. Waiting until upgrade finishes.")
|
||||
await wait_until_upgrade_finishes(cql, host1, time.time() + 60)
|
||||
|
||||
logging.info("Checking if previously created tables still exist")
|
||||
await asyncio.gather(*(cql.run_async(f"select * from {t.full_name}") for t in tables))
|
||||
|
||||
logging.info("Creating another table")
|
||||
await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info("Booting new node")
|
||||
await manager.server_add(config=cfg)
|
||||
136
test/cluster/test_raft_recovery_stuck.py
Normal file
136
test/cluster/test_raft_recovery_stuck.py
Normal file
@@ -0,0 +1,136 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
|
||||
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@log_run_time
|
||||
async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
"""
|
||||
This test uses the gossip-based recovery procedure.
|
||||
|
||||
1. Create a cluster,
|
||||
2. Enter RECOVERY state on every server.
|
||||
3. Delete the Raft data and the upgrade state on all servers.
|
||||
4. Restart them and the upgrade procedure starts.
|
||||
5. Start the first node with a group 0 upgrade error injected to it, so it fails.
|
||||
6. Start the rest of the nodes in the cluster, they enter 'synchronize' state.
|
||||
We assume the failed server cannot be recovered. We cannot just remove it at this point;
|
||||
it's already part of group 0, `remove_from_group0` will wait until upgrade procedure
|
||||
finishes - but the procedure is stuck. To proceed we:
|
||||
7. Enter RECOVERY state on the other servers,
|
||||
8. Remove the failed node, and
|
||||
9. Clear existing Raft data.
|
||||
10. After leaving RECOVERY, the remaining nodes will restart the procedure, establish a new
|
||||
group 0 and finish upgrade.
|
||||
"""
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'}
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
srv1, *others = servers
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Stopping {servers}")
|
||||
await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in servers))
|
||||
|
||||
logging.info(f"Starting {srv1} with injected group 0 upgrade error")
|
||||
await manager.server_update_config(srv1.server_id, 'error_injections_at_startup', ['group0_upgrade_before_synchronize'])
|
||||
await manager.server_start(srv1.server_id)
|
||||
|
||||
logging.info(f"Starting {others}")
|
||||
await asyncio.gather(*(manager.server_start(srv.server_id) for srv in others))
|
||||
|
||||
logging.info(f"Waiting until {servers} see each other as alive")
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
logging.info(f"Waiting until {hosts} enter 'synchronize' state")
|
||||
await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts))
|
||||
logging.info(f"{hosts} entered synchronize")
|
||||
|
||||
log_file1 = await manager.server_open_log(srv1.server_id)
|
||||
logging.info(f"Checking if Raft upgrade procedure failed on {srv1}")
|
||||
await log_file1.wait_for("error injection before group 0 upgrade enters synchronize")
|
||||
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
# Prevent scylladb/scylladb#21724
|
||||
logging.info("Wait until everyone sees everyone as alive")
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
cql, hosts = await manager.get_ready_cql(others)
|
||||
|
||||
logging.info(f"Checking if {hosts} are in recovery state")
|
||||
for host in hosts:
|
||||
rs = await cql.run_async(
|
||||
"select value from system.scylla_local where key = 'group0_upgrade_state'",
|
||||
host=host)
|
||||
assert rs[0].value == 'recovery'
|
||||
|
||||
logging.info("Creating a table while in recovery state")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info(f"Stopping {srv1}")
|
||||
await manager.server_stop_gracefully(srv1.server_id)
|
||||
|
||||
logging.info(f"Removing {srv1} using {others[0]}")
|
||||
await manager.remove_node(others[0].server_id, srv1.server_id)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
cql, hosts = await manager.get_ready_cql(others)
|
||||
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}, waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Checking if previously created table still exists")
|
||||
await cql.run_async(f"select * from {table.full_name}")
|
||||
58
test/cluster/test_replace_ignore_nodes.py
Normal file
58
test/cluster/test_replace_ignore_nodes.py
Normal file
@@ -0,0 +1,58 @@
|
||||
#
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
|
||||
from test.pylib.internal_types import IPAddress, HostID
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import wait_for_token_ring_and_group0_consistency
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_replace_ignore_nodes(manager: ManagerClient) -> None:
    """Replace nodes one at a time while several other nodes are dead.

    Regression test for #14487. Does not apply to Raft-topology mode.

    Boots a 7 node cluster, stops three nodes and replaces each of them in
    turn, passing the nodes that are still dead via ignore_dead_nodes.
    This is a slow test with a 7 node cluster and 3 replace operations,
    we don't want to run it in debug mode.
    Preferably run it only in one mode e.g. dev.
    """
    node_config = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    logger.info("Booting initial cluster")
    servers = [await manager.server_add(config=node_config) for _ in range(7)]
    dead0, dead1, dead2 = servers[0], servers[1], servers[2]
    # The host ID has to be fetched while the node is still running.
    dead2_host_id = await manager.get_host_id(dead2.server_id)
    logger.info(f"Stopping servers {servers[:3]}")
    await manager.server_stop(dead0.server_id)
    await manager.server_stop(dead1.server_id)
    await manager.server_stop_gracefully(dead2.server_id)

    # First replace: two nodes are still dead.
    # The parameter accepts both IP addrs and host IDs.
    # We must be able to resolve them in both ways.
    ignore_dead: list[IPAddress | HostID] = [dead1.ip_addr, dead2_host_id]
    logger.info(f"Replacing {dead0}, ignore_dead_nodes = {ignore_dead}")
    replace_cfg = ReplaceConfig(replaced_id=dead0.server_id,
                                reuse_ip_addr=False,
                                use_host_id=False,
                                ignore_dead_nodes=ignore_dead)
    await manager.server_add(replace_cfg=replace_cfg, config=node_config)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    # Second replace: only the third node is still dead.
    ignore_dead = [dead2.ip_addr]
    logger.info(f"Replacing {dead1}, ignore_dead_nodes = {ignore_dead}")
    replace_cfg = ReplaceConfig(replaced_id=dead1.server_id,
                                reuse_ip_addr=False,
                                use_host_id=False,
                                ignore_dead_nodes=ignore_dead)
    await manager.server_add(replace_cfg=replace_cfg, config=node_config)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    # Third replace: no other dead nodes remain, so nothing is ignored.
    logger.info(f"Replacing {dead2}")
    replace_cfg = ReplaceConfig(replaced_id=dead2.server_id,
                                reuse_ip_addr=False,
                                use_host_id=False)
    await manager.server_add(replace_cfg=replace_cfg, config=node_config)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
@@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
|
||||
GB = 1024 * 1024 * 1024
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_balance_empty_tablets(manager: ManagerClient):
|
||||
|
||||
# This test checks that size-based load balancing migrates empty tablets of a newly created
|
||||
@@ -23,7 +24,7 @@ async def test_balance_empty_tablets(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
|
||||
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
||||
cfg = { 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] }
|
||||
|
||||
cfg_small = cfg | { 'data_file_capacity': 50 * GB }
|
||||
cfg_large = cfg | { 'data_file_capacity': 100 * GB }
|
||||
|
||||
@@ -9,6 +9,7 @@ async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient)
|
||||
'error_injections_at_startup': ['stream_mutation_fragments_table_dropped'],
|
||||
'enable_repair_based_node_ops': False,
|
||||
'enable_user_defined_functions': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'
|
||||
}) for _ in range(2)]
|
||||
|
||||
|
||||
@@ -832,6 +832,27 @@ async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, wi
|
||||
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
|
||||
assert res is None
|
||||
|
||||
@pytest.mark.asyncio
async def test_tablets_and_gossip_topology_changes_are_incompatible(manager: ManagerClient):
    """Adding a server with tablets enabled and gossip topology changes forced must fail."""
    conflicting_cfg = {
        "tablets_mode_for_new_keyspaces": "enabled",
        "force_gossip_topology_changes": True,
    }
    # The manager is expected to report the failed boot as "Failed to add server".
    with pytest.raises(Exception, match="Failed to add server"):
        await manager.server_add(config=conflicting_cfg)
|
||||
|
||||
@pytest.mark.asyncio
async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerClient):
    """With tablets disabled and gossip topology changes forced, the server boots,
    a keyspace can be created, and the `tablets` keyspace option is rejected
    as a CQL syntax error.
    """
    gossip_cfg = {
        "tablets_mode_for_new_keyspaces": "disabled",
        "force_gossip_topology_changes": True,
    }
    await manager.server_add(config=gossip_cfg)
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks_name:
        # Only logged for inspection; nothing is asserted about the row.
        logger.info(cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks_name}'").one())

    # The same error is expected whether tablets are requested on or off.
    expected = r"Error from server: code=2000 \[Syntax error in CQL query\] message=\"line 1:126 no viable alternative at input 'tablets'\""
    for enabled in ("false", "true"):
        with pytest.raises(SyntaxException, match=expected):
            ks_name = unique_name()
            await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets {{'enabled': {enabled}}};")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
|
||||
@@ -60,24 +60,6 @@ async def safe_rolling_restart(manager, servers, with_down):
|
||||
cql = await reconnect_driver(manager)
|
||||
return cql
|
||||
|
||||
async def wait_for_valid_load_stats(cql, table_id, timeout=120):
    """Poll system.tablet_sizes until the given table has no missing tablet sizes.

    Returns once the query yields at least one row and no row has a
    non-empty `missing_replicas`. Polls every 0.2 seconds and fails with an
    AssertionError when `timeout` seconds have passed since the call started.
    """
    began_at = time.time()
    while True:
        rows = list(await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table_id};"))
        incomplete = [row for row in rows if len(row.missing_replicas) > 0]

        # An empty result does not count as valid: at least one row must be present.
        if rows and not incomplete:
            return

        assert time.time() - began_at < timeout, "Timed out while waiting for valid load_stats"

        await asyncio.sleep(0.2)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
|
||||
"""Test that you can create a table and insert and query data"""
|
||||
@@ -1940,43 +1922,6 @@ async def test_update_load_stats_after_migration(manager: ManagerClient):
|
||||
assert leaving_replica[0] not in replica_hosts, "Leaving replica tablet size is not in load_stats any more"
|
||||
assert pending_replica[0] in replica_hosts, "Pending replica tablet size is in load_stats"
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_crash_on_missing_table_from_load_stats(manager: ManagerClient):
    """Drop a table whose sizes are already in load_stats and wait for the next refresh.

    Two nodes in one rack are started, a table is created and its load stats
    are awaited, the second node is stopped, the table is dropped, and the
    test then waits for the first node to log another load stats refresh.
    """
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--smp', '2',
    ]
    same_rack = [
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack1"},
    ]
    servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, property_file=same_rack)

    cql = manager.get_cql()

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")

        # Make sure load_stats has been refreshed and that the coordinator has cached load_stats
        table_id = await manager.get_table_or_view_id(ks, 'test')
        await wait_for_valid_load_stats(cql, table_id)

        # Kill the non-coordinator node
        await manager.server_stop_gracefully(servers[1].server_id)

        # Drop the table; this leaves the table size in the cached load_stats on the coordinator
        await cql.run_async(f"DROP TABLE {ks}.test")

        # Wait for the next load_stats refresh, counting only lines logged from now on
        coordinator_log = await manager.server_open_log(servers[0].server_id)
        refresh_mark = await coordinator_log.mark()
        await coordinator_log.wait_for('raft topology: Refreshed table load stats for all DC', from_mark=refresh_mark)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
|
||||
|
||||
@@ -99,15 +99,14 @@ async def test_tablets_are_rebuilt_in_parallel(manager: ManagerClient, same_rack
|
||||
]
|
||||
|
||||
# same_rack == True
|
||||
# rack1: 2 servers
|
||||
# rack1: 1 server
|
||||
# rack2: 3 servers (will have 2 removed)
|
||||
#
|
||||
# same_rack == False
|
||||
# rack1: 3 servers (will have 1 removed)
|
||||
# rack1: 2 servers (will have 1 removed)
|
||||
# rack2: 2 servers (will have 1 removed)
|
||||
|
||||
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
|
||||
{"dc": "dc1", "rack": "rack1"},
|
||||
servers = await manager.servers_add(4, cmdline=cmdline, property_file=[
|
||||
{"dc": "dc1", "rack": "rack1"},
|
||||
{"dc": "dc1", "rack": "rack2"},
|
||||
{"dc": "dc1", "rack": "rack2" if same_rack else "rack1"}, # will be removed
|
||||
@@ -121,7 +120,7 @@ async def test_tablets_are_rebuilt_in_parallel(manager: ManagerClient, same_rack
|
||||
" 'dc1': ['rack1', 'rack2']} AND tablets = {'initial': 32};") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
||||
|
||||
servers_to_remove = [servers[3], servers[4]]
|
||||
servers_to_remove = [servers[2], servers[3]]
|
||||
host_ids_to_remove = await gather_safely(*(manager.get_host_id(s.server_id) for s in servers_to_remove))
|
||||
|
||||
logger.info("Stopping servers to be removed")
|
||||
@@ -306,8 +305,7 @@ async def test_remove_is_canceled_if_there_is_node_down(manager: ManagerClient):
|
||||
cmdline = [
|
||||
'--logger-log-level', 'load_balancer=debug',
|
||||
]
|
||||
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
|
||||
{"dc": "dc1", "rack": "rack1"},
|
||||
servers = await manager.servers_add(4, cmdline=cmdline, property_file=[
|
||||
{"dc": "dc1", "rack": "rack1"},
|
||||
{"dc": "dc1", "rack": "rack2"},
|
||||
{"dc": "dc1", "rack": "rack2"},
|
||||
|
||||
139
test/cluster/test_topology_remove_garbage_group0.py
Normal file
139
test/cluster/test_topology_remove_garbage_group0.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
"""
|
||||
Test removenode with a node that is no longer a token ring member
|
||||
"""
|
||||
import logging
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import get_token_ring_host_ids, get_current_group0_config, \
|
||||
check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency
|
||||
import time
|
||||
import pytest
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def test_remove_garbage_group0_members(manager: ManagerClient):
    """
    Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission
    can be handled by executing removenode (which should clear the 'garbage' group 0 member),
    even though the node is no longer a token ring member. Does not apply to Raft-topology mode.

    The scenario is run twice: first with a removenode that fails before the
    group 0 removal, then with a decommission that fails before leaving group 0.
    """
    # 4 servers, one dead
    node_config = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    servers = [await manager.server_add(config=node_config) for _ in range(4)]
    srv0, srv1, srv2, srv3 = servers[0], servers[1], servers[2], servers[3]

    # Make sure that the driver has connected to all nodes, and they see each other as NORMAL
    # (otherwise the driver may remove connection to some host, even after it manages to connect to it,
    # because the node that it has control connection to considers that host as not NORMAL yet).
    # This ensures that after we stop/remove some nodes in the test, the driver will still
    # be able to connect to the remaining nodes. See scylladb/scylladb#16373
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
    await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 60)

    removed_host_id = await manager.get_host_id(srv0.server_id)
    await manager.server_stop_gracefully(srv0.server_id)

    logging.info(f'removenode {srv0} using {srv1}')
    # removenode will fail after removing the server from the token ring,
    # but before removing it from group 0
    await inject_error_one_shot(manager.api, srv1.ip_addr,
                                'removenode_fail_before_remove_from_group0')
    try:
        await manager.remove_node(srv1.server_id, srv0.server_id)
    except Exception:
        # Note: the exception returned here is only '500 internal server error',
        # need to look in test.py log for the actual message coming from Scylla.
        logging.info('expected exception during injection')

    # Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs
    # (internally, this endpoint uses token_metadata)
    token_ring_ids = await get_token_ring_host_ids(manager, srv1)
    logging.info(f'token ring members: {token_ring_ids}')

    group0_members = await get_current_group0_config(manager, srv1)
    logging.info(f'group 0 members: {group0_members}')
    group0_ids = {member[0] for member in group0_members}

    # Token ring members should currently be a subset of group 0 members
    assert token_ring_ids <= group0_ids

    garbage_members = group0_ids - token_ring_ids
    logging.info(f'garbage members: {garbage_members}')
    assert len(garbage_members) == 1
    (garbage_member,) = garbage_members

    # The garbage member is the one that we failed to remove
    assert garbage_member == removed_host_id

    # Verify that at least it's a non-voter.
    non_voter_ids = {member[0] for member in group0_members if not member[1]}
    assert garbage_member in non_voter_ids

    logging.info(f'removenode {srv0} using {srv1} again')
    # Retry removenode. It should skip the token ring removal step and remove the server from group 0.
    await manager.remove_node(srv1.server_id, srv0.server_id)

    group0_members = await get_current_group0_config(manager, srv1)
    logging.info(f'group 0 members: {group0_members}')
    group0_ids = {member[0] for member in group0_members}

    # Token ring members and group 0 members should now be the same.
    assert token_ring_ids == group0_ids

    # Verify that availability is not reduced.
    # Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers.

    logging.info(f'stop {srv1}')
    await manager.server_stop_gracefully(srv1.server_id)

    logging.info(f'removenode {srv1} using {srv2}')
    await manager.remove_node(srv2.server_id, srv1.server_id)

    # Perform a similar scenario with decommission. One of the nodes fails to decommission fully,
    # but it manages to leave the token ring. We observe the leftovers using the same APIs as above
    # and remove the leftovers.
    # We can do this with only 2 nodes because during decommission we become a non-voter before
    # leaving the token ring, thus the remaining single node will become a voting majority
    # and will be able to perform removenode alone.

    decommissioned_host_id = await manager.get_host_id(srv2.server_id)
    await manager.api.enable_injection(
        srv2.ip_addr, 'decommission_fail_before_leave_group0', one_shot=True)
    logging.info(f'decommission {srv2}')
    try:
        await manager.decommission_node(srv2.server_id)
    except Exception:
        logging.info('expected exception during injection')
    logging.info(f'stop {srv2}')
    await manager.server_stop_gracefully(srv2.server_id)

    token_ring_ids = await get_token_ring_host_ids(manager, srv3)
    logging.info(f'token ring members: {token_ring_ids}')

    group0_members = await get_current_group0_config(manager, srv3)
    logging.info(f'group 0 members: {group0_members}')
    group0_ids = {member[0] for member in group0_members}

    assert token_ring_ids <= group0_ids

    garbage_members = group0_ids - token_ring_ids
    logging.info(f'garbage members: {garbage_members}')
    assert len(garbage_members) == 1
    (garbage_member,) = garbage_members

    assert garbage_member == decommissioned_host_id
    non_voter_ids = {member[0] for member in group0_members if not member[1]}
    assert garbage_member in non_voter_ids

    logging.info(f'removenode {srv2} using {srv3}')
    await manager.remove_node(srv3.server_id, srv2.server_id)

    await check_token_ring_and_group0_consistency(manager)
|
||||
|
||||
81
test/cluster/test_topology_upgrade.py
Normal file
81
test/cluster/test_topology_upgrade.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import log_run_time, wait_until_last_generation_is_in_use, wait_until_topology_upgrade_finishes, \
|
||||
wait_for_cdc_generations_publishing, check_system_topology_and_cdc_generations_v3_consistency, \
|
||||
start_writes_to_cdc_table
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@log_run_time
async def test_topology_upgrade_basic(request, build_mode: str, manager: ManagerClient):
    """Upgrade a three node cluster from gossip-based topology to raft topology.

    Checks that every node reports "not_upgraded" first, triggers the upgrade
    twice (it must be idempotent), waits for it to finish, verifies CDC
    generation data, then boots a fourth node and verifies again while writes
    to a CDC table run in the background.
    """
    # First, force the first node to start in legacy mode
    ring_delay_ms = 15000 if build_mode == 'debug' else 5000
    cfg = {
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
        'ring_delay_ms': ring_delay_ms,
    }

    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    cfg.pop('force_gossip_topology_changes')

    for _ in range(2):
        servers.append(await manager.server_add(config=cfg))
    cql = manager.cql
    assert cql

    logging.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        upgrade_status = await manager.api.raft_topology_upgrade_status(host.address)
        assert upgrade_status == "not_upgraded"

    # Background writes keep running until stop_writes_and_verify() is awaited at the end.
    _, stop_writes_and_verify = await start_writes_to_cdc_table(cql)

    trigger_host = hosts[0]
    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(trigger_host.address)

    logging.info("Check that triggering upgrade is idempotent")
    await manager.api.upgrade_to_raft_topology(trigger_host.address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Waiting for CDC generations publishing")
    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)

    logging.info("Booting new node")
    servers.append(await manager.server_add(config=cfg))

    logging.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Waiting for the new CDC generation publishing")
    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)

    await wait_until_last_generation_is_in_use(cql)

    logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in the last generation")
    await asyncio.sleep(1)

    logging.info("Checking correctness of data in system_distributed.cdc_streams_descriptions_v2")
    await stop_writes_and_verify()
|
||||
@@ -0,0 +1,63 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import wait_until_topology_upgrade_finishes, \
|
||||
wait_for_cdc_generations_publishing, \
|
||||
check_system_topology_and_cdc_generations_v3_consistency
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_topology_upgrade_not_stuck_after_recent_removal(request, manager: ManagerClient):
    """
    Regression test for https://github.com/scylladb/scylladb/issues/18198.
    1. Create a two node cluster in legacy mode
    2. Remove one of the nodes
    3. Upgrade the cluster to raft topology.
    4. Verify that the upgrade went OK and it did not get stuck.
    """
    # Start both nodes in legacy mode by forcing gossip-based topology changes
    legacy_cfg = {
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }

    logging.info("Creating a two node cluster")
    servers = [await manager.server_add(config=legacy_cfg) for _ in range(2)]
    cql = manager.cql
    assert cql

    srv1, srv2 = servers

    logging.info("Removing the second node in the cluster")
    await manager.decommission_node(srv2.server_id)

    logging.info("Waiting until driver connects to the only server")
    remaining_hosts = await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60)
    host1 = remaining_hosts[0]

    logging.info("Checking the upgrade state")
    upgrade_status = await manager.api.raft_topology_upgrade_status(host1.address)
    assert upgrade_status == "not_upgraded"

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(host1.address)

    # A stuck upgrade shows up here as the 60 second deadline expiring.
    logging.info("Waiting until upgrade finishes")
    await wait_until_topology_upgrade_finishes(manager, host1.address, time.time() + 60)

    logging.info("Waiting for CDC generations publishing")
    await wait_for_cdc_generations_publishing(cql, [host1], time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1])
|
||||
158
test/cluster/test_topology_upgrade_stuck.py
Normal file
158
test/cluster/test_topology_upgrade_stuck.py
Normal file
@@ -0,0 +1,158 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from typing import List
|
||||
|
||||
from test.pylib.log_browsing import ScyllaLogFile
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.scylla_cluster import gather_safely
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_first_completed
|
||||
from test.cluster.util import reconnect_driver, enter_recovery_state, \
|
||||
delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
|
||||
wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
|
||||
check_system_topology_and_cdc_generations_v3_consistency
|
||||
|
||||
async def wait_for_log_on_any_node(logs: List[ScyllaLogFile], marks: List[int], pattern: str):
    """
    Waits until a given line appears on any node in the cluster.

    `logs` and `marks` are parallel lists: marks[i] is a position in logs[i]
    (callers obtain it from `mark()` on the same log) and is passed to that
    log's `wait_for` as `from_mark`, so only lines logged after the mark are
    meant to be matched. Returns as soon as the first log reports a match.
    """
    assert len(logs) == len(marks)
    # Previously the marks were length-checked but never handed to wait_for(),
    # so a match already logged before the caller took its mark could satisfy
    # the wait immediately.
    # NOTE(review): assumes wait_for() without from_mark searches earlier lines too - confirm.
    await wait_for_first_completed([log.wait_for(pattern, from_mark=mark) for log, mark in zip(logs, marks)])
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.skip_mode(mode='debug', reason='test performs many topology changes')
@log_run_time
async def test_topology_upgrade_stuck(request, manager: ManagerClient):
    """
    Simulates a situation where upgrade procedure gets stuck due to majority
    loss: we have one upgraded node, one not upgraded node, and three nodes
    permanently down. Then, it verifies that it's possible to perform recovery
    procedure and redo the upgrade after the issue is resolved.
    """
    stuck_injection = "topology_coordinator_fail_to_build_state_during_upgrade"
    isolate_injection = "raft_drop_incoming_append_entries"

    # Start all five nodes in legacy mode
    legacy_cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}

    servers = [await manager.server_add(config=legacy_cfg) for _ in range(5)]
    to_be_upgraded_node, to_be_isolated_node, *to_be_shutdown_nodes = servers

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)
    # Hosts of the three nodes that get shut down and removed later;
    # presumably `hosts` is ordered like `servers` - verify against get_ready_cql.
    removed_hosts = hosts[2:]

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        upgrade_status = await manager.api.raft_topology_upgrade_status(host.address)
        assert upgrade_status == "not_upgraded"

    logging.info("Enabling error injection which will cause the topology coordinator to get stuck")
    await gather_safely(*(manager.api.enable_injection(s.ip_addr, stuck_injection, one_shot=False) for s in servers))

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade gets stuck due to error injection")
    logs = await gather_safely(*(manager.server_open_log(s.server_id) for s in servers))
    marks = await gather_safely(*(log.mark() for log in logs))
    await wait_for_log_on_any_node(logs, marks, "failed to build topology coordinator state due to error injection")

    logging.info("Isolate one of the nodes via error injection")
    await manager.api.enable_injection(to_be_isolated_node.ip_addr, isolate_injection, one_shot=False)

    logging.info("Disable the error injection that causes upgrade to get stuck")
    marks = await gather_safely(*(log.mark() for log in logs))
    await gather_safely(*(manager.api.disable_injection(s.ip_addr, stuck_injection) for s in servers))

    logging.info("Wait for the topology coordinator to observe upgrade as finished")
    await wait_for_log_on_any_node(logs, marks, "upgrade to raft topology has finished")

    logging.info("Shut down three nodes to simulate quorum loss")
    await gather_safely(*(manager.server_stop(s.server_id) for s in to_be_shutdown_nodes))

    logging.info("Disable the error injection that causes node to be isolated")
    await manager.api.disable_injection(to_be_isolated_node.ip_addr, isolate_injection)

    logging.info("Checking that not all nodes finished upgrade")
    survivor_statuses = [await manager.api.raft_topology_upgrade_status(s.ip_addr)
                         for s in (to_be_upgraded_node, to_be_isolated_node)]
    upgraded_count = survivor_statuses.count("done")
    assert upgraded_count != 2

    logging.info(f"Only {upgraded_count}/2 nodes finished upgrade, which was expected")

    # From here on `servers` means the two surviving nodes and `others` the three dead ones.
    servers = [to_be_upgraded_node, to_be_isolated_node]
    others = to_be_shutdown_nodes

    logging.info(f"Obtaining hosts for nodes {servers}")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info(f"Restarting hosts {hosts} in recovery mode")
    await gather_safely(*(enter_recovery_state(cql, h) for h in hosts))
    await manager.rolling_restart(servers)
    cql = await reconnect_driver(manager)

    await manager.servers_see_each_other(servers)

    logging.info("Cluster restarted, waiting until driver reconnects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logging.info(f"Driver reconnected, hosts: {hosts}")

    # Remove the dead nodes one by one, ignoring those that are not removed yet.
    for idx, to_remove in enumerate(others):
        ignore_dead_ips = [srv.ip_addr for srv in others[idx + 1:]]
        logging.info(f"Removing {to_remove} using {servers[0]} with ignore_dead: {ignore_dead_ips}")
        await manager.remove_node(servers[0].server_id, to_remove.server_id, ignore_dead_ips)

    logging.info(f"Deleting Raft data and upgrade state on {hosts}")
    await gather_safely(*(delete_raft_topology_state(cql, h) for h in hosts))
    await gather_safely(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

    logging.info(f"Restarting hosts {hosts}")
    await manager.rolling_restart(servers)
    cql = await reconnect_driver(manager)

    logging.info("Cluster restarted, waiting until driver reconnects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Waiting until upgrade to raft schema finishes")
    await gather_safely(*(wait_until_schema_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

    logging.info("Checking the topology upgrade state on all nodes")
    for host in hosts:
        upgrade_status = await manager.api.raft_topology_upgrade_status(host.address)
        assert upgrade_status == "not_upgraded"

    logging.info("Waiting until all nodes see others as alive")
    await manager.servers_see_each_other(servers)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await gather_safely(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Waiting for CDC generations publishing")
    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)

    logging.info("Booting three new nodes")
    servers += await gather_safely(*(manager.server_add() for _ in range(3)))
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Waiting for the new CDC generation publishing")
    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)
|
||||
@@ -201,6 +201,165 @@ async def test_view_build_status_snapshot(manager: ManagerClient):
|
||||
await wait_for_view(cql, "vt1", 4)
|
||||
await wait_for_view(cql, "vt2", 4)
|
||||
|
||||
# Start cluster in view_builder v1 mode and migrate to v2.
|
||||
# Verify the migration copies the v1 data, and the new v2 table
|
||||
# is used after the migration.
|
||||
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2(request, manager: ManagerClient):
    """Boot a 3-node cluster in legacy (gossip topology) mode, build a view while
    the view builder is at version 1, trigger the upgrade to raft topology and
    check that the view build status ends up in system.view_build_status_v2.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}

    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']

    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)
    await create_mv(cql, ks, "vt1")

    # Verify we're using v1 now
    v = await get_view_builder_version(cql)
    assert v == 1

    await wait_for_row_count(cql, "system_distributed.view_build_status", 3, hosts[0])

    # Nothing may have been written to the v2 table before the migration.
    result = await cql.run_async("SELECT * FROM system.view_build_status_v2")
    assert len(result) == 0

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")

    # `h=h` binds the current host at lambda creation time. Without it every
    # predicate would see the generator's final `h` (closures bind late) and
    # only the last host would actually be polled.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))

    # Check that new writes are written to the v2 table
    await create_mv(cql, ks, "vt2")
    await asyncio.gather(*(wait_for_view_v2(cql, ks, "vt2", 3, host=h) for h in hosts))

    await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0])
|
||||
|
||||
# Migrate the view_build_status table to v2 and write to the table during the migration.
|
||||
# The migration process goes through an intermediate stage where it writes to
|
||||
# both the old and new table, so the write should not be lost.
|
||||
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_with_write_during_migration(request, manager: ManagerClient):
    """Upgrade a legacy-mode 3-node cluster to raft topology while a view build
    status write is held back by an error injection, then check that the view
    is reported through the v2 table on every host.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}

    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']

    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)

    # Arm the injection on the second server before the view is created.
    inj_insert = "view_builder_pause_add_new_view"
    await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)

    await create_mv(cql, ks, "vt1")

    # pause the migration between reading the old table and writing to the new table, so we have
    # a time window where new writes may be lost.
    # we don't know who the coordinator is so inject in all nodes.
    inj_upgrade = "view_builder_pause_in_migrate_v2"
    for s in servers:
        await manager.api.enable_injection(s.ip_addr, inj_upgrade, one_shot=True)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")

    # Now that the upgrade is paused, write the new view.
    await manager.api.message_injection(servers[1].ip_addr, inj_insert)
    await asyncio.sleep(1)

    # continue the migration
    for s in servers:
        await manager.api.message_injection(s.ip_addr, inj_upgrade)

    # `h=h` binds the current host at lambda creation time. Without it every
    # predicate would see the generator's final `h` (closures bind late) and
    # only the last host would actually be polled.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))

    await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
|
||||
|
||||
# Migrate the view_build_status table to v2 while there is an 'old' write operation in progress.
|
||||
# The migration should wait for the old operations to complete before continuing, otherwise
|
||||
# these writes may be lost.
|
||||
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_barrier(request, manager: ManagerClient):
    """Trigger the upgrade to raft topology while a v1 view build status write is
    paused by an error injection, release the write afterwards and check that the
    view is reported through the v2 table on every host.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}

    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']

    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)

    # Create MV and delay the write operation to the old table
    inj_insert = "view_builder_pause_add_new_view"
    await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)
    await create_mv(cql, ks, "vt1")

    # The upgrade should perform a barrier and wait for the delayed operation to complete before continuing.
    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    # the upgrade should now be waiting for the insert to complete.
    # unpause the insert
    await asyncio.sleep(1)
    await manager.api.message_injection(servers[1].ip_addr, inj_insert)

    logging.info("Checking migrated data in system")

    # `h=h` binds the current host at lambda creation time. Without it every
    # predicate would see the generator's final `h` (closures bind late) and
    # only the last host would actually be polled.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))

    await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
|
||||
|
||||
# Test that when removing a node from the cluster, we clean its rows from
|
||||
# the view build status table.
|
||||
@pytest.mark.asyncio
|
||||
@@ -269,6 +428,74 @@ async def test_view_build_status_with_replace_node(manager: ManagerClient):
|
||||
|
||||
await wait_for(node_rows_replaced, time.time() + 60)
|
||||
|
||||
# Start with view_build_status v1 mode, and create entries such that
|
||||
# some of them correspond to removed nodes or non-existent views.
|
||||
# Then migrate to v2 table and verify that only valid entries belonging to known nodes
|
||||
# and views are migrated to the new table.
|
||||
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_with_cleanup(request, manager: ManagerClient):
    """Populate the v1 view build status table with one row for a non-existent
    view and one row for a node that is removed afterwards, upgrade to raft
    topology and check that exactly 3 rows end up in the v2 table.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}

    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']

    # We start with total 4 nodes and we will remove one of them before the migration.
    servers += [await manager.server_add(config=cfg) for _ in range(3)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    # Create a view. This will insert 4 entries to the view build status table, one for each node.
    ks_name = await create_keyspace(cql)
    await create_table(cql, ks_name)
    await create_mv(cql, ks_name, "vt1")

    await wait_for_view_v1(cql, "vt1", 4)

    await wait_for_row_count(cql, "system_distributed.view_build_status", 4, hosts[0])

    # Insert a row that doesn't correspond to an existing view, but does correspond to a known host.
    # This row should get cleaned during migration.
    s0_host_id = await manager.get_host_id(servers[0].server_id)
    await cql.run_async(f"INSERT INTO system_distributed.view_build_status(keyspace_name, view_name, host_id, status) \
                        VALUES ('{ks_name}', 'view_doesnt_exist', {s0_host_id}, 'SUCCESS')")

    # Remove the last node. The entry for this node in the view build status remains and it
    # corresponds now to an unknown node. The migration should remove it.
    logging.info("Removing last node")
    await manager.server_stop_gracefully(servers[-1].server_id)
    await manager.remove_node(servers[0].server_id, servers[-1].server_id)

    servers = servers[:-1]
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")

    # Wait for migration and upgrade to view build status v2.
    # `h=h` binds the current host at lambda creation time. Without it every
    # predicate would see the generator's final `h` (closures bind late) and
    # only the last host would actually be polled.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
    await wait_for_view_v2(cql, ks_name, "vt1", 3)

    # Verify that after migration we kept only the entries for the known nodes and views.
    async def rows_migrated():
        result = await cql.run_async("SELECT * FROM system.view_build_status_v2", host=hosts[0])
        # Returns True once exactly 3 rows are present, None otherwise.
        return (len(result) == 3) or None

    await wait_for(rows_migrated, time.time() + 60)
|
||||
|
||||
# Reproduces scylladb/scylladb#20754
|
||||
# View build status migration is doing read with CL=ALL, so it requires all nodes to be up.
|
||||
# Before the fix, the migration was triggered too early, causing unavailable exception in topology coordinator.
|
||||
|
||||
@@ -28,6 +28,13 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
|
||||
def get_pf(rack: str) -> dict[str, str]:
|
||||
return {"dc": "dc1", "rack": rack}
|
||||
|
||||
logging.info('Trying to add a zero-token server in the gossip-based topology')
|
||||
await manager.server_add(config={'join_ring': False,
|
||||
'force_gossip_topology_changes': True,
|
||||
'tablets_mode_for_new_keyspaces': 'disabled'},
|
||||
property_file=get_pf("rz"),
|
||||
expected_error='the raft-based topology is disabled')
|
||||
|
||||
normal_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
|
||||
zero_token_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled', 'join_ring': False}
|
||||
|
||||
|
||||
@@ -683,7 +683,7 @@ class EquivalentIp:
|
||||
return f'EquivalentIp("{self.obj}")'
|
||||
|
||||
# Reproduces issue #7972, #7988, #7997, #8001
|
||||
@pytest.mark.xfail(reason="issues #7997, #8001")
|
||||
@pytest.mark.xfail(reason="issues #7972, #7997, #8001")
|
||||
def testToJsonFct(cql, test_keyspace):
|
||||
abc_tuple = collections.namedtuple('abc_tuple', ['a', 'b', 'c'])
|
||||
with create_type(cql, test_keyspace, "(a int, b uuid, c set<text>)") as type_name:
|
||||
@@ -933,6 +933,7 @@ def testToJsonFct(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT k, toJson(durationval) FROM %s WHERE k = ?", 0), [0, "\"1y1mo2d10h5m\""])
|
||||
|
||||
# Reproduces issue #8077
|
||||
@pytest.mark.xfail(reason="issues #8077")
|
||||
def testJsonWithGroupBy(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int, c int, v int, PRIMARY KEY (k, c))") as table:
|
||||
# tests SELECT JSON statements
|
||||
@@ -953,6 +954,7 @@ def testJsonWithGroupBy(cql, test_keyspace):
|
||||
["{\"count\": 1}"])
|
||||
|
||||
# Reproduces issues #8077, #8078
|
||||
@pytest.mark.xfail(reason="issues #8077")
|
||||
def testSelectJsonSyntax(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int primary key, v int)") as table:
|
||||
# tests SELECT JSON statements
|
||||
|
||||
@@ -494,6 +494,7 @@ def testLimitWithUnset(cql, test_keyspace):
|
||||
[2]
|
||||
)
|
||||
|
||||
@pytest.mark.xfail(reason="#10358 - comparison with unset doesn't generate error")
|
||||
def testWithUnsetValues(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int, i int, j int, s text, PRIMARY KEY (k,i,j))") as table:
|
||||
execute(cql, table, "CREATE INDEX ON %s (s)")
|
||||
@@ -576,6 +577,7 @@ def testInvalidColumnNames(cql, test_keyspace):
|
||||
assert_invalid(cql, table, "SELECT c AS d FROM %s WHERE d CONTAINS KEY 0")
|
||||
assert_invalid_message(cql, table, "name d", "SELECT d FROM %s WHERE a = 0")
|
||||
|
||||
@pytest.mark.xfail(reason="#10632 - strange error message")
|
||||
def testInvalidNonFrozenUDTRelation(cql, test_keyspace):
|
||||
with create_type(cql, test_keyspace, "(a int)") as type:
|
||||
with create_table(cql, test_keyspace, f"(a int PRIMARY KEY, b {type})") as table:
|
||||
|
||||
@@ -918,6 +918,7 @@ def testFilteringOnStaticColumnsWithRowsWithOnlyStaticValues(cql, test_keyspace)
|
||||
[4, 2, 4, 2, 6])
|
||||
|
||||
# Reproduces #10357, #10358
|
||||
@pytest.mark.xfail(reason="#10358")
|
||||
def testFilteringWithoutIndices(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a int, b int, c int, d int, s int static, PRIMARY KEY (a, b))") as table:
|
||||
execute(cql, table, "INSERT INTO %s (a, b, c, d) VALUES (1, 2, 4, 8)")
|
||||
|
||||
@@ -34,12 +34,6 @@ CDC_LOG_TABLE_DESC_PREFIX =
|
||||
" enabled option or creating the vector index on the base table's vector column.\n" \
|
||||
"\n"
|
||||
CDC_LOG_TABLE_DESC_SUFFIX = "\n*/"
|
||||
# The prefix of the create statement returned by `DESC TABLE "tbl$paxos"` and corresponding to a Paxos state table.
|
||||
PAXOS_STATE_TABLE_DESC_PREFIX = \
|
||||
"/* Do NOT execute this statement! It's only for informational purposes.\n" \
|
||||
" A paxos state table is created automatically when enabling LWT on a base table.\n" \
|
||||
"\n"
|
||||
PAXOS_STATE_TABLE_DESC_SUFFIX = "\n*/"
|
||||
|
||||
def filter_non_default_user(desc_result_iter: Iterable[DescRowType]) -> Iterable[DescRowType]:
    """Lazily yield the rows of `desc_result_iter` whose `name` differs from DEFAULT_SUPERUSER."""
    return (row for row in desc_result_iter if row.name != DEFAULT_SUPERUSER)
|
||||
@@ -3382,62 +3376,3 @@ def test_desc_table_tombstone_gc(cql, test_keyspace, scylla_only):
|
||||
# ignore spaces in comparison, as different versions of Scylla
|
||||
# add spaces in different places
|
||||
assert with_clause.replace(' ','') in desc.create_statement.replace(' ','')
|
||||
|
||||
# Ever since tablets were introduced to Scylla, LWT writes its Paxos log not
|
||||
# in a central system table, but in a new table named "...$paxos" in the same
|
||||
# keyspace.
|
||||
# Similarly to CDC's internal tables, we expect these internal Paxos tables
|
||||
# (which have names that are not valid CQL) to be hidden from various DESCRIBE commands.
|
||||
# The only difference is that when describing a CDC base table,
|
||||
# an ALTER statement of the log table is added to the description,
|
||||
# but this is not the case for Paxos tables.
|
||||
# This test checks that these internal tables aren't listed by DESCRIBE commands.
|
||||
# Reproduces issue #28183
|
||||
def test_hide_paxos_table(cql, test_keyspace):
    """Check that DESC TABLES, DESC SCHEMA and DESC KEYSPACE list the test table
    but no other table whose name contains the test table's name.
    """
    def check_listing(base_name, listed_names):
        # The base table itself must be listed...
        assert base_name in listed_names
        # ...and its name must not be a proper substring of any other
        # listed name.
        for other in listed_names:
            if other != base_name:
                assert base_name not in other

    with new_test_table(cql, test_keyspace, "p int primary key, x int") as table:
        # The extra "...$paxos" table only appears after a real LWT write is
        # performed, so perform one LWT operation to get it created.
        cql.execute(f'INSERT INTO {table}(p,x) values (1,2) IF NOT EXISTS')

        # DESC TABLES, restricted to the rows of the test keyspace.
        listed = [row.name for row in cql.execute('DESC TABLES') if row.keyspace_name == test_keyspace]
        _, table_name = table.split('.')
        check_listing(table_name, listed)

        # DESC SCHEMA, restricted to the rows of the test keyspace.
        listed = [row.name for row in cql.execute('DESC SCHEMA') if row.keyspace_name == test_keyspace]
        check_listing(table_name, listed)

        # DESC KEYSPACE of the test keyspace: every returned row is considered.
        listed = [row.name for row in cql.execute(f'DESC KEYSPACE {test_keyspace}')]
        check_listing(table_name, listed)
|
||||
|
||||
# It is allowed to directly describe a Paxos state table with `DESC ks."tbl$paxos"`
|
||||
# but it should contain only commented-out CQL statements, so executing them is a no-op.
|
||||
def test_paxos_table_described_in_comment(scylla_only, cql, test_keyspace):
    """Describe the "...$paxos" table directly and check that the returned create
    statement starts with PAXOS_STATE_TABLE_DESC_PREFIX, ends with
    PAXOS_STATE_TABLE_DESC_SUFFIX and contains the CREATE TABLE for that table.
    """
    with new_test_table(cql, test_keyspace, "p int primary key, x int") as table:
        # The extra "...$paxos" table only appears after a real LWT write is
        # performed. So let's do an LWT operation that will cause it to be
        # created.
        cql.execute(f'INSERT INTO {table}(p,x) values (1,2) IF NOT EXISTS')
        # Build the quoted name outside the f-string replacement field:
        # reusing the enclosing quote character inside `{...}` is only
        # accepted from Python 3.12 on.
        table_name = table.split('.')[1]
        paxos_table = f'{test_keyspace}."{table_name}$paxos"'
        paxos_table_desc = cql.execute(f'DESC TABLE {paxos_table}').one().create_statement

    assert paxos_table_desc.startswith(PAXOS_STATE_TABLE_DESC_PREFIX)
    assert paxos_table_desc.endswith(PAXOS_STATE_TABLE_DESC_SUFFIX)
    assert f'CREATE TABLE {paxos_table}' in paxos_table_desc
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
# to reproduce bugs discovered by bigger Cassandra tests.
|
||||
#############################################################################
|
||||
|
||||
from .util import unique_name, unique_key_int, new_test_table
|
||||
from .util import unique_name, unique_key_int
|
||||
|
||||
from cassandra.protocol import FunctionFailure
|
||||
from cassandra.util import Date, Time
|
||||
@@ -414,6 +414,7 @@ def test_fromjson_null_constant(cql, table1):
|
||||
# (issue #7972) where the double value 123.456 was correctly formatted, but
|
||||
# the value 123123.123123 was truncated to an integer. This test reproduces
|
||||
# this.
|
||||
@pytest.mark.xfail(reason="issue #7972")
|
||||
def test_tojson_double(cql, table1):
|
||||
p = unique_key_int()
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, d) VALUES (?, ?)")
|
||||
@@ -519,6 +520,7 @@ def test_tojson_decimal_high_mantissa2(cql, table1):
|
||||
|
||||
# Reproducers for issue #8077: SELECT JSON on a function call should result
|
||||
# in the same JSON strings as it does on Cassandra.
|
||||
@pytest.mark.xfail(reason="issue #8077")
|
||||
def test_select_json_function_call(cql, table1):
|
||||
p = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, 17) USING TIMESTAMP 1234")
|
||||
@@ -602,24 +604,3 @@ def test_select_json_with_alias(cql, table1):
|
||||
}
|
||||
for input, output in input_and_output.items():
|
||||
assert list(cql.execute(f"SELECT JSON {input} from {table1} where p = {p}")) == [(EquivalentJson(output),)]
|
||||
|
||||
# The grammar around DISTINCT and JSON is hairy. Test the combination.
|
||||
def test_select_distinct_json(cql, test_keyspace):
    """Check SELECT JSON DISTINCT against plain SELECT JSON on a table holding
    two partitions with two rows each.
    """
    def partition_keys(rows):
        # Each row carries one JSON document in its first column; pull out "p".
        return [json.loads(row[0])["p"] for row in rows]

    with new_test_table(cql, test_keyspace, "p int, c int, v int, PRIMARY KEY (p, c)") as table:
        p1 = unique_key_int()
        p2 = unique_key_int()
        # Insert two rows per partition
        for pk, ck, val in ((p1, 1, 10), (p1, 2, 20), (p2, 1, 30), (p2, 2, 40)):
            cql.execute(f"INSERT INTO {table} (p, c, v) VALUES ({pk}, {ck}, {val})")

        # DISTINCT can only select partition key columns (p), so exactly one
        # row per partition is expected.
        distinct_rows = list(cql.execute(f"SELECT JSON DISTINCT p FROM {table} WHERE p IN ({p1}, {p2})"))
        assert sorted(partition_keys(distinct_rows)) == sorted([p1, p2])

        # Without DISTINCT, all 4 rows should come back.
        all_rows = list(cql.execute(f"SELECT JSON p FROM {table} WHERE p IN ({p1}, {p2})"))
        assert sorted(partition_keys(all_rows)) == sorted([p1, p1, p2, p2])
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user