Compare commits
59 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91aab869b8 | ||
|
|
168e0a40e3 | ||
|
|
ac54f21504 | ||
|
|
dbeca7c14d | ||
|
|
0a22ac3c9e | ||
|
|
8a07b41ae4 | ||
|
|
f8879d797d | ||
|
|
8e480110c2 | ||
|
|
977fa91e3d | ||
|
|
2cb98fd612 | ||
|
|
59019bc9a9 | ||
|
|
fc37518aff | ||
|
|
95700c5f7f | ||
|
|
7f34366b9d | ||
|
|
8555fd42df | ||
|
|
1337f4213f | ||
|
|
1dbf53ca29 | ||
|
|
a6c12ed1ef | ||
|
|
60ac13d75d | ||
|
|
9208b2f317 | ||
|
|
296b116ae2 | ||
|
|
de321218bc | ||
|
|
c7e7a9e120 | ||
|
|
e878042987 | ||
|
|
579031cfc8 | ||
|
|
7cc6b0d960 | ||
|
|
88fd9a34c4 | ||
|
|
9b6ce030d0 | ||
|
|
cb30eb2e21 | ||
|
|
e3a0935482 | ||
|
|
88765f627a | ||
|
|
7a72155374 | ||
|
|
d458dd41c6 | ||
|
|
adf9c426c2 | ||
|
|
3eb7193458 | ||
|
|
fd521cee6f | ||
|
|
284c73d466 | ||
|
|
e7dbccd59e | ||
|
|
1c0d847281 | ||
|
|
2bd173da97 | ||
|
|
87492d3073 | ||
|
|
55ecd92feb | ||
|
|
a0bf932caa | ||
|
|
2fc812a1b9 | ||
|
|
fdd623e6bc | ||
|
|
044b001bb4 | ||
|
|
12dabdec66 | ||
|
|
96e727d7b9 | ||
|
|
d95ebe7058 | ||
|
|
492c664fbb | ||
|
|
2dbd1a85a3 | ||
|
|
51186b2f2c | ||
|
|
e99c8eee08 | ||
|
|
92462e502f | ||
|
|
7640ade04d | ||
|
|
5d89816fed | ||
|
|
37b9cccc1c | ||
|
|
10f07fb95a | ||
|
|
b4ca12b39a |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -37,4 +37,3 @@ clang_build
|
||||
.idea/
|
||||
nuke
|
||||
rust/target
|
||||
|
||||
|
||||
@@ -136,6 +136,7 @@ future<> controller::start_server() {
|
||||
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds,
|
||||
_config.alternator_enforce_authorization,
|
||||
_config.alternator_warn_authorization,
|
||||
_config.alternator_max_users_query_size_in_trace_output,
|
||||
&_memory_limiter.local().get_semaphore(),
|
||||
_config.max_concurrent_requests_per_shard);
|
||||
|
||||
@@ -245,7 +245,8 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
_cdc_metadata(cdc_metadata),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
|
||||
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
|
||||
_ssg(ssg),
|
||||
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
|
||||
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
|
||||
@@ -881,15 +882,37 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
co_return rjson::print(std::move(response));
|
||||
}
|
||||
|
||||
// This function increments the authorization_failures counter, and may also
|
||||
// log a warn-level message and/or throw an access_denied exception, depending
|
||||
// on what enforce_authorization and warn_authorization are set to.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authorization_error must explicitly return after calling this function.
|
||||
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
|
||||
stats.authorization_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("alternator_warn_authorization=true: {}", msg);
|
||||
}
|
||||
throw api_error::access_denied(std::move(msg));
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization,
|
||||
const service::client_state& client_state,
|
||||
const schema_ptr& schema,
|
||||
auth::permission permission_to_check) {
|
||||
if (!enforce_authorization) {
|
||||
auth::permission permission_to_check,
|
||||
alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
// Unfortunately, the fix for issue #23218 did not modify the function
|
||||
@@ -904,31 +927,33 @@ future<> verify_permission(
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(fmt::format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
|
||||
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
if (!client_state.user() || !client_state.user()->name ||
|
||||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
sstring username = "<anonymous>";
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
// Using exceptions for errors makes this function faster in the
|
||||
// success path (when the operation is allowed).
|
||||
throw api_error::access_denied(format(
|
||||
"{} access on table {}.{} is denied to role {}",
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"{} access on table {}.{} is denied to role {}, client address {}",
|
||||
auth::permissions::to_string(permission_to_check),
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
|
||||
}
|
||||
}
|
||||
|
||||
// Similar to verify_permission() above, but just for CREATE operations.
|
||||
// Those do not operate on any specific table, so require permissions on
|
||||
// ALL KEYSPACES instead of any specific table.
|
||||
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
|
||||
if (!enforce_authorization) {
|
||||
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
auto resource = auth::resource(auth::resource_kind::data);
|
||||
@@ -937,7 +962,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"CREATE access on ALL KEYSPACES is denied to role {}", username));
|
||||
}
|
||||
}
|
||||
@@ -954,7 +979,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
@@ -1292,7 +1317,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
|
||||
if (tags->Size() < 1) {
|
||||
co_return api_error::validation("The number of tags must be at least 1") ;
|
||||
}
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
|
||||
});
|
||||
@@ -1313,7 +1338,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
|
||||
|
||||
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
|
||||
});
|
||||
@@ -1516,7 +1541,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
}
|
||||
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
@@ -1722,7 +1747,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, client_state);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1823,9 +1848,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1878,7 +1903,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
verify_billing_mode(request);
|
||||
}
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
schema_ptr schema;
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -2049,7 +2074,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
|
||||
for (view_ptr view : new_views) {
|
||||
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
|
||||
@@ -2817,7 +2842,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -2921,7 +2946,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -3199,7 +3224,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
|
||||
}
|
||||
for (const auto& b : mutation_builders) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
|
||||
}
|
||||
// If alternator_force_read_before_write is true we will first get the previous item size
|
||||
// and only then do send the mutation.
|
||||
@@ -4425,7 +4450,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -4536,7 +4561,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
|
||||
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
|
||||
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
|
||||
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
service::storage_proxy::coordinator_query_result qr =
|
||||
co_await _proxy.query(
|
||||
schema, std::move(command), std::move(partition_ranges), cl,
|
||||
@@ -4668,7 +4693,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
|
||||
for (const table_requests& tr : requests) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
@@ -5128,10 +5153,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
filter filter,
|
||||
query::partition_slice::option_set custom_opts,
|
||||
service::client_state& client_state,
|
||||
cql3::cql_stats& cql_stats,
|
||||
alternator::stats& stats,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit,
|
||||
bool enforce_authorization) {
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization) {
|
||||
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
|
||||
|
||||
tracing::trace(trace_state, "Performing a database query");
|
||||
@@ -5158,7 +5184,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
|
||||
|
||||
auto regular_columns =
|
||||
table_schema->regular_columns() | std::views::transform(&column_definition::id)
|
||||
@@ -5194,9 +5220,9 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
|
||||
}
|
||||
if (has_filter) {
|
||||
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
// update our "filtered_row_matched_total" for all the rows matched, despited the filter
|
||||
cql_stats.filtered_rows_matched_total += size;
|
||||
stats.cql_stats.filtered_rows_matched_total += size;
|
||||
}
|
||||
if (opt_items) {
|
||||
if (opt_items->size() >= max_items_for_rapidjson_array) {
|
||||
@@ -5320,7 +5346,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
||||
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
|
||||
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
|
||||
@@ -5801,7 +5827,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
||||
query::partition_slice::option_set opts;
|
||||
opts.set_if<query::partition_slice::option::reversed>(!forward);
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
|
||||
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
|
||||
@@ -139,6 +139,7 @@ class executor : public peering_sharded_service<executor> {
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
cdc::metadata& _cdc_metadata;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
// An smp_service_group to be used for limiting the concurrency when
|
||||
// forwarding Alternator request between shards - if necessary for LWT.
|
||||
smp_service_group _ssg;
|
||||
@@ -264,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
|
||||
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
|
||||
|
||||
/**
|
||||
* Make return type for serializing the object "streamed",
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "utils/aws_sigv4.hh"
|
||||
#include "client_data.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
@@ -270,24 +271,57 @@ protected:
|
||||
}
|
||||
};
|
||||
|
||||
// This function increments the authentication_failures counter, and may also
|
||||
// log a warn-level message and/or throw an exception, depending on what
|
||||
// enforce_authorization and warn_authorization are set to.
|
||||
// The username and client address are only used for logging purposes -
|
||||
// they are not included in the error message returned to the client, since
|
||||
// the client knows who it is.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authentication_error must explicitly return after calling this function.
|
||||
template<typename Exception>
|
||||
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
|
||||
stats.authentication_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
throw std::move(e);
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
|
||||
if (!_enforce_authorization) {
|
||||
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
|
||||
slogger.debug("Skipping authorization");
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto host_it = req._headers.find("Host");
|
||||
if (host_it == req._headers.end()) {
|
||||
throw api_error::invalid_signature("Host header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature("Host header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto authorization_it = req._headers.find("Authorization");
|
||||
if (authorization_it == req._headers.end()) {
|
||||
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string host = host_it->second;
|
||||
std::string_view authorization_header = authorization_it->second;
|
||||
auto pos = authorization_header.find_first_of(' ');
|
||||
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
authorization_header.remove_prefix(pos+1);
|
||||
std::string credential;
|
||||
@@ -322,7 +356,9 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
|
||||
std::vector<std::string_view> credential_split = split(credential, '/');
|
||||
if (credential_split.size() != 5) {
|
||||
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string user(credential_split[0]);
|
||||
std::string datestamp(credential_split[1]);
|
||||
@@ -346,7 +382,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
|
||||
return get_key_from_roles(proxy, as, std::move(username));
|
||||
};
|
||||
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
|
||||
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
|
||||
user = std::move(user),
|
||||
host = std::move(host),
|
||||
datestamp = std::move(datestamp),
|
||||
@@ -354,18 +390,32 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
signed_headers_map = std::move(signed_headers_map),
|
||||
region = std::move(region),
|
||||
service = std::move(service),
|
||||
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
|
||||
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
|
||||
key_cache::value_ptr key_ptr(nullptr);
|
||||
try {
|
||||
key_ptr = key_ptr_fut.get();
|
||||
} catch (const api_error& e) {
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
e, user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
std::string signature;
|
||||
try {
|
||||
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
|
||||
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
|
||||
} catch (const std::exception& e) {
|
||||
throw api_error::invalid_signature(e.what());
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
|
||||
if (signature != std::string_view(user_signature)) {
|
||||
_key_cache.remove(user);
|
||||
throw api_error::unrecognized_client("The security token included in the request is invalid.");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::unrecognized_client("wrong signature"),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
return user;
|
||||
});
|
||||
@@ -618,7 +668,6 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
, _auth_service(auth_service)
|
||||
, _sl_controller(sl_controller)
|
||||
, _key_cache(1024, 1min, slogger)
|
||||
, _enforce_authorization(false)
|
||||
, _max_users_query_size_in_trace_output(1024)
|
||||
, _enabled_servers{}
|
||||
, _pending_requests("alternator::server::pending_requests")
|
||||
@@ -700,10 +749,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
}
|
||||
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
_memory_limiter = memory_limiter;
|
||||
_enforce_authorization = std::move(enforce_authorization);
|
||||
_warn_authorization = std::move(warn_authorization);
|
||||
_max_concurrent_requests = std::move(max_concurrent_requests);
|
||||
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
|
||||
if (!port && !https_port) {
|
||||
|
||||
@@ -47,6 +47,7 @@ class server : public peering_sharded_service<server> {
|
||||
|
||||
key_cache _key_cache;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
utils::updateable_value<uint64_t> _max_users_query_size_in_trace_output;
|
||||
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
|
||||
named_gate _pending_requests;
|
||||
@@ -99,7 +100,7 @@ public:
|
||||
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
|
||||
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
future<> stop();
|
||||
// get_client_data() is called (on each shard separately) when the virtual
|
||||
|
||||
@@ -188,6 +188,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
|
||||
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
|
||||
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
|
||||
});
|
||||
|
||||
// Only register the following metrics for the global metrics, not per-table
|
||||
if (!has_table) {
|
||||
metrics.add_group("alternator", {
|
||||
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
|
||||
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
|
||||
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {
|
||||
|
||||
@@ -105,6 +105,17 @@ public:
|
||||
// The sizes are the the written items' sizes grouped per table.
|
||||
utils::estimated_histogram batch_write_item_op_size_kb{30};
|
||||
} operation_sizes;
|
||||
// Count of authentication and authorization failures, counted if either
|
||||
// alternator_enforce_authorization or alternator_warn_authorization are
|
||||
// set to true. If both are false, no authentication or authorization
|
||||
// checks are performed, so failures are not recognized or counted.
|
||||
// "authentication" failure means the request was not signed with a valid
|
||||
// user and key combination. "authorization" failure means the request was
|
||||
// authenticated to a valid user - but this user did not have permissions
|
||||
// to perform the operation (considering RBAC settings and the user's
|
||||
// superuser status).
|
||||
uint64_t authentication_failures = 0;
|
||||
uint64_t authorization_failures = 0;
|
||||
// Miscellaneous event counters
|
||||
uint64_t total_operations = 0;
|
||||
uint64_t unsupported_operations = 0;
|
||||
|
||||
@@ -827,7 +827,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
|
||||
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
|
||||
partition_key pk = iter.shard.id.to_partition_key(*schema);
|
||||
|
||||
@@ -95,7 +95,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
}
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
if (enabled) {
|
||||
if (tags_map.contains(TTL_TAG_KEY)) {
|
||||
|
||||
@@ -220,6 +220,25 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/nodes/excluded",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Retrieve host ids of nodes which are marked as excluded",
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"string"
|
||||
},
|
||||
"nickname":"get_excluded_nodes",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/nodes/joining",
|
||||
"operations":[
|
||||
@@ -1571,6 +1590,30 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/exclude_node",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Marks the node as permanently down (excluded).",
|
||||
"type":"void",
|
||||
"nickname":"exclude_node",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"hosts",
|
||||
"description":"Comma-separated list of host ids to exclude",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/removal_status",
|
||||
"operations":[
|
||||
|
||||
@@ -42,6 +42,14 @@
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"consider_only_existing_data",
|
||||
"description":"Set to \"true\" to flush all memtables and force tombstone garbage collection to check only the sstables being compacted (false by default). The memtable, commitlog and other uncompacted sstables will not be checked during tombstone garbage collection.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -844,6 +844,25 @@ rest_remove_node(sharded<service::storage_service>& ss, std::unique_ptr<http::re
|
||||
});
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_exclude_node(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
auto hosts = utils::split_comma_separated_list(req->get_query_param("hosts"))
|
||||
| std::views::transform([] (const sstring& s) { return locator::host_id(utils::UUID(s)); })
|
||||
| std::ranges::to<std::vector<locator::host_id>>();
|
||||
|
||||
auto& topo = ss.local().get_token_metadata().get_topology();
|
||||
for (auto host : hosts) {
|
||||
if (!topo.has_node(host)) {
|
||||
throw bad_param_exception(fmt::format("Host ID {} does not belong to this cluster", host));
|
||||
}
|
||||
}
|
||||
|
||||
apilog.info("exclude_node: hosts={}", hosts);
|
||||
co_await ss.local().mark_excluded(hosts);
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_get_removal_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
@@ -1769,6 +1788,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss));
|
||||
ss::move.set(r, rest_bind(rest_move, ss));
|
||||
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
|
||||
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
|
||||
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
|
||||
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
|
||||
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
|
||||
@@ -1846,6 +1866,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
|
||||
ss::decommission.unset(r);
|
||||
ss::move.unset(r);
|
||||
ss::remove_node.unset(r);
|
||||
ss::exclude_node.unset(r);
|
||||
ss::get_removal_status.unset(r);
|
||||
ss::force_remove_completion.unset(r);
|
||||
ss::set_logging_level.unset(r);
|
||||
|
||||
127
api/tasks.cc
127
api/tasks.cc
@@ -38,76 +38,78 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
|
||||
};
|
||||
}
|
||||
|
||||
static future<shared_ptr<compaction::major_keyspace_compaction_task_impl>> force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<compaction::flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = compaction::flush_mode::skip;
|
||||
}
|
||||
return compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
|
||||
}
|
||||
|
||||
static future<shared_ptr<compaction::upgrade_sstables_compaction_task_impl>> upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
|
||||
auto& db = ctx.db;
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
return compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
}
|
||||
|
||||
static future<shared_ptr<compaction::cleanup_keyspace_compaction_task_impl>> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
|
||||
if (rs.is_local() || !rs.is_vnode_based()) {
|
||||
auto reason = rs.is_local() ? "require" : "support";
|
||||
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
|
||||
co_return nullptr;
|
||||
}
|
||||
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
|
||||
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
|
||||
auto msg = "Can not perform cleanup operation when topology changes";
|
||||
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
|
||||
co_await coroutine::return_exception(std::runtime_error(msg));
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
co_return co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
|
||||
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
|
||||
}
|
||||
|
||||
void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
|
||||
t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
apilog.debug("force_keyspace_compaction_async: keyspace={} tables={}, flush={}", keyspace, table_infos, flush);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<compaction::flush_mode> fmopt;
|
||||
if (!flush) {
|
||||
fmopt = compaction::flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt);
|
||||
|
||||
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
});
|
||||
|
||||
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<compaction::flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = compaction::flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
|
||||
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
|
||||
co_await task->done();
|
||||
co_return json_void();
|
||||
});
|
||||
|
||||
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
apilog.info("force_keyspace_cleanup_async: keyspace={} tables={}", keyspace, table_infos);
|
||||
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
|
||||
auto msg = "Can not perform cleanup operation when topology changes";
|
||||
apilog.warn("force_keyspace_cleanup_async: keyspace={} tables={}: {}", keyspace, table_infos, msg);
|
||||
co_await coroutine::return_exception(std::runtime_error(msg));
|
||||
tasks::task_id id = tasks::task_id::create_null_id();
|
||||
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
|
||||
if (task) {
|
||||
id = task->get_status().id;
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
|
||||
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
co_return json::json_return_type(id.to_sstring());
|
||||
});
|
||||
|
||||
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
|
||||
if (rs.is_local() || !rs.is_vnode_based()) {
|
||||
auto reason = rs.is_local() ? "require" : "support";
|
||||
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
|
||||
co_return json::json_return_type(0);
|
||||
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
|
||||
if (task) {
|
||||
co_await task->done();
|
||||
}
|
||||
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
|
||||
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
|
||||
auto msg = "Can not perform cleanup operation when topology changes";
|
||||
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
|
||||
co_await coroutine::return_exception(std::runtime_error(msg));
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
|
||||
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
@@ -129,25 +131,12 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
}));
|
||||
|
||||
t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
|
||||
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
}));
|
||||
|
||||
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
}));
|
||||
|
||||
@@ -62,6 +62,17 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
|
||||
return addr | std::ranges::to<std::vector>();
|
||||
});
|
||||
|
||||
ss::get_excluded_nodes.set(r, [&tm](const_req req) {
|
||||
const auto& local_tm = *tm.local().get();
|
||||
std::vector<sstring> eps;
|
||||
local_tm.get_topology().for_each_node([&] (auto& node) {
|
||||
if (node.is_excluded()) {
|
||||
eps.push_back(node.host_id().to_sstring());
|
||||
}
|
||||
});
|
||||
return eps;
|
||||
});
|
||||
|
||||
ss::get_joining_nodes.set(r, [&tm, &g](const_req req) {
|
||||
const auto& local_tm = *tm.local().get();
|
||||
const auto& points = local_tm.get_bootstrap_tokens();
|
||||
@@ -130,6 +141,7 @@ void unset_token_metadata(http_context& ctx, routes& r) {
|
||||
ss::get_leaving_nodes.unset(r);
|
||||
ss::get_moving_nodes.unset(r);
|
||||
ss::get_joining_nodes.unset(r);
|
||||
ss::get_excluded_nodes.unset(r);
|
||||
ss::get_host_id_map.unset(r);
|
||||
httpd::endpoint_snitch_info_json::get_datacenter.unset(r);
|
||||
httpd::endpoint_snitch_info_json::get_rack.unset(r);
|
||||
|
||||
@@ -1209,7 +1209,7 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
|
||||
co_return std::move(m);
|
||||
}
|
||||
|
||||
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const std::vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
|
||||
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
|
||||
auto s = db::system_keyspace::cdc_streams_state();
|
||||
|
||||
mutation m(s, partition_key::from_single_value(*s,
|
||||
@@ -1252,24 +1252,24 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
|
||||
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
|
||||
}
|
||||
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
|
||||
if (tables) {
|
||||
for (auto table : *tables) {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
} else {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
table_streams new_table_map;
|
||||
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
|
||||
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
|
||||
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
|
||||
};
|
||||
@@ -1345,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
|
||||
}
|
||||
}
|
||||
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
|
||||
auto table_it = all_tables.find(table);
|
||||
if (table_it == all_tables.end()) {
|
||||
@@ -1402,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<cdc::stream_id> new_streams;
|
||||
new_streams.reserve(new_tablet_map.tablet_count());
|
||||
utils::chunked_vector<cdc::stream_id> new_streams;
|
||||
co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
|
||||
for (auto tid : new_tablet_map.tablet_ids()) {
|
||||
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -1425,7 +1425,7 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
muts.emplace_back(std::move(mut));
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
|
||||
utils::chunked_vector<mutation> muts;
|
||||
muts.reserve(2);
|
||||
|
||||
|
||||
@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
|
||||
|
||||
struct committed_stream_set {
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
|
||||
struct cdc_stream_diff {
|
||||
std::vector<stream_id> closed_streams;
|
||||
std::vector<stream_id> opened_streams;
|
||||
utils::chunked_vector<stream_id> closed_streams;
|
||||
utils::chunked_vector<stream_id> opened_streams;
|
||||
};
|
||||
|
||||
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
|
||||
@@ -220,11 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
|
||||
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
|
||||
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const std::vector<cdc::stream_id>&, api::timestamp_type);
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
|
||||
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
|
||||
|
||||
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
|
||||
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -149,7 +149,7 @@ public:
|
||||
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
|
||||
}
|
||||
|
||||
static cdc::stream_id get_stream(
|
||||
const std::vector<cdc::stream_id>& streams,
|
||||
const utils::chunked_vector<cdc::stream_id>& streams,
|
||||
dht::token tok) {
|
||||
if (streams.empty()) {
|
||||
on_internal_error(cdc_log, "get_stream: streams empty");
|
||||
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
|
||||
return ret;
|
||||
}
|
||||
|
||||
const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
auto now = api::new_timestamp();
|
||||
if (ts > now + get_generation_leeway().count()) {
|
||||
throw exceptions::invalid_request_exception(seastar::format(
|
||||
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
|
||||
return !it->second;
|
||||
}
|
||||
|
||||
future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed) {
|
||||
future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed) {
|
||||
|
||||
if (closed.size() == prev_stream_set.size()) {
|
||||
// all previous streams are closed, so the next stream set is just the opened streams.
|
||||
@@ -273,8 +273,8 @@ future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
// streams and removing the closed streams. we assume each stream set is
|
||||
// sorted by token, and the result is sorted as well.
|
||||
|
||||
std::vector<cdc::stream_id> next_stream_set;
|
||||
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
|
||||
utils::chunked_vector<cdc::stream_id> next_stream_set;
|
||||
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
|
||||
|
||||
auto next_prev = prev_stream_set.begin();
|
||||
auto next_closed = closed.begin();
|
||||
@@ -318,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
|
||||
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
|
||||
}
|
||||
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
|
||||
std::vector<stream_id> closed, opened;
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
|
||||
utils::chunked_vector<stream_id> closed, opened;
|
||||
|
||||
auto before_it = before.begin();
|
||||
auto after_it = after.begin();
|
||||
|
||||
@@ -49,7 +49,7 @@ class metadata final {
|
||||
|
||||
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
|
||||
|
||||
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
|
||||
public:
|
||||
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
|
||||
@@ -111,14 +111,14 @@ public:
|
||||
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
static future<std::vector<stream_id>> construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed);
|
||||
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed);
|
||||
|
||||
static future<cdc_stream_diff> generate_stream_diff(
|
||||
const std::vector<stream_id>& before,
|
||||
const std::vector<stream_id>& after);
|
||||
const utils::chunked_vector<stream_id>& before,
|
||||
const utils::chunked_vector<stream_id>& after);
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -145,9 +145,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
|
||||
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
|
||||
}
|
||||
compression_parameters cp(*compression_options);
|
||||
cp.validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
|
||||
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
|
||||
cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
|
||||
}
|
||||
|
||||
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
|
||||
|
||||
@@ -112,14 +112,8 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
|
||||
ksm->strategy_name(),
|
||||
locator::replication_strategy_params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option()),
|
||||
tmptr->get_topology());
|
||||
if (rs->uses_tablets()) {
|
||||
warnings.push_back(
|
||||
"Tables in this keyspace will be replicated using Tablets "
|
||||
"and will not support counters features. To use counters, drop this keyspace and re-create it "
|
||||
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
|
||||
if (ksm->initial_tablets().value()) {
|
||||
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
|
||||
}
|
||||
if (rs->uses_tablets() && ksm->initial_tablets().value()) {
|
||||
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
|
||||
}
|
||||
|
||||
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to create an RF-rack-invalid keyspace.
|
||||
|
||||
@@ -222,7 +222,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
|
||||
}
|
||||
|
||||
if (ks_uses_tablets && pt.is_counter()) {
|
||||
if (ks_uses_tablets && pt.is_counter() && !db.features().counters_with_tablets) {
|
||||
throw exceptions::invalid_request_exception(format("Cannot use the 'counter' type for table {}.{}: Counters are not yet supported with tablets", keyspace(), cf_name));
|
||||
}
|
||||
|
||||
|
||||
@@ -3329,7 +3329,6 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
|
||||
commit_load_reader_func func;
|
||||
input_stream<char> fin;
|
||||
replay_state::impl& state;
|
||||
input_stream<char> r;
|
||||
uint64_t id = 0;
|
||||
size_t pos = 0;
|
||||
size_t next = 0;
|
||||
|
||||
@@ -1315,15 +1315,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
|
||||
"Server-global user table compression options. If enabled, all user tables"
|
||||
"will be compressed using the provided options, unless overridden"
|
||||
"by compression options in the table schema. The available options are:\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
|
||||
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
|
||||
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
|
||||
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
|
||||
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
|
||||
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
|
||||
@@ -1425,7 +1425,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
|
||||
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
|
||||
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
|
||||
|
||||
@@ -458,6 +458,7 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<bool> alternator_warn_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
named_value<uint32_t> alternator_streams_time_window_s;
|
||||
named_value<uint32_t> alternator_timeout_in_ms;
|
||||
|
||||
@@ -2463,14 +2463,14 @@ future<bool> system_keyspace::cdc_is_rewritten() {
|
||||
}
|
||||
|
||||
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) {
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
|
||||
static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
|
||||
static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);
|
||||
|
||||
struct cur_t {
|
||||
table_id tid;
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
std::optional<cur_t> cur;
|
||||
|
||||
@@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
if (cur) {
|
||||
co_await f(cur->tid, cur->ts, std::move(cur->streams));
|
||||
}
|
||||
cur = { tid, ts, std::vector<cdc::stream_id>() };
|
||||
cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
|
||||
}
|
||||
cur->streams.push_back(std::move(stream_id));
|
||||
|
||||
|
||||
@@ -601,7 +601,7 @@ public:
|
||||
future<bool> cdc_is_rewritten();
|
||||
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
|
||||
|
||||
// Load Raft Group 0 id from scylla.local
|
||||
|
||||
@@ -3311,15 +3311,6 @@ public:
|
||||
_step.base->schema()->cf_name(), _step.current_token(), view_names);
|
||||
}
|
||||
if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
|
||||
if (_step.current_key.key().is_empty()) {
|
||||
// consumer got end-of-stream without consuming a single partition
|
||||
vlogger.debug("Reader didn't produce anything, marking views as built");
|
||||
while (!_step.build_status.empty()) {
|
||||
_built_views.views.push_back(std::move(_step.build_status.back()));
|
||||
_step.build_status.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
// before going back to the minimum token, advance current_key to the end
|
||||
// and check for built views in that range.
|
||||
_step.current_key = { _step.prange.end().value_or(dht::ring_position::max()).value().token(), partition_key::make_empty()};
|
||||
@@ -3338,6 +3329,7 @@ public:
|
||||
|
||||
// Called in the context of a seastar::thread.
|
||||
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
inject_failure("dont_start_build_step");
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto compaction_state = make_lw_shared<compact_for_query_state>(
|
||||
*step.reader.schema(),
|
||||
@@ -3372,6 +3364,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
|
||||
}).get();
|
||||
utils::get_local_injector().inject("delay_finishing_build_step", utils::wait_for_message(60s)).get();
|
||||
}
|
||||
|
||||
future<> view_builder::mark_as_built(view_ptr view) {
|
||||
|
||||
@@ -1278,7 +1278,7 @@ public:
|
||||
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
|
||||
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
|
||||
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await emit_stream_set(ts, cdc::stream_state::current, current);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
|
||||
|
||||
5
dist/common/scripts/scylla-housekeeping
vendored
5
dist/common/scripts/scylla-housekeeping
vendored
@@ -21,6 +21,11 @@ import urllib.request
|
||||
from pkg_resources import parse_version
|
||||
import multiprocessing as mp
|
||||
|
||||
# Python 3.14 changed the default to 'forkserver', which is not compatible
|
||||
# with our relocatable python. It execs our Python binary, but without our
|
||||
# ld.so. Change it back to 'fork' to avoid issues.
|
||||
mp.set_start_method('fork')
|
||||
|
||||
VERSION = "1.0"
|
||||
quiet = False
|
||||
# Temporary url for the review
|
||||
|
||||
@@ -109,6 +109,32 @@ to do what, configure the following in ScyllaDB's configuration:
|
||||
alternator_enforce_authorization: true
|
||||
```
|
||||
|
||||
Note: switching `alternator_enforce_authorization` from `false` to `true`
|
||||
before the client application has the proper secret keys and permission
|
||||
tables set up will cause the application's requests to immediately fail.
|
||||
Therefore, we recommend to begin by keeping `alternator_enforce_authorization`
|
||||
set to `false` and setting `alternator_warn_authorization` to `true`.
|
||||
This setting will continue to allow all requests without failing on
|
||||
authentication or authorization errors - but will _count_ would-be
|
||||
authentication and authorization failures in the two metrics:
|
||||
|
||||
* `scylla_alternator_authentication_failures`
|
||||
* `scylla_alternator_authorization_failures`
|
||||
|
||||
`alternator_warn_authorization=true` also generates a WARN-level log message
|
||||
on each authentication or authorization failure. These log messages each
|
||||
includes the string `alternator_enforce_authorization=true`, and information
|
||||
that can help pinpoint the source of the error - such as the username
|
||||
involved in the attempt, and the address of the client sending the request.
|
||||
|
||||
When you see that both metrics are not increasing (or, alternatively, that no
|
||||
more log messages appear), you can be sure that the application is properly
|
||||
set up and can finally set `alternator_enforce_authorization` to `true`.
|
||||
You can leave `alternator_warn_authorization` set or unset, depending on
|
||||
whether or not you want to see log messages when requests fail on
|
||||
authentication/authorization (in any case, the metric counts these failures,
|
||||
and the client will also get the error).
|
||||
|
||||
Alternator implements the same [signature protocol](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)
|
||||
as DynamoDB and the rest of AWS. Clients use, as usual, an access key ID and
|
||||
a secret access key to prove their identity and the authenticity of their
|
||||
|
||||
@@ -197,12 +197,6 @@ Limitations and Unsupported Features
|
||||
throughout its lifetime. Failing to keep that invariant satisfied may result in data inconsistencies,
|
||||
performance problems, or other issues.
|
||||
|
||||
The following ScyllaDB features are not supported if a keyspace has tablets
|
||||
enabled. If you plan to use any of the features listed below, CREATE your keyspace
|
||||
:ref:`with tablets disabled <tablets-enable-tablets>`.
|
||||
|
||||
* Counters
|
||||
|
||||
To enable materialized views and secondary indexes for tablet keyspaces, use
|
||||
the `--rf-rack-valid-keyspaces` See :ref:`Views with tablets <admin-views-with-tablets>` for details.
|
||||
|
||||
|
||||
@@ -3,8 +3,6 @@
|
||||
ScyllaDB Counters
|
||||
==================
|
||||
|
||||
.. note:: Counters are not supported in keyspaces with :doc:`tablets</architecture/tablets>` enabled.
|
||||
|
||||
Counters are useful for any application where you need to increment a count, such as keeping a track of:
|
||||
|
||||
* The number of web page views on a website.
|
||||
|
||||
47
docs/operating-scylla/nodetool-commands/excludenode.rst
Normal file
47
docs/operating-scylla/nodetool-commands/excludenode.rst
Normal file
@@ -0,0 +1,47 @@
|
||||
Nodetool excludenode
|
||||
====================
|
||||
|
||||
.. warning::
|
||||
You must never use the ``nodetool excludenode`` command on a running node that can be reached by other nodes in the cluster.
|
||||
Before using the command, make sure the node is permanently down and cannot be recovered.
|
||||
|
||||
|
||||
Running ``excludenode`` will mark given nodes as permanently down (excluded).
|
||||
The cluster will no longer attempt to contact excluded nodes, which unblocks
|
||||
tablet load balancing, replication changes, etc.
|
||||
The nodes will be permanently banned from the cluster, meaning you won't be able to bring them back.
|
||||
|
||||
Data ownership is not changed, and the nodes are still cluster members,
|
||||
so they eventually have to be removed or replaced.
|
||||
|
||||
After nodes are excluded, there is no need to pass them in the list of ignored
|
||||
nodes to removenode, replace, or repair.
|
||||
|
||||
Prerequisites
|
||||
------------------------
|
||||
|
||||
* Using ``excludenode`` requires at least a quorum of nodes in a cluster to be available.
|
||||
If the quorum is lost, it must be restored before you change the cluster topology.
|
||||
See :doc:`Handling Node Failures </troubleshooting/handling-node-failures>` for details.
|
||||
|
||||
Usage
|
||||
--------
|
||||
|
||||
Provide the Host IDs of the nodes you want to mark as permanently down.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool excludenode <Host ID> [ ... <Host ID>]
|
||||
|
||||
Examples:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool excludenode 2d1e1b0a-4ecb-4128-ba45-36ba558f7aee
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool excludenode 2d1e1b0a-4ecb-4128-ba45-36ba558f7aee 73adf19e-2912-4cf6-b9ab-bbc74297b8de
|
||||
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
@@ -47,6 +47,12 @@ Example:
|
||||
|
||||
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
|
||||
|
||||
To only mark the node as permanently down without doing actual removal, use :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>`:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool excludenode <Host ID of the node>
|
||||
|
||||
|
||||
.. _removenode-ignore-dead-nodes:
|
||||
|
||||
|
||||
@@ -18,84 +18,93 @@ Example output:
|
||||
|
||||
Datacenter: datacenter1
|
||||
=======================
|
||||
Status=Up/Down
|
||||
Status=Up/Down/eXcluded
|
||||
|/ State=Normal/Leaving/Joining/Moving
|
||||
-- Address Load Tokens Owns (effective) Host ID Rack
|
||||
UN 127.0.0.1 394.97 MB 256 33.4% 292a6c7f-2063-484c-b54d-9015216f1750 rack1
|
||||
UN 127.0.0.2 151.07 MB 256 34.3% 102b6ecd-2081-4073-8172-bf818c35e27b rack1
|
||||
UN 127.0.0.3 249.07 MB 256 32.3% 20db6ecd-2981-447s-l172-jf118c17o27y rack1
|
||||
XN 127.0.0.4 149.07 MB 256 32.3% dd961642-c7c6-4962-9f5a-ea774dbaed77 rack1
|
||||
|
||||
+----------+---------------------------------------+
|
||||
|Parameter |Description |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+==========+=======================================+
|
||||
|Datacenter|The data center that holds |
|
||||
| |the information. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Status |``U`` - The node is up. |
|
||||
| | |
|
||||
| |``D`` - The node is down. |
|
||||
+----------+---------------------------------------+
|
||||
|State |``N`` - Normal |
|
||||
| | |
|
||||
| |``L`` - Leaving |
|
||||
| | |
|
||||
| |``J`` - Joining |
|
||||
| | |
|
||||
| |``M`` - Moving |
|
||||
+----------+---------------------------------------+
|
||||
|Address |The IP address of the node. |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Load |The size on disk the ScyllaDB data |
|
||||
| | takes up (updates every 60 seconds). |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Tokens |The number of tokens per node. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Owns |The percentage of data owned by |
|
||||
| |the node (per datacenter) multiplied by|
|
||||
| |the replication factor you are using. |
|
||||
| | |
|
||||
| |For example, if the node owns 25% of |
|
||||
| |the data and the replication factor |
|
||||
| |is 4, the value will equal 100%. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Host ID |The unique identifier (UUID) |
|
||||
| |automatically assigned to the node. |
|
||||
| | |
|
||||
+----------+---------------------------------------+
|
||||
|Rack |The name of the rack. |
|
||||
+----------+---------------------------------------+
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Parameter |Description |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+==========+===============================================================+
|
||||
|Datacenter|The data center that holds |
|
||||
| |the information. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Status |``U`` - The node is up. |
|
||||
| | |
|
||||
| |``D`` - The node is down. |
|
||||
| | |
|
||||
| |``X`` - The node is :ref:`excluded <status-excluded>`. |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|State |``N`` - Normal |
|
||||
| | |
|
||||
| |``L`` - Leaving |
|
||||
| | |
|
||||
| |``J`` - Joining |
|
||||
| | |
|
||||
| |``M`` - Moving |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Address |The IP address of the node. |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Load |The size on disk the ScyllaDB data |
|
||||
| | takes up (updates every 60 seconds). |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Tokens |The number of tokens per node. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Owns |The percentage of data owned by |
|
||||
| |the node (per datacenter) multiplied by |
|
||||
| |the replication factor you are using. |
|
||||
| | |
|
||||
| |For example, if the node owns 25% of |
|
||||
| |the data and the replication factor |
|
||||
| |is 4, the value will equal 100%. |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Host ID |The unique identifier (UUID) |
|
||||
| |automatically assigned to the node. |
|
||||
| | |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|Rack |The name of the rack. |
|
||||
+----------+---------------------------------------------------------------+
|
||||
|
||||
.. _status-excluded:
|
||||
|
||||
Nodes in the excluded status (``X``) are down nodes which were marked as excluded
|
||||
by ``removenode``, ``excludenode``, or node replace, which means that they are considered permanently lost.
|
||||
See :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>` for more information.
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -30,6 +30,7 @@ Nodetool
|
||||
nodetool-commands/enablebackup
|
||||
nodetool-commands/enablebinary
|
||||
nodetool-commands/enablegossip
|
||||
nodetool-commands/excludenode
|
||||
nodetool-commands/flush
|
||||
nodetool-commands/getcompactionthroughput
|
||||
nodetool-commands/getendpoints
|
||||
@@ -104,6 +105,7 @@ Operations that are not listed below are currently not available.
|
||||
* :doc:`enablebackup </operating-scylla/nodetool-commands/enablebackup/>` - Enable incremental backup.
|
||||
* :doc:`enablebinary </operating-scylla/nodetool-commands/enablebinary/>` - Re-enable native transport (binary protocol).
|
||||
* :doc:`enablegossip </operating-scylla/nodetool-commands/enablegossip/>` - Re-enable gossip.
|
||||
* :doc:`excludenode </operating-scylla/nodetool-commands/excludenode/>` - Mark nodes as permanently down.
|
||||
* :doc:`flush </operating-scylla/nodetool-commands/flush/>` - Flush one or more column families.
|
||||
* :doc:`getcompactionthroughput </operating-scylla/nodetool-commands/getcompactionthroughput>` - Print the throughput cap for compaction in the system
|
||||
* :doc:`getendpoints <nodetool-commands/getendpoints/>` :code:`<keyspace>` :code:`<table>` :code:`<key>`- Print the end points that owns the key.
|
||||
|
||||
@@ -159,6 +159,7 @@ public:
|
||||
gms::feature workload_prioritization { *this, "WORKLOAD_PRIORITIZATION"sv };
|
||||
gms::feature colocated_tablets { *this, "COLOCATED_TABLETS"sv };
|
||||
gms::feature cdc_with_tablets { *this, "CDC_WITH_TABLETS"sv };
|
||||
gms::feature counters_with_tablets { *this, "COUNTERS_WITH_TABLETS"sv };
|
||||
gms::feature file_stream { *this, "FILE_STREAM"sv };
|
||||
gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
|
||||
gms::feature tablet_options { *this, "TABLET_OPTIONS"sv };
|
||||
|
||||
@@ -37,7 +37,9 @@ struct tablet_load_stats final {
|
||||
// Sum of all tablet sizes on a node and available disk space.
|
||||
uint64_t effective_capacity;
|
||||
|
||||
std::unordered_map<locator::range_based_tablet_id, uint64_t> tablet_sizes;
|
||||
// Contains tablet sizes per table. The token ranges must be in the form
|
||||
// (a, b] and only such ranges are allowed
|
||||
std::unordered_map<::table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;
|
||||
};
|
||||
|
||||
struct load_stats {
|
||||
|
||||
@@ -90,60 +90,53 @@ future<sstring> ec2_snitch::aws_api_call(sstring addr, uint16_t port, sstring cm
|
||||
}
|
||||
|
||||
future<sstring> ec2_snitch::aws_api_call_once(sstring addr, uint16_t port, sstring cmd, std::optional<sstring> token) {
|
||||
return connect(socket_address(inet_address{addr}, port))
|
||||
.then([this, addr, cmd, token] (connected_socket fd) {
|
||||
_sd = std::move(fd);
|
||||
_in = _sd.input();
|
||||
_out = _sd.output();
|
||||
connected_socket fd = co_await connect(socket_address(inet_address{addr}, port));
|
||||
auto in = fd.input();
|
||||
auto out = fd.output();
|
||||
|
||||
if (token) {
|
||||
_req = sstring("GET ") + cmd +
|
||||
sstring(" HTTP/1.1\r\nHost: ") +addr +
|
||||
sstring("\r\nX-aws-ec2-metadata-token: ") + *token +
|
||||
sstring("\r\n\r\n");
|
||||
} else {
|
||||
_req = sstring("PUT ") + cmd +
|
||||
sstring(" HTTP/1.1\r\nHost: ") + addr +
|
||||
sstring("\r\nX-aws-ec2-metadata-token-ttl-seconds: 60") +
|
||||
sstring("\r\n\r\n");
|
||||
}
|
||||
if (token) {
|
||||
_req = sstring("GET ") + cmd +
|
||||
sstring(" HTTP/1.1\r\nHost: ") +addr +
|
||||
sstring("\r\nX-aws-ec2-metadata-token: ") + *token +
|
||||
sstring("\r\n\r\n");
|
||||
} else {
|
||||
_req = sstring("PUT ") + cmd +
|
||||
sstring(" HTTP/1.1\r\nHost: ") + addr +
|
||||
sstring("\r\nX-aws-ec2-metadata-token-ttl-seconds: 60") +
|
||||
sstring("\r\n\r\n");
|
||||
}
|
||||
|
||||
return _out.write(_req.c_str()).then([this] {
|
||||
return _out.flush();
|
||||
});
|
||||
}).then([this] {
|
||||
_parser.init();
|
||||
return _in.consume(_parser).then([this] {
|
||||
if (_parser.eof()) {
|
||||
return make_exception_future<sstring>("Bad HTTP response");
|
||||
}
|
||||
co_await out.write(_req.c_str());
|
||||
co_await out.flush();
|
||||
|
||||
// Read HTTP response header first
|
||||
auto _rsp = _parser.get_parsed_response();
|
||||
auto rc = _rsp->_status;
|
||||
// Verify EC2 instance metadata access
|
||||
if (rc == http::reply::status_type(403)) {
|
||||
return make_exception_future<sstring>(std::runtime_error("Error: Unauthorized response received when trying to communicate with instance metadata service."));
|
||||
}
|
||||
if (_rsp->_status != http::reply::status_type::ok) {
|
||||
return make_exception_future<sstring>(std::runtime_error(format("Error: HTTP response status {}", _rsp->_status)));
|
||||
}
|
||||
_parser.init();
|
||||
co_await in.consume(_parser);
|
||||
if (_parser.eof()) {
|
||||
co_await coroutine::return_exception(std::runtime_error("Bad HTTP response"));
|
||||
}
|
||||
|
||||
auto it = _rsp->_headers.find("Content-Length");
|
||||
if (it == _rsp->_headers.end()) {
|
||||
return make_exception_future<sstring>("Error: HTTP response does not contain: Content-Length\n");
|
||||
}
|
||||
// Read HTTP response header first
|
||||
auto _rsp = _parser.get_parsed_response();
|
||||
auto rc = _rsp->_status;
|
||||
// Verify EC2 instance metadata access
|
||||
if (rc == http::reply::status_type(403)) {
|
||||
co_await coroutine::return_exception(std::runtime_error("Error: Unauthorized response received when trying to communicate with instance metadata service."));
|
||||
}
|
||||
if (_rsp->_status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(std::runtime_error(format("Error: HTTP response status {}", _rsp->_status)));
|
||||
}
|
||||
|
||||
auto content_len = std::stoi(it->second);
|
||||
auto it = _rsp->_headers.find("Content-Length");
|
||||
if (it == _rsp->_headers.end()) {
|
||||
co_await coroutine::return_exception(std::runtime_error("Error: HTTP response does not contain: Content-Length\n"));
|
||||
}
|
||||
|
||||
// Read HTTP response body
|
||||
return _in.read_exactly(content_len).then([] (temporary_buffer<char> buf) {
|
||||
sstring res(buf.get(), buf.size());
|
||||
auto content_len = std::stoi(it->second);
|
||||
|
||||
return make_ready_future<sstring>(std::move(res));
|
||||
});
|
||||
});
|
||||
});
|
||||
// Read HTTP response body
|
||||
temporary_buffer<char> buf = co_await in.read_exactly(content_len);
|
||||
sstring res(buf.get(), buf.size());
|
||||
co_return res;
|
||||
}
|
||||
|
||||
future<sstring> ec2_snitch::read_property_file() {
|
||||
|
||||
@@ -30,9 +30,6 @@ protected:
|
||||
future<sstring> aws_api_call(sstring addr, uint16_t port, const sstring cmd, std::optional<sstring> token);
|
||||
future<sstring> read_property_file();
|
||||
private:
|
||||
connected_socket _sd;
|
||||
input_stream<char> _in;
|
||||
output_stream<char> _out;
|
||||
http_response_parser _parser;
|
||||
sstring _req;
|
||||
exponential_backoff_retry _ec2_api_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(2560));
|
||||
|
||||
@@ -865,9 +865,11 @@ table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexce
|
||||
|
||||
uint64_t tablet_load_stats::add_tablet_sizes(const tablet_load_stats& tls) {
|
||||
uint64_t table_sizes_sum = 0;
|
||||
for (auto& [rb_tid, tablet_size] : tls.tablet_sizes) {
|
||||
tablet_sizes[rb_tid] = tablet_size;
|
||||
table_sizes_sum += tablet_size;
|
||||
for (auto& [table, sizes] : tls.tablet_sizes) {
|
||||
for (auto& [range, tablet_size] : sizes) {
|
||||
tablet_sizes[table][range] = tablet_size;
|
||||
table_sizes_sum += tablet_size;
|
||||
}
|
||||
}
|
||||
return table_sizes_sum;
|
||||
}
|
||||
@@ -894,16 +896,87 @@ load_stats& load_stats::operator+=(const load_stats& s) {
|
||||
}
|
||||
|
||||
std::optional<uint64_t> load_stats::get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const {
|
||||
if (auto node_i = tablet_stats.find(host); node_i != tablet_stats.end()) {
|
||||
const tablet_load_stats& tls = node_i->second;
|
||||
if (auto ts_i = tls.tablet_sizes.find(rb_tid); ts_i != tls.tablet_sizes.end()) {
|
||||
return ts_i->second;
|
||||
if (auto host_i = tablet_stats.find(host); host_i != tablet_stats.end()) {
|
||||
auto& sizes_per_table = host_i->second.tablet_sizes;
|
||||
if (auto table_i = sizes_per_table.find(rb_tid.table); table_i != sizes_per_table.end()) {
|
||||
auto& tablet_sizes = table_i->second;
|
||||
if (auto size_i = tablet_sizes.find(rb_tid.range); size_i != tablet_sizes.end()) {
|
||||
return size_i->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
tablet_logger.debug("Unable to find tablet size on host: {} for tablet: {}", host, rb_tid);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
lw_shared_ptr<load_stats> load_stats::reconcile_tablets_resize(const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const {
|
||||
lw_shared_ptr<load_stats> reconciled_stats { make_lw_shared<load_stats>(*this) };
|
||||
load_stats& new_stats = *reconciled_stats;
|
||||
|
||||
for (table_id table : tables) {
|
||||
if (!new_tm.tablets().has_tablet_map(table)) {
|
||||
// Table has been dropped, remove it from stats
|
||||
for (auto& [host, tls] : new_stats.tablet_stats) {
|
||||
tls.tablet_sizes.erase(table);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const auto& old_tmap = old_tm.tablets().get_tablet_map(table);
|
||||
const auto& new_tmap = new_tm.tablets().get_tablet_map(table);
|
||||
size_t old_tablet_count = old_tmap.tablet_count();
|
||||
size_t new_tablet_count = new_tmap.tablet_count();
|
||||
if (old_tablet_count == new_tablet_count * 2) {
|
||||
// Reconcile for merge
|
||||
for (size_t i = 0; i < old_tablet_count; i += 2) {
|
||||
range_based_tablet_id rb_tid1 { table, old_tmap.get_token_range(tablet_id(i)) };
|
||||
range_based_tablet_id rb_tid2 { table, old_tmap.get_token_range(tablet_id(i + 1)) };
|
||||
auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
|
||||
for (auto& replica : tinfo.replicas) {
|
||||
auto tablet_size_opt1 = new_stats.get_tablet_size(replica.host, rb_tid1);
|
||||
auto tablet_size_opt2 = new_stats.get_tablet_size(replica.host, rb_tid2);
|
||||
if (!tablet_size_opt1 || !tablet_size_opt2) {
|
||||
if (!tablet_size_opt1) {
|
||||
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid1, replica.host);
|
||||
}
|
||||
if (!tablet_size_opt2) {
|
||||
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid2, replica.host);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
dht::token_range new_range { new_tmap.get_token_range(tablet_id(i / 2)) };
|
||||
auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
|
||||
uint64_t merged_tablet_size = *tablet_size_opt1 + *tablet_size_opt2;
|
||||
sizes_for_table[new_range] = merged_tablet_size;
|
||||
sizes_for_table.erase(rb_tid1.range);
|
||||
sizes_for_table.erase(rb_tid2.range);
|
||||
}
|
||||
}
|
||||
} else if (old_tablet_count == new_tablet_count / 2) {
|
||||
// Reconcile for split
|
||||
for (size_t i = 0; i < old_tablet_count; i++) {
|
||||
range_based_tablet_id rb_tid { table, old_tmap.get_token_range(tablet_id(i)) };
|
||||
auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
|
||||
for (auto& replica : tinfo.replicas) {
|
||||
auto tablet_size_opt = new_stats.get_tablet_size(replica.host, rb_tid);
|
||||
if (!tablet_size_opt) {
|
||||
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid, replica.host);
|
||||
return nullptr;
|
||||
}
|
||||
dht::token_range new_range1 { new_tmap.get_token_range(tablet_id(i * 2)) };
|
||||
dht::token_range new_range2 { new_tmap.get_token_range(tablet_id(i * 2 + 1)) };
|
||||
auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
|
||||
uint64_t split_tablet_size = *tablet_size_opt / 2;
|
||||
sizes_for_table[new_range1] = split_tablet_size;
|
||||
sizes_for_table[new_range2] = split_tablet_size;
|
||||
sizes_for_table.erase(rb_tid.range);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reconciled_stats;
|
||||
}
|
||||
|
||||
tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
|
||||
: _schema(std::move(schema))
|
||||
, _ranges(ranges)
|
||||
|
||||
@@ -66,6 +66,9 @@ struct global_tablet_id {
|
||||
|
||||
struct range_based_tablet_id {
|
||||
table_id table;
|
||||
|
||||
// This represents the token range of the tablet in the form (a, b]
|
||||
// and only such ranges are allowed
|
||||
dht::token_range range;
|
||||
|
||||
bool operator==(const range_based_tablet_id&) const = default;
|
||||
@@ -445,7 +448,9 @@ struct tablet_load_stats {
|
||||
// Sum of all tablet sizes on a node and available disk space.
|
||||
uint64_t effective_capacity = 0;
|
||||
|
||||
std::unordered_map<range_based_tablet_id, uint64_t> tablet_sizes;
|
||||
// Contains tablet sizes per table.
|
||||
// The token ranges must be in the form (a, b] and only such ranges are allowed
|
||||
std::unordered_map<table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;
|
||||
|
||||
// returns the aggregated size of all the tablets added
|
||||
uint64_t add_tablet_sizes(const tablet_load_stats& tls);
|
||||
@@ -479,6 +484,12 @@ struct load_stats {
|
||||
}
|
||||
|
||||
std::optional<uint64_t> get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const;
|
||||
|
||||
// Modifies the tablet sizes in load_stats for the given table after a split or merge. The old_tm argument has
|
||||
// to contain the token_metadata pre-resize. The function returns load_stats with tablet token ranges
|
||||
// corresponding to the post-resize tablet_map.
|
||||
// In case any pre-resize tablet replica is not found, the function returns nullptr
|
||||
lw_shared_ptr<load_stats> reconcile_tablets_resize(const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const;
|
||||
};
|
||||
|
||||
using load_stats_v2 = load_stats;
|
||||
|
||||
43
main.cc
43
main.cc
@@ -2221,12 +2221,47 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
//
|
||||
// Also, if the dictionary compression feature is not enabled, use
|
||||
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
|
||||
|
||||
gms::feature::listener_registration reg_listener;
|
||||
|
||||
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
|
||||
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
|
||||
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
|
||||
}
|
||||
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
|
||||
compression_parameters original_params{sstable_compression_options().get_options()};
|
||||
auto params = sstable_compression_options().get_options();
|
||||
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
|
||||
// Register a callback to update the default compression algorithm when the feature is enabled.
|
||||
// Precondition:
|
||||
// The callback must run inside seastar::async context:
|
||||
// - If the listener fires immediately, we are running inside seastar::async already.
|
||||
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
|
||||
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(params, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
|
||||
cfg->sstable_compression_user_table_options().validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
|
||||
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)));
|
||||
} catch (const std::exception& e) {
|
||||
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
|
||||
throw bad_configuration_error();
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
DROP KEYSPACE IF EXISTS counters;
|
||||
|
||||
-- FIXME: use tablets after https://github.com/scylladb/scylladb/issues/18180 is done.
|
||||
CREATE KEYSPACE IF NOT EXISTS counters WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'} AND TABLETS = {'enabled': false};
|
||||
CREATE KEYSPACE IF NOT EXISTS counters WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'};
|
||||
|
||||
CREATE TABLE IF NOT EXISTS counters.counter1 (key blob PRIMARY KEY, "C0" counter, "C1" counter, "C2" counter, "C3" counter, "C4" counter);
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:a9742362bc16ca16e9f962af993d6df7d6c4301182528d2882d50ec01b27b043
|
||||
size 6314928
|
||||
oid sha256:012ccbeb5c93878bf260f751ff55faa723f235ee796dc7e31c4e14c1bcc0efae
|
||||
size 6408088
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:3c9d0a4c8289a7edf8ffb58d23cd71b686d98730ab1ac75921ac3a2d533eb66a
|
||||
size 6325416
|
||||
oid sha256:acb4310f476a7dac4a645ae6babae22af2541c37ed368eac666ff3bd24f1a56a
|
||||
size 6406700
|
||||
|
||||
@@ -196,6 +196,7 @@ struct row_level_repair_metrics {
|
||||
uint64_t rx_hashes_nr{0};
|
||||
uint64_t inc_sst_skipped_bytes{0};
|
||||
uint64_t inc_sst_read_bytes{0};
|
||||
uint64_t tablet_time_ms{0};
|
||||
row_level_repair_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
_metrics.add_group("repair", {
|
||||
@@ -219,6 +220,8 @@ struct row_level_repair_metrics {
|
||||
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
|
||||
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
|
||||
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
|
||||
sm::make_counter("tablet_time_ms", tablet_time_ms,
|
||||
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -3477,7 +3480,16 @@ future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
service::frozen_topology_guard topo_guard) {
|
||||
auto start_time = flush_time;
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
|
||||
co_return co_await repair.run();
|
||||
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
|
||||
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
|
||||
auto t = std::chrono::steady_clock::now();
|
||||
auto update_time = seastar::defer([&] {
|
||||
if (is_tablet && !is_tablet_rebuild) {
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
|
||||
_metrics.tablet_time_ms += duration.count();
|
||||
}
|
||||
});
|
||||
co_await repair.run();
|
||||
}
|
||||
|
||||
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {
|
||||
|
||||
@@ -483,13 +483,14 @@ locator::static_effective_replication_map_ptr keyspace::get_static_effective_rep
|
||||
} // namespace replica
|
||||
|
||||
void backlog_controller::adjust() {
|
||||
// Compute and update the backlog even when static shares are set to
|
||||
// ensure that the backlog metrics reflect the current state.
|
||||
auto backlog = _current_backlog();
|
||||
if (controller_disabled()) {
|
||||
update_controller(_static_shares);
|
||||
return;
|
||||
}
|
||||
|
||||
auto backlog = _current_backlog();
|
||||
|
||||
if (backlog >= _control_points.back().input) {
|
||||
update_controller(_control_points.back().output);
|
||||
return;
|
||||
@@ -510,7 +511,7 @@ void backlog_controller::adjust() {
|
||||
|
||||
float backlog_controller::backlog_of_shares(float shares) const {
|
||||
size_t idx = 1;
|
||||
if (controller_disabled() || _control_points.size() == 0) {
|
||||
if (_control_points.size() == 0) {
|
||||
return 1.0f;
|
||||
}
|
||||
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
|
||||
@@ -1896,12 +1897,7 @@ std::ostream& operator<<(std::ostream& out, const database& db) {
|
||||
return out;
|
||||
}
|
||||
|
||||
future<mutation> database::do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema,
|
||||
db::timeout_clock::time_point timeout,tracing::trace_state_ptr trace_state) {
|
||||
auto m = fm.unfreeze(m_schema);
|
||||
m.upgrade(cf.schema());
|
||||
|
||||
// prepare partition slice
|
||||
static query::partition_slice partition_slice_for_counter_update(const mutation& m) {
|
||||
query::column_id_vector static_columns;
|
||||
static_columns.reserve(m.partition().static_row().size());
|
||||
m.partition().static_row().for_each_cell([&] (auto id, auto&&) {
|
||||
@@ -1924,20 +1920,18 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
regular_columns.erase(std::unique(regular_columns.begin(), regular_columns.end()),
|
||||
regular_columns.end());
|
||||
|
||||
auto slice = query::partition_slice(std::move(cr_ranges), std::move(static_columns),
|
||||
return query::partition_slice(std::move(cr_ranges), std::move(static_columns),
|
||||
std::move(regular_columns), { }, { }, query::max_rows);
|
||||
}
|
||||
|
||||
auto op = cf.write_in_progress();
|
||||
|
||||
tracing::trace(trace_state, "Acquiring counter locks");
|
||||
auto locks = co_await cf.lock_counter_cells(m, timeout);
|
||||
|
||||
future<mutation> database::read_and_transform_counter_mutation_to_shards(mutation m, column_family& cf, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
|
||||
// Before counter update is applied it needs to be transformed from
|
||||
// deltas to counter shards. To do that, we need to read the current
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(cf.schema(), "counter-read-before-write", timeout, trace_state);
|
||||
auto slice = partition_slice_for_counter_update(m);
|
||||
auto mopt = co_await counter_write_query(cf.schema(), cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state);
|
||||
|
||||
if (utils::get_local_injector().enter("apply_counter_update_delay_100ms")) {
|
||||
@@ -1948,14 +1942,8 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// cells we can look for our shard in each of them, increment
|
||||
// its clock and apply the delta.
|
||||
transform_counter_updates_to_shards(m, mopt ? &*mopt : nullptr, cf.failed_counter_applies_to_memtable(), get_token_metadata().get_my_id());
|
||||
tracing::trace(trace_state, "Applying counter update");
|
||||
co_await apply_with_commitlog(cf, m, timeout);
|
||||
|
||||
if (utils::get_local_injector().enter("apply_counter_update_delay_5s")) {
|
||||
co_await seastar::sleep(std::chrono::seconds(5));
|
||||
}
|
||||
|
||||
co_return m;
|
||||
co_return std::move(m);
|
||||
}
|
||||
|
||||
max_purgeable memtable_list::get_max_purgeable(const dht::decorated_key& dk, is_shadowable is, api::timestamp_type max_seen_timestamp) const noexcept {
|
||||
@@ -2050,29 +2038,70 @@ future<> database::apply_in_memory(const mutation& m, column_family& cf, db::rp_
|
||||
return cf.apply(m, std::move(h), timeout);
|
||||
}
|
||||
|
||||
future<mutation> database::apply_counter_update(schema_ptr s, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
|
||||
future<counter_update_guard> database::acquire_counter_locks(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
|
||||
auto& cf = find_column_family(fm.column_family_id());
|
||||
|
||||
auto m = fm.unfreeze(s);
|
||||
m.upgrade(cf.schema());
|
||||
|
||||
auto op = cf.write_in_progress();
|
||||
|
||||
tracing::trace(trace_state, "Acquiring counter locks");
|
||||
|
||||
return do_with(std::move(m), [this, &cf, op = std::move(op), timeout] (mutation& m) mutable {
|
||||
return update_write_metrics_if_failed([&m, &cf, op = std::move(op), timeout] mutable -> future<counter_update_guard> {
|
||||
return cf.lock_counter_cells(m, timeout).then([op = std::move(op)] (std::vector<locked_cell> locks) mutable {
|
||||
return counter_update_guard{std::move(op), std::move(locks)};
|
||||
});
|
||||
}());
|
||||
});
|
||||
}
|
||||
|
||||
future<mutation> database::prepare_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
|
||||
if (timeout <= db::timeout_clock::now() || utils::get_local_injector().is_enabled("database_apply_counter_update_force_timeout")) {
|
||||
update_write_metrics_for_timed_out_write();
|
||||
return make_exception_future<mutation>(timed_out_error{});
|
||||
}
|
||||
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
auto& cf = find_column_family(fm.column_family_id());
|
||||
if (is_in_critical_disk_utilization_mode() && cf.is_eligible_to_write_rejection_on_critical_disk_utilization()) {
|
||||
update_write_metrics_for_rejected_writes();
|
||||
return make_exception_future<mutation>(replica::critical_disk_utilization_exception{"rejected counter update mutation"});
|
||||
}
|
||||
return update_write_metrics(seastar::futurize_invoke([&] {
|
||||
if (!s->is_synced()) {
|
||||
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
|
||||
auto m = fm.unfreeze(s);
|
||||
m.upgrade(cf.schema());
|
||||
|
||||
return update_write_metrics_if_failed(
|
||||
read_and_transform_counter_mutation_to_shards(std::move(m), cf, std::move(trace_state), timeout));
|
||||
}
|
||||
|
||||
future<> database::apply_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
|
||||
auto& cf = find_column_family(fm.column_family_id());
|
||||
|
||||
auto m = fm.unfreeze(s);
|
||||
m.upgrade(cf.schema());
|
||||
|
||||
tracing::trace(trace_state, "Applying counter update");
|
||||
auto f = co_await coroutine::as_future(update_write_metrics(seastar::futurize_invoke([&] {
|
||||
if (!s->is_synced()) {
|
||||
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
try {
|
||||
return apply_with_commitlog(cf, m, timeout);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
throw;
|
||||
}
|
||||
})));
|
||||
if (f.failed()) {
|
||||
co_await coroutine::return_exception_ptr(f.get_exception());
|
||||
}
|
||||
try {
|
||||
return do_apply_counter_update(cf, m, s, timeout, std::move(trace_state));
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
throw;
|
||||
|
||||
if (utils::get_local_injector().enter("apply_counter_update_delay_5s")) {
|
||||
co_await seastar::sleep(std::chrono::seconds(5));
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// #9919 etc. The initiative to wrap exceptions here
|
||||
@@ -2309,6 +2338,24 @@ Future database::update_write_metrics(Future&& f) {
|
||||
});
|
||||
}
|
||||
|
||||
template<typename Future>
|
||||
Future database::update_write_metrics_if_failed(Future&& f) {
|
||||
return f.then_wrapped([s = _stats] (auto f) {
|
||||
if (f.failed()) {
|
||||
++s->total_writes;
|
||||
++s->total_writes_failed;
|
||||
auto ep = f.get_exception();
|
||||
if (is_timeout_exception(ep)) {
|
||||
++s->total_writes_timedout;
|
||||
} else if (try_catch<replica::rate_limit_exception>(ep)) {
|
||||
++s->total_writes_rate_limited;
|
||||
}
|
||||
return futurize<Future>::make_exception_future(std::move(ep));
|
||||
}
|
||||
return f;
|
||||
});
|
||||
}
|
||||
|
||||
void database::update_write_metrics_for_timed_out_write() {
|
||||
++_stats->total_writes;
|
||||
++_stats->total_writes_failed;
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "types/user.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "cell_locking.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include <chrono>
|
||||
@@ -1500,6 +1501,11 @@ struct string_pair_eq {
|
||||
bool operator()(spair lhs, spair rhs) const;
|
||||
};
|
||||
|
||||
struct counter_update_guard {
|
||||
utils::phased_barrier::operation op;
|
||||
std::vector<locked_cell> locks;
|
||||
};
|
||||
|
||||
class db_user_types_storage;
|
||||
|
||||
// Policy for sharded<database>:
|
||||
@@ -1728,11 +1734,12 @@ private:
|
||||
future<> do_apply_many(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
|
||||
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
|
||||
|
||||
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
|
||||
tracing::trace_state_ptr trace_state);
|
||||
future<mutation> read_and_transform_counter_mutation_to_shards(mutation m, column_family& cf, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
|
||||
|
||||
template<typename Future>
|
||||
Future update_write_metrics(Future&& f);
|
||||
template<typename Future>
|
||||
Future update_write_metrics_if_failed(Future&& f);
|
||||
void update_write_metrics_for_timed_out_write();
|
||||
void update_write_metrics_for_rejected_writes();
|
||||
future<std::unique_ptr<keyspace>> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, const locator::token_metadata_ptr& token_metadata, system_keyspace system);
|
||||
@@ -1912,7 +1919,11 @@ public:
|
||||
// Mutations may be partially visible to reads until restart on exception (FIXME).
|
||||
future<> apply(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
|
||||
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
|
||||
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
|
||||
future<counter_update_guard> acquire_counter_locks(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
future<mutation> prepare_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
future<> apply_counter_update(schema_ptr, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
|
||||
const sstring& get_snitch_name() const;
|
||||
/*!
|
||||
* \brief clear snapshot based on a tag
|
||||
|
||||
@@ -2811,8 +2811,10 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
|
||||
if (tablet_filter(*_tablet_map, gid)) {
|
||||
const uint64_t tablet_size = sg.live_disk_space_used();
|
||||
table_stats.size_in_bytes += tablet_size;
|
||||
const locator::range_based_tablet_id rb_tid {gid.table, _tablet_map->get_token_range(gid.tablet)};
|
||||
tablet_stats.tablet_sizes[rb_tid] = tablet_size;
|
||||
const dht::token_range trange = _tablet_map->get_token_range(gid.tablet);
|
||||
// Make sure the token range is in the form (a, b]
|
||||
SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());
|
||||
tablet_stats.tablet_sizes[gid.table][trange] = tablet_size;
|
||||
}
|
||||
});
|
||||
return locator::combined_load_stats{
|
||||
|
||||
@@ -192,9 +192,7 @@ tablet_map_to_mutations(const tablet_map& tablets, table_id id, const sstring& k
|
||||
m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts);
|
||||
}
|
||||
}
|
||||
if (auto next_tid = tablets.next_tablet(tid)) {
|
||||
tid = *next_tid;
|
||||
}
|
||||
tid = *tablets.next_tablet(tid);
|
||||
}
|
||||
co_await process_mutation(std::move(m));
|
||||
}
|
||||
@@ -577,7 +575,7 @@ void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hi
|
||||
|
||||
namespace {
|
||||
|
||||
std::optional<tablet_id> process_one_row(replica::database* db, table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
|
||||
tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
|
||||
tablet_replica_set tablet_replicas;
|
||||
if (row.has("replicas")) {
|
||||
tablet_replicas = deserialize_replica_set(row.get_view("replicas"));
|
||||
@@ -661,7 +659,7 @@ std::optional<tablet_id> process_one_row(replica::database* db, table_id table,
|
||||
persisted_last_token, current_last_token, table, tid));
|
||||
}
|
||||
|
||||
return map.next_tablet(tid);
|
||||
return *map.next_tablet(tid);
|
||||
}
|
||||
|
||||
struct tablet_metadata_builder {
|
||||
@@ -716,9 +714,7 @@ struct tablet_metadata_builder {
|
||||
}
|
||||
|
||||
if (row.has("last_token")) {
|
||||
if (auto next_tid = process_one_row(db, current->table, current->map, current->tid, row)) {
|
||||
current->tid = *next_tid;
|
||||
}
|
||||
current->tid = process_one_row(db, current->table, current->map, current->tid, row);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -133,10 +133,11 @@ check_jenkins_job_status() {
|
||||
|
||||
lastCompletedJobName="$jenkins_url/job/$jenkins_job/lastCompletedBuild"
|
||||
getBuildResult=$(curl -s --user $JENKINS_USERNAME:$JENKINS_API_TOKEN $lastCompletedJobName/api/json?tree=result)
|
||||
if [[ "$getBuildResult" == "*Unauthorized*" ]]; then
|
||||
echo -e "${ORANGE}WARNING:${NC} Failed to authenticate with Jenkins. please check your JENKINS_USERNAME and JENKINS_API_TOKEN setting"
|
||||
if [[ $getBuildResult =~ (Access Denied|401 Unauthorized) ]]; then
|
||||
echo -e "${ORANGE}WARNING:${NC} Access Denied to $lastCompletedJobName. \nPlease check your JENKINS_USERNAME and JENKINS_API_TOKEN setting"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
lastCompleted=$(echo "$getBuildResult" | jq -r '.result')
|
||||
|
||||
if [[ "$lastCompleted" == "SUCCESS" ]]; then
|
||||
|
||||
@@ -66,19 +66,6 @@ future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s
|
||||
// Once the global barrier completes, no requests remain on the old shard,
|
||||
// so we can safely switch to acquiring locks only on the new shard.
|
||||
auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
|
||||
|
||||
if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
|
||||
const auto& erm = s.table().get_effective_replication_map();
|
||||
const auto& rs = erm->get_replication_strategy();
|
||||
sstring tablet_map_desc;
|
||||
if (rs.uses_tablets()) {
|
||||
const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s.id());
|
||||
tablet_map_desc = ::format(", tablet id {}, tablet map {}",
|
||||
tablet_map.get_tablet_id(token), tablet_map);
|
||||
}
|
||||
on_internal_error(paxos_state::logger,
|
||||
format("invalid shard, shards {}, token {}{}", shards, token, tablet_map_desc));
|
||||
}
|
||||
std::ranges::sort(shards);
|
||||
|
||||
replica_guard replica_guard;
|
||||
|
||||
@@ -3582,32 +3582,60 @@ storage_proxy::mutate_counters_on_leader(utils::chunked_vector<frozen_mutation_a
|
||||
{
|
||||
auto& update_ms = mutations;
|
||||
co_await coroutine::parallel_for_each(update_ms, [&] (frozen_mutation_and_schema& fm_a_s) -> future<> {
|
||||
co_await mutate_counter_on_leader_and_replicate(fm_a_s.s, std::move(fm_a_s.fm), cl, timeout, trace_state, permit, fence, caller);
|
||||
auto erm = _db.local().find_column_family(fm_a_s.s).get_effective_replication_map();
|
||||
auto shard = erm->get_sharder(*fm_a_s.s).shard_for_reads(fm_a_s.fm.token(*fm_a_s.s));
|
||||
bool local = shard == this_shard_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
|
||||
return container().invoke_on(shard, {_write_smp_service_group, timeout}, [gs = global_schema_ptr(fm_a_s.s), &fm = fm_a_s.fm, cl, timeout, gt = tracing::global_trace_state_ptr(trace_state), permit, local, fence, caller] (storage_proxy& sp) mutable -> future<> {
|
||||
auto p = local ? std::move(permit) : empty_service_permit(); // FIXME: either obtain a real permit on this shard or hold original one across shard
|
||||
return sp.mutate_counter_on_leader_and_replicate(gs, fm, cl, timeout, gt.get(), std::move(p), fence, caller);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation fm, db::consistency_level cl, clock_type::time_point timeout,
|
||||
storage_proxy::mutate_counter_on_leader_and_replicate(schema_ptr s, const frozen_mutation& fm, db::consistency_level cl, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit,
|
||||
fencing_token fence, locator::host_id caller) {
|
||||
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
|
||||
// FIXME: This does not handle intra-node tablet migration properly.
|
||||
// Refs https://github.com/scylladb/scylladb/issues/18180
|
||||
auto shard = erm->get_sharder(*s).shard_for_reads(fm.token(*s));
|
||||
bool local = shard == this_shard_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&proxy = container(), gs = global_schema_ptr(s), fm = std::move(fm), cl, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), permit = std::move(permit), local, fence, caller] (replica::database& db) {
|
||||
auto trace_state = gt.get();
|
||||
auto p = local ? std::move(permit) : /* FIXME: either obtain a real permit on this shard or hold original one across shard */ empty_service_permit();
|
||||
const auto& rs = gs.get()->table().get_effective_replication_map()->get_replication_strategy();
|
||||
return proxy.local().run_fenceable_write(rs, fence, caller,
|
||||
[&db, fm = std::move(fm), timeout, trace_state, gs = std::move(gs)]() mutable {
|
||||
return db.apply_counter_update(std::move(gs), fm, timeout, std::move(trace_state));
|
||||
})
|
||||
.then([&proxy, cl, timeout, trace_state, p = std::move(p)] (mutation m) mutable {
|
||||
return proxy.local().replicate_counter_from_leader(std::move(m), cl, std::move(trace_state), timeout, std::move(p));
|
||||
const auto& rs = s->table().get_effective_replication_map()->get_replication_strategy();
|
||||
|
||||
return run_fenceable_write(rs, fence, caller, [this, erm, s, &fm, timeout, trace_state] (this auto) -> future<mutation> {
|
||||
// lock the reading shards in sorted order.
|
||||
// usually there is only a single read shard, which is the current shard. we need to lock the
|
||||
// counter on this shard to protect the counter's read-modify-write operation against concurrent updates.
|
||||
// during intranode migration, the read shard switches, creating a phase where both shards
|
||||
// may receive updates concurrently for the same counter. therefore, we need to lock both shards.
|
||||
auto lock_shards = erm->shards_ready_for_reads(*s, fm.token(*s));
|
||||
std::ranges::sort(lock_shards);
|
||||
|
||||
using foreign_counter_guard = foreign_ptr<lw_shared_ptr<replica::counter_update_guard>>;
|
||||
utils::small_vector<foreign_counter_guard, 2> counter_locks;
|
||||
for (auto shard : lock_shards) {
|
||||
counter_locks.push_back(co_await _db.invoke_on(shard, {_write_smp_service_group, timeout},
|
||||
[s = global_schema_ptr(s), &fm, gtr = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) mutable -> future<foreign_counter_guard> {
|
||||
return db.acquire_counter_locks(s, fm, timeout, gtr.get()).then([] (replica::counter_update_guard g) {
|
||||
return make_foreign(make_lw_shared<replica::counter_update_guard>(std::move(g)));
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
// read the current counter value and transform the counter update mutation
|
||||
auto m = co_await _db.local().prepare_counter_update(s, fm, timeout, trace_state);
|
||||
|
||||
auto apply = [this, erm, &m, trace_state, timeout] (shard_id shard) {
|
||||
return _db.invoke_on(shard, {_write_smp_service_group, timeout},
|
||||
[s = global_schema_ptr(m.schema()), fm = freeze(m), gtr = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) mutable -> future<> {
|
||||
return db.apply_counter_update(s, std::move(fm), timeout, gtr.get());
|
||||
});
|
||||
};
|
||||
co_await apply_on_shards(erm, *s, m.token(), std::move(apply));
|
||||
|
||||
co_return std::move(m);
|
||||
}).then([this, cl, timeout, trace_state, permit = std::move(permit)] (mutation m) mutable {
|
||||
return replicate_counter_from_leader(std::move(m), cl, std::move(trace_state), timeout, std::move(permit));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -495,7 +495,7 @@ private:
|
||||
future<> mutate_counters_on_leader(utils::chunked_vector<frozen_mutation_and_schema> mutations, db::consistency_level cl, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit,
|
||||
fencing_token fence, locator::host_id caller);
|
||||
future<> mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation m, db::consistency_level cl, clock_type::time_point timeout,
|
||||
future<> mutate_counter_on_leader_and_replicate(schema_ptr s, const frozen_mutation& m, db::consistency_level cl, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit,
|
||||
fencing_token fence, locator::host_id caller);
|
||||
|
||||
|
||||
@@ -4293,6 +4293,36 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos
|
||||
rtlogger.info("Removenode succeeded. Request ID: {}", request_id);
|
||||
}
|
||||
|
||||
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
std::unordered_set<raft::server_id> raft_hosts;
|
||||
for (auto host : hosts) {
|
||||
if (_gossiper.is_alive(host)) {
|
||||
const std::string message = ::format("Cannot mark host {} as excluded because it's alive", host);
|
||||
rtlogger.warn("{}", message);
|
||||
throw std::runtime_error(message);
|
||||
}
|
||||
raft_hosts.insert(raft::server_id(host.uuid()));
|
||||
}
|
||||
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.add_ignored_nodes(raft_hosts);
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("Mark as excluded: {}", hosts));
|
||||
rtlogger.info("Marking nodes as excluded: {}, previous set: {}", hosts, _topology_state_machine._topology.ignored_nodes);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("mark_excluded: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
}
|
||||
rtlogger.info("Nodes marked as excluded: {}", hosts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
|
||||
return run_with_api_lock_in_gossiper_mode_only(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
|
||||
@@ -8236,6 +8266,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) {
|
||||
_gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? & _topology_state_machine : nullptr);
|
||||
}
|
||||
|
||||
bool storage_service::raft_topology_change_enabled() const {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
|
||||
}
|
||||
return _topology_change_kind_enabled == topology_change_kind::raft;
|
||||
}
|
||||
|
||||
bool storage_service::legacy_topology_change_enabled() const {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
|
||||
}
|
||||
return _topology_change_kind_enabled == topology_change_kind::legacy;
|
||||
}
|
||||
|
||||
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
|
||||
_protocol_servers.push_back(&server);
|
||||
if (start_instantly) {
|
||||
@@ -8251,7 +8295,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n
|
||||
return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
|
||||
}
|
||||
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
|
||||
}
|
||||
|
||||
|
||||
@@ -390,7 +390,7 @@ public:
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
private:
|
||||
inet_address get_broadcast_address() const noexcept {
|
||||
@@ -785,6 +785,7 @@ public:
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
|
||||
future<> mark_excluded(const std::vector<locator::host_id>&);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
@@ -914,12 +915,8 @@ private:
|
||||
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
|
||||
|
||||
public:
|
||||
bool raft_topology_change_enabled() const {
|
||||
return _topology_change_kind_enabled == topology_change_kind::raft;
|
||||
}
|
||||
bool legacy_topology_change_enabled() const {
|
||||
return _topology_change_kind_enabled == topology_change_kind::legacy;
|
||||
}
|
||||
bool raft_topology_change_enabled() const;
|
||||
bool legacy_topology_change_enabled() const;
|
||||
|
||||
private:
|
||||
future<> _raft_state_monitor = make_ready_future<>();
|
||||
|
||||
@@ -1263,7 +1263,13 @@ public:
|
||||
}
|
||||
} else {
|
||||
for (auto rack : rf_in_dc->get_rack_list()) {
|
||||
auto shards = shards_per_rack.at(endpoint_dc_rack{dc, rack});
|
||||
size_t shards = 0;
|
||||
auto dc_rack = endpoint_dc_rack{dc, rack};
|
||||
if (!shards_per_rack.contains(dc_rack)) {
|
||||
lblogger.warn("No shards for rack {}, but table {}.{} replicates there", rack, s.ks_name(), s.cf_name());
|
||||
} else {
|
||||
shards = shards_per_rack.at(dc_rack);
|
||||
}
|
||||
size_t tablets_in_rack = std::ceil(min_per_shard_tablet_count * shards);
|
||||
lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in rack {} ({} shards) in DC {}",
|
||||
tablets_in_rack, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), rack, shards, dc);
|
||||
|
||||
@@ -1511,10 +1511,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
|
||||
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
|
||||
}
|
||||
utils::get_local_injector().inject("stream_tablet_fail",
|
||||
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
|
||||
|
||||
if (action_failed(tablet_state.streaming)) {
|
||||
if (action_failed(tablet_state.streaming) || utils::get_local_injector().enter("stream_tablet_fail")) {
|
||||
const bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
|
||||
bool critical_disk_utilization = false;
|
||||
if (auto stats = _tablet_allocator.get_load_stats()) {
|
||||
@@ -1659,12 +1657,19 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_transition(last_token)
|
||||
.del_migration_task_info(last_token, _feature_service)
|
||||
.build());
|
||||
if (trinfo.pending_replica) {
|
||||
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *trinfo.pending_replica, last_token);
|
||||
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
|
||||
if (leaving_replica) {
|
||||
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *leaving_replica, last_token);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case locator::tablet_transition_stage::end_migration: {
|
||||
// Move the tablet size in load_stats
|
||||
auto leaving = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
|
||||
auto pending = trinfo.pending_replica;
|
||||
const dht::token_range trange {tmap.get_token_range(gid.tablet)};
|
||||
migrate_tablet_size(leaving->host, pending->host, gid, trange);
|
||||
|
||||
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
|
||||
// See do_tablet_operation() doc.
|
||||
bool defer_transition = utils::get_local_injector().enter("handle_tablet_migration_end_migration");
|
||||
@@ -1884,6 +1889,39 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
|
||||
}
|
||||
|
||||
void migrate_tablet_size(locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) {
|
||||
auto has_tablet_size = [&] (const locator::load_stats& stats, locator::host_id host) {
|
||||
if (auto host_i = stats.tablet_stats.find(host); host_i != stats.tablet_stats.end()) {
|
||||
auto& tables = host_i->second.tablet_sizes;
|
||||
if (auto table_i = tables.find(gid.table); table_i != tables.find(gid.table)) {
|
||||
if (auto size_i = table_i->second.find(trange); size_i != table_i->second.find(trange)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
if (leaving != pending) {
|
||||
auto old_load_stats = _tablet_allocator.get_load_stats();
|
||||
if (old_load_stats) {
|
||||
const locator::load_stats& stats = *old_load_stats;
|
||||
if (has_tablet_size(stats, leaving) && !has_tablet_size(stats, pending)) {
|
||||
rtlogger.debug("Moving tablet size for tablet: {} from: {} to: {}", gid, leaving, pending);
|
||||
auto new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
|
||||
auto& new_leaving_ts = new_load_stats->tablet_stats.at(leaving);
|
||||
auto& new_pending_ts = new_load_stats->tablet_stats.at(pending);
|
||||
auto map_node = new_leaving_ts.tablet_sizes.at(gid.table).extract(trange);
|
||||
new_pending_ts.tablet_sizes[gid.table].insert(std::move(map_node));
|
||||
if (new_leaving_ts.tablet_sizes.at(gid.table).empty()) {
|
||||
new_leaving_ts.tablet_sizes.erase(gid.table);
|
||||
}
|
||||
_tablet_allocator.set_load_stats(std::move(new_load_stats));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> handle_tablet_resize_finalization(group0_guard g) {
|
||||
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
|
||||
rtlogger.info("handle_tablet_resize_finalization: waiting");
|
||||
@@ -1953,6 +1991,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
|
||||
|
||||
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
|
||||
guard = co_await start_operation();
|
||||
auto new_tm = get_token_metadata_ptr();
|
||||
auto reconciled_stats = old_load_stats->reconcile_tablets_resize(plan.resize_plan().finalize_resize, *tm, *new_tm);
|
||||
if (reconciled_stats) {
|
||||
_tablet_allocator.set_load_stats(reconciled_stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> handle_truncate_table(group0_guard guard) {
|
||||
|
||||
@@ -539,17 +539,13 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
|
||||
}
|
||||
}
|
||||
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled) const {
|
||||
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
|
||||
if (!dicts_enabled) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
|
||||
"all nodes are upgraded to a versions which supports it",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
if (!dicts_allowed) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
}
|
||||
if (_chunk_length) {
|
||||
auto chunk_length = _chunk_length.value();
|
||||
|
||||
@@ -106,8 +106,7 @@ public:
|
||||
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
|
||||
|
||||
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
|
||||
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
|
||||
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
|
||||
void validate(dicts_feature_enabled) const;
|
||||
|
||||
std::map<sstring, sstring> get_options() const;
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "sstables/random_access_reader.hh"
|
||||
#include "utils/disk-error-handler.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/fragmented_temporary_buffer.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -24,6 +25,15 @@ future <temporary_buffer<char>> random_access_reader::read_exactly(size_t n) noe
|
||||
}
|
||||
}
|
||||
|
||||
future<fragmented_temporary_buffer> random_access_reader::read_exactly_fragmented(size_t n) noexcept {
|
||||
try {
|
||||
fragmented_temporary_buffer::reader reader;
|
||||
return reader.read_exactly(*_in, n);
|
||||
} catch (...) {
|
||||
return current_exception_as_future<fragmented_temporary_buffer>();
|
||||
}
|
||||
}
|
||||
|
||||
static future<> close_if_needed(std::unique_ptr<input_stream<char>> in) {
|
||||
if (!in) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/fragmented_temporary_buffer.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -33,6 +34,8 @@ protected:
|
||||
public:
|
||||
future <temporary_buffer<char>> read_exactly(size_t n) noexcept;
|
||||
|
||||
future<fragmented_temporary_buffer> read_exactly_fragmented(size_t n) noexcept;
|
||||
|
||||
future<> seek(uint64_t pos) noexcept;
|
||||
|
||||
bool eof() const noexcept { return _in->eof(); }
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "utils/fragmented_temporary_buffer.hh"
|
||||
#include "data_dictionary/storage_options.hh"
|
||||
#include "dht/sharder.hh"
|
||||
#include "writer.hh"
|
||||
@@ -275,9 +276,11 @@ future<> parse(const schema&, sstable_version_types, random_access_reader& in, T
|
||||
// All composite parsers must come after this
|
||||
template<typename First, typename... Rest>
|
||||
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, First& first, Rest&&... rest) {
|
||||
return parse(s, v, in, first).then([v, &s, &in, &rest...] {
|
||||
return parse(s, v, in, std::forward<Rest>(rest)...);
|
||||
});
|
||||
auto fut = parse(s, v, in, first);
|
||||
(..., (void)(fut = fut.then([&s, v, &in, &rest] () mutable {
|
||||
return parse(s, v, in, std::forward<Rest>(rest));
|
||||
})));
|
||||
return fut;
|
||||
}
|
||||
|
||||
// Intended to be used for a type that describes itself through describe_type().
|
||||
@@ -516,16 +519,26 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
|
||||
s.header.memory_size,
|
||||
s.header.sampling_level,
|
||||
s.header.size_at_full_sampling);
|
||||
auto buf = co_await in.read_exactly(s.header.size * sizeof(pos_type));
|
||||
auto len = s.header.size * sizeof(pos_type);
|
||||
check_buf_size(buf, len);
|
||||
|
||||
// Use fragmented buffer to avoid large contiguous allocations
|
||||
auto frag_buf = co_await in.read_exactly_fragmented(len);
|
||||
if (frag_buf.empty()) {
|
||||
throw bufsize_mismatch_exception(0, len);
|
||||
}
|
||||
if (frag_buf.size_bytes() != len) {
|
||||
throw bufsize_mismatch_exception(frag_buf.size_bytes(), len);
|
||||
}
|
||||
|
||||
// Positions are encoded in little-endian.
|
||||
auto b = buf.get();
|
||||
auto stream = frag_buf.get_istream();
|
||||
s.positions.reserve(s.header.size + 1);
|
||||
while (s.positions.size() != s.header.size) {
|
||||
s.positions.push_back(seastar::read_le<pos_type>(b));
|
||||
b += sizeof(pos_type);
|
||||
auto pos_result = stream.read<pos_type>();
|
||||
if (!pos_result) {
|
||||
std::rethrow_exception(pos_result.assume_error());
|
||||
}
|
||||
s.positions.push_back(seastar::le_to_cpu(*pos_result));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
// Since the keys in the index are not sized, we need to calculate
|
||||
@@ -2895,18 +2908,16 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
input_stream<char> data_stream;
|
||||
if (sst->get_compression()) {
|
||||
data_stream = co_await sst->data_stream(0, sst->ondisk_data_size(), permit,
|
||||
nullptr, nullptr, sstable::raw_stream::yes);
|
||||
} else {
|
||||
data_stream = co_await sst->data_stream(0, sst->data_size(), permit,
|
||||
input_stream<char> data_stream = co_await (sst->get_compression()
|
||||
? sst->data_stream(0, sst->ondisk_data_size(), permit,
|
||||
nullptr, nullptr, sstable::raw_stream::yes)
|
||||
: sst->data_stream(0, sst->data_size(), permit,
|
||||
nullptr, nullptr, sstable::raw_stream::no,
|
||||
integrity_check::yes, [&ret](sstring msg) {
|
||||
sstlog.error("{}", msg);
|
||||
ret.status = validate_checksums_status::invalid;
|
||||
});
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
auto valid = true;
|
||||
std::exception_ptr ex;
|
||||
|
||||
@@ -550,10 +550,13 @@ future<locator::effective_replication_map_ptr> sstables_loader::await_topology_q
|
||||
auto expected_topology_version = erm->get_token_metadata().get_version();
|
||||
auto& ss = _ss.local();
|
||||
|
||||
// The awaiting only works with raft enabled, and we only need it with tablets,
|
||||
// so let's bypass the awaiting when tablet is disabled.
|
||||
if (!t.uses_tablets()) {
|
||||
break;
|
||||
}
|
||||
// optimistically attempt to grab an erm on quiesced topology
|
||||
// The awaiting is only needed with tablet over raft, so we're bypassing the check
|
||||
// when raft is disabled.
|
||||
if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) {
|
||||
if (co_await ss.verify_topology_quiesced(expected_topology_version)) {
|
||||
break;
|
||||
}
|
||||
erm = nullptr;
|
||||
|
||||
273
test/alternator/test_logs.py
Normal file
273
test/alternator/test_logs.py
Normal file
@@ -0,0 +1,273 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
###############################################################################
|
||||
# Most Alternator tests should limit themselves to the DynamoDB API provided by
|
||||
# the AWS SDK, and should work when running against any DynamoDB-compatible
|
||||
# database, including DynamoDB and Scylla - the latter running locally or
|
||||
# remotely. In particular, tests should generally *not* attempt to look at
|
||||
# Scylla's log file - as this log file is not available at all when testing
|
||||
# a remote server.
|
||||
#
|
||||
# Nevertheless, in some cases we might want to verify that some specific
|
||||
# log messages do appear in Scylla's log file. These tests should be
|
||||
# concentrated in this source file. All these tests are skipped when the
|
||||
# test is not running against Scylla, or Scylla's log file cannot be
|
||||
# found - e.g., because Scylla is running on a remote machine, or configured
|
||||
# not to write to a log file.
|
||||
#
|
||||
# The code below automatically figures out where the log file is - when
|
||||
# Scylla is running locally. First the local Scylla process is detected
|
||||
# (it is the process listening to the our HTTP requests, if we can find
|
||||
# one), then its standard output is guessed to be the log file - and then
|
||||
# we verify that it really is.
|
||||
#############################################################################
|
||||
|
||||
import pytest
|
||||
import os
|
||||
import io
|
||||
import time
|
||||
import re
|
||||
import urllib.parse
|
||||
from contextlib import contextmanager
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from .util import new_test_table, scylla_config_temporary
|
||||
from .test_cql_rbac import new_dynamodb, new_role
|
||||
|
||||
# Utility function for trying to find a local process which is listening to
|
||||
# the given local IP address and port. If such a process exists, return its
|
||||
# process id (as a string). Otherwise, return None. Note that the local
|
||||
# process needs to belong to the same user running this test, or it cannot
|
||||
# be found.
|
||||
def local_process_id(ip, port):
|
||||
# Implement something like the shell "lsof -Pni @{ip}:{port}", just
|
||||
# using /proc without any external shell command.
|
||||
# First, we look in /proc/net/tcp for a LISTEN socket (state 0x0A) at the
|
||||
# desired local address. The address is specially-formatted hex of the ip
|
||||
# and port, with 0100007F:2352 for 127.0.0.1:9042. We check for two
|
||||
# listening addresses: one is the specific IP address given, and the
|
||||
# other is listening on address 0 (INADDR_ANY).
|
||||
ip2hex = lambda ip: ''.join([f'{int(x):02X}' for x in reversed(ip.split('.'))])
|
||||
port2hex = lambda port: f'{int(port):04X}'
|
||||
try:
|
||||
addr1 = ip2hex(ip) + ':' + port2hex(port)
|
||||
except:
|
||||
return None
|
||||
addr2 = ip2hex('0.0.0.0') + ':' + port2hex(port)
|
||||
LISTEN = '0A'
|
||||
with open('/proc/net/tcp', 'r') as f:
|
||||
for line in f:
|
||||
cols = line.split()
|
||||
if cols[3] == LISTEN and (cols[1] == addr1 or cols[1] == addr2):
|
||||
inode = cols[9]
|
||||
break
|
||||
else:
|
||||
# Didn't find a process listening on the given address
|
||||
return None
|
||||
# Now look in /proc/*/fd/* for processes that have this socket "inode"
|
||||
# as one of its open files. We can only find a process that belongs to
|
||||
# the same user.
|
||||
target = f'socket:[{inode}]'
|
||||
for proc in os.listdir('/proc'):
|
||||
if not proc.isnumeric():
|
||||
continue
|
||||
dir = f'/proc/{proc}/fd/'
|
||||
try:
|
||||
for fd in os.listdir(dir):
|
||||
if os.readlink(dir + fd) == target:
|
||||
# Found the process!
|
||||
return proc
|
||||
except:
|
||||
# Ignore errors. We can't check processes we don't own.
|
||||
pass
|
||||
return None
|
||||
|
||||
# A fixture to find the Scylla log file, returning the log file's path.
|
||||
# If the log file cannot be found, or it's not Scylla, the fixture calls
|
||||
# pytest.skip() to skip any test which uses it. The fixture has module
|
||||
# scope, so looking for the log file only happens once. Individual tests
|
||||
# should use the function-scope fixture "logfile" below, which takes care
|
||||
# of opening the log file for reading in the right place.
|
||||
# We look for the log file by looking for a local process listening to the
|
||||
# given DynamoDB API connection, assuming its standard output is the log file,
|
||||
# and then verifying that this file is a proper Scylla log file.
|
||||
@pytest.fixture(scope="module")
|
||||
def logfile_path(dynamodb):
|
||||
# Split the endpoint URL into host and port part. If the port is not
|
||||
# explicitly specified, we need to assume it is the default http port
|
||||
# (80) or default https (443) port, depending on the scheme.
|
||||
endpoint_url = dynamodb.meta.client._endpoint.host
|
||||
p = urllib.parse.urlparse(endpoint_url)
|
||||
# If hostname is a string not an ip address, it's unlikely to be a local
|
||||
# process anyway... test/alternator/run uses an IP address.
|
||||
ip = p.hostname
|
||||
port = p.port
|
||||
if port is None:
|
||||
port = 443 if p.scheme == 'https' else 80
|
||||
pid = local_process_id(ip, port)
|
||||
if not pid:
|
||||
pytest.skip("Can't find local process")
|
||||
# Now that we know the process id, use /proc to find if its standard
|
||||
# output is redirected to a file. If it is, that's the log file. If it
|
||||
# isn't a file, we don't known where the user is writing the log...
|
||||
try:
|
||||
log = os.readlink(f'/proc/{pid}/fd/1')
|
||||
except:
|
||||
pytest.skip("Can't find local log file")
|
||||
# If the process's standard output is some pipe or device, it's
|
||||
# not the log file we were hoping for...
|
||||
if not log.startswith('/') or not os.path.isfile(log):
|
||||
pytest.skip("Can't find local log file")
|
||||
# Scylla can be configured to put the log in syslog, not in the standard
|
||||
# output. So let's verify that the file which we found actually looks
|
||||
# like a Scylla log and isn't just empty or something... The Scylla log
|
||||
# file always starts with the line: "Scylla version ... starting ..."
|
||||
with open(log, 'r') as f:
|
||||
head = f.read(7)
|
||||
if head != 'Scylla ':
|
||||
pytest.skip("Not a Scylla log file")
|
||||
yield log
|
||||
|
||||
# The "logfile" fixture returns the log file open for reading at the end.
|
||||
# Testing if something appears in the log usually involves taking this
|
||||
# fixture, and then checking with wait_for_log() for the desired message to
|
||||
# have appeared. Because each logfile fixture opens the log file separately,
|
||||
# it is ok if several tests are run in parallel - but they will see each
|
||||
# other's log messages so should try to ensure that unique strings (e.g.,
|
||||
# random table names) appear in the log message.
|
||||
@pytest.fixture(scope="function")
|
||||
def logfile(logfile_path):
|
||||
with open(logfile_path, 'r') as f:
|
||||
f.seek(0, io.SEEK_END)
|
||||
yield f
|
||||
|
||||
# wait_for_log() checks if the log, starting at its current position
|
||||
# (probably set by the logfile fixture), contains the given message -
|
||||
# and if it doesn't, calls pytest.fail().
|
||||
# Because it may take time for the log message to be flushed, and sometimes
|
||||
# we may want to look for messages about various delayed events, this
|
||||
# function doesn't give up when it reaches the end of file, and rather
|
||||
# retries until a given timeout. The timeout may be long, because successful
|
||||
# tests will not wait for it. Note, however, that long timeouts will make
|
||||
# xfailing tests slow.
|
||||
def wait_for_log(logfile, pattern, re_flags=re.MULTILINE, timeout=5):
|
||||
contents = logfile.read()
|
||||
prog = re.compile(pattern, re_flags)
|
||||
if prog.search(contents):
|
||||
return
|
||||
end = time.time() + timeout
|
||||
while time.time() < end:
|
||||
s = logfile.read()
|
||||
if s:
|
||||
# Though unlikely, it is possible that a single message got
|
||||
# split into two reads, so we need to check (and recheck)
|
||||
# the entire content since the beginning of this wait :-(
|
||||
contents = contents + s
|
||||
if prog.search(contents):
|
||||
return
|
||||
time.sleep(0.1)
|
||||
pytest.fail(f'Timed out ({timeout} seconds) looking for {pattern} in log file. Got:\n' + contents)
|
||||
|
||||
# A simple example of testing the log file - we check that a table creation,
|
||||
# and table deletion, both cause messages to appear on the log.
|
||||
def test_log_table_operations(dynamodb, logfile):
|
||||
schema = {
|
||||
'KeySchema': [{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
'AttributeDefinitions': [{ 'AttributeName': 'p', 'AttributeType': 'S' }]
|
||||
}
|
||||
with new_test_table(dynamodb, **schema) as table:
|
||||
wait_for_log(logfile, f'Creating keyspace alternator_{table.name}')
|
||||
wait_for_log(logfile, f'Dropping keyspace alternator_{table.name}')
|
||||
|
||||
# Test that when alternator_warn_authorization is set to true, WARN-level
|
||||
# log messages are generated on authentication or authorization errors.
|
||||
# This is in addition to the metric counting these errors, which are tested
|
||||
# in test_metrics.py. These are tests for issue #25308.
|
||||
# We check that alternator_warn_authorization enables log messages regardless
|
||||
# of what alternator_enforce_authorization is set to. If enforce_authorization
|
||||
# is also true, a message is logged and the request failed - and if it is
|
||||
# false the same message is logged and the request succeeds.
|
||||
#
|
||||
# It's important to have a regression test that these log messages appear,
|
||||
# because it is documented that they appear in "warn" mode and users may
|
||||
# rely on them instead of metrics to learn about auth setup problems.
|
||||
#
|
||||
# Note that we do not test that when alternator_warn_authorization is
|
||||
# set false, warnings are NOT logged - this is less important, and also
|
||||
# trickier to test what does NOT appear on the log (we definitely don't want
|
||||
# to wait for a timeout).
|
||||
#
|
||||
# We have several tests here, for several kinds of authentication and
|
||||
# authorization errors.
|
||||
|
||||
@contextmanager
|
||||
def scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_enforce_authorization', 'true' if enforce_auth else 'false'):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_warn_authorization', 'true' if warn_auth else 'false'):
|
||||
yield
|
||||
|
||||
# authentication failure 1: bogus username and secret key
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
def test_log_authentication_failure_1(dynamodb, logfile, test_table_s, enforce_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
|
||||
with new_dynamodb(dynamodb, 'bogus_username', 'bogus_secret_key') as d:
|
||||
tab = d.Table(test_table_s.name)
|
||||
# We don't expect get_item() to find any item, it should either
|
||||
# pass or fail depending on enforce_auth - but in any case it
|
||||
# should log the error.
|
||||
try:
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'UnrecognizedClientException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
wait_for_log(logfile, '^WARN .*user bogus_username.*client address')
|
||||
|
||||
# authentication failure 2: real username, wrong secret key
|
||||
# Unfortunately, tests that create a new role need to use CQL too.
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
def test_log_authentication_failure_2(dynamodb, cql, logfile, test_table_s, enforce_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, 'bogus_secret_key') as d:
|
||||
tab = d.Table(test_table_s.name)
|
||||
try:
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'UnrecognizedClientException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
wait_for_log(logfile, f'^WARN .*wrong signature for user {role}.*client address')
|
||||
|
||||
# Authorization failure - a valid user but without permissions to do a
|
||||
# given operation.
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
def test_log_authorization_failure(dynamodb, cql, logfile, test_table_s, enforce_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
tab = d.Table(test_table_s.name)
|
||||
# The new role is not a superuser, so should not have
|
||||
# permissions to read from this table created earlier by
|
||||
# the superuser.
|
||||
try:
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'AccessDeniedException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
wait_for_log(logfile, f'^WARN .*SELECT access on table.*{test_table_s.name} is denied to role {role}.*client address')
|
||||
@@ -34,6 +34,7 @@ import requests
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.alternator.test_manual_requests import get_signed_request
|
||||
from test.alternator.test_cql_rbac import new_dynamodb, new_role
|
||||
from test.alternator.util import random_string, new_test_table, is_aws, scylla_config_read, scylla_config_temporary
|
||||
|
||||
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
|
||||
@@ -861,6 +862,119 @@ def test_ttl_stats(dynamodb, metrics, alternator_ttl_period_in_seconds):
|
||||
time.sleep(0.1)
|
||||
assert not 'Item' in table.get_item(Key={'p': p0})
|
||||
|
||||
# The following tests check the authentication and authorization failure
|
||||
# counters:
|
||||
# * scylla_alternator_authentication_failures
|
||||
# * scylla_alternator_authorization_failures
|
||||
# as well as their interaction with the alternator_enforce_authorization
|
||||
# and alternator_warn_authorization configuration options:
|
||||
#
|
||||
# 1. When alternator_enforce_authorization and alternator_warn_authorization
|
||||
# are both set to "false", these two metrics aren't incremented (and
|
||||
# operations are allowed).
|
||||
# 2. When alternator_enforce_authorization is set to false but
|
||||
# alternator_warn_authorization is set to true, the two metrics are
|
||||
# incremented but the operations are still allowed.
|
||||
# 3. When alternator_enforce_authorization is set to "true", the two metrics
|
||||
# are incremented and the operations are not allowed.
|
||||
#
|
||||
# We have several tests here, for several kinds of authentication and
|
||||
# authorization errors. These are tests for issue #25308.
|
||||
|
||||
@contextmanager
|
||||
def scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_enforce_authorization', 'true' if enforce_auth else 'false'):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_warn_authorization', 'true' if warn_auth else 'false'):
|
||||
yield
|
||||
|
||||
# authentication failure 1: bogus username and secret key
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
@pytest.mark.parametrize("warn_auth", [True, False])
|
||||
def test_authentication_failure_1(dynamodb, metrics, test_table_s, enforce_auth, warn_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
|
||||
with new_dynamodb(dynamodb, 'bogus_username', 'bogus_secret_key') as d:
|
||||
# We don't expect get_item() to find any item, we just care if
|
||||
# to see if it experiences an authentication failure, and if
|
||||
# it increments the authentication failure metric.
|
||||
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
|
||||
tab = d.Table(test_table_s.name)
|
||||
try:
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'UnrecognizedClientException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
new_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
|
||||
if warn_auth or enforce_auth:
|
||||
# If Alternator has any reason to check the authentication
|
||||
# headers (i.e., either warn_auth or enforce_auth is enabled)
|
||||
# then it will count the errors.
|
||||
assert new_auth_failures == saved_auth_failures + 1
|
||||
else:
|
||||
# If Alternator has no reason to check the authentication
|
||||
# headers (i.e., both warn_auth and enforce_auth are off)
|
||||
# it won't check - and won't find or count auth errors.
|
||||
assert new_auth_failures == saved_auth_failures
|
||||
|
||||
# authentication failure 2: real username, wrong secret key
|
||||
# Unfortunately, tests that create a new role need to use CQL too.
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
@pytest.mark.parametrize("warn_auth", [True, False])
|
||||
def test_authentication_failure_2(dynamodb, cql, metrics, test_table_s, enforce_auth, warn_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, 'bogus_secret_key') as d:
|
||||
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
|
||||
tab = d.Table(test_table_s.name)
|
||||
try:
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'UnrecognizedClientException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
new_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
|
||||
if enforce_auth or warn_auth:
|
||||
assert new_auth_failures == saved_auth_failures + 1
|
||||
else:
|
||||
assert new_auth_failures == saved_auth_failures
|
||||
|
||||
# Authorization failure - a valid user but without permissions to do a
|
||||
# given operation.
|
||||
@pytest.mark.parametrize("enforce_auth", [True, False])
|
||||
@pytest.mark.parametrize("warn_auth", [True, False])
|
||||
def test_authorization_failure(dynamodb, cql, metrics, test_table_s, enforce_auth, warn_auth):
|
||||
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authorization_failures')
|
||||
tab = d.Table(test_table_s.name)
|
||||
try:
|
||||
# Note that the new role is not a superuser, so should
|
||||
# not have permissions to read from this table created
|
||||
# earlier by the superuser.
|
||||
tab.get_item(Key={'p': 'dog'})
|
||||
operation_succeeded = True
|
||||
except ClientError as e:
|
||||
assert 'AccessDeniedException' in str(e)
|
||||
operation_succeeded = False
|
||||
if enforce_auth:
|
||||
assert not operation_succeeded
|
||||
else:
|
||||
assert operation_succeeded
|
||||
new_auth_failures = get_metric(metrics, 'scylla_alternator_authorization_failures')
|
||||
if enforce_auth or warn_auth:
|
||||
assert new_auth_failures == saved_auth_failures + 1
|
||||
else:
|
||||
assert new_auth_failures == saved_auth_failures
|
||||
|
||||
# TODO: there are additional metrics which we don't yet test here. At the
|
||||
# time of this writing they are:
|
||||
# reads_before_write, write_using_lwt, shard_bounce_for_lwt,
|
||||
|
||||
@@ -2202,7 +2202,7 @@ SEASTAR_THREAD_TEST_CASE(test_construct_next_stream_set) {
|
||||
};
|
||||
|
||||
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
|
||||
std::vector<cdc::stream_id> stream_ids;
|
||||
utils::chunked_vector<cdc::stream_id> stream_ids;
|
||||
for (auto t : tokens) {
|
||||
stream_ids.push_back(stream_id_for_token(t));
|
||||
}
|
||||
@@ -2311,7 +2311,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
|
||||
};
|
||||
|
||||
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
|
||||
std::vector<cdc::stream_id> stream_ids;
|
||||
utils::chunked_vector<cdc::stream_id> stream_ids;
|
||||
for (auto t : tokens) {
|
||||
stream_ids.push_back(stream_id_for_token(t));
|
||||
}
|
||||
@@ -2406,7 +2406,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
|
||||
|
||||
struct cdc_gc_test_config {
|
||||
table_id table;
|
||||
std::vector<std::vector<cdc::stream_id>> streams;
|
||||
std::vector<utils::chunked_vector<cdc::stream_id>> streams;
|
||||
size_t new_base_stream;
|
||||
};
|
||||
|
||||
@@ -2522,11 +2522,11 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
|
||||
// as the base and the history is empty
|
||||
|
||||
auto table = table_id(utils::UUID_gen::get_time_UUID());
|
||||
std::vector<cdc::stream_id> streams0;
|
||||
utils::chunked_vector<cdc::stream_id> streams0;
|
||||
for (auto t : {10, 20, 30}) {
|
||||
streams0.emplace_back(dht::token(t), 0);
|
||||
}
|
||||
std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
|
||||
cdc_gc_test_config test1 = {
|
||||
.table = table,
|
||||
@@ -2551,12 +2551,12 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
|
||||
// as the base and one history entry for open 50
|
||||
|
||||
auto table = table_id(utils::UUID_gen::get_time_UUID());
|
||||
std::vector<cdc::stream_id> streams0;
|
||||
utils::chunked_vector<cdc::stream_id> streams0;
|
||||
for (auto t : {10, 20, 30}) {
|
||||
streams0.emplace_back(dht::token(t), 0);
|
||||
}
|
||||
std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
std::vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
|
||||
|
||||
cdc_gc_test_config test2 = {
|
||||
.table = table,
|
||||
@@ -2584,7 +2584,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_get_new_base) {
|
||||
auto tp = base_time + offset;
|
||||
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(tp.time_since_epoch()).count();
|
||||
|
||||
streams_map[ts] = cdc::committed_stream_set{tp, std::vector<cdc::stream_id>{}};
|
||||
streams_map[ts] = cdc::committed_stream_set{tp, utils::chunked_vector<cdc::stream_id>{}};
|
||||
}
|
||||
return streams_map;
|
||||
};
|
||||
|
||||
@@ -2015,7 +2015,7 @@ SEASTAR_TEST_CASE(test_table_compression) {
|
||||
|
||||
e.execute_cql("create table tb6 (foo text PRIMARY KEY, bar text);").get();
|
||||
BOOST_REQUIRE(e.local_db().has_schema("ks", "tb6"));
|
||||
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == compression_parameters::algorithm::lz4);
|
||||
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == e.local_db().get_config().sstable_compression_user_table_options().get_algorithm());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -270,19 +270,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())));
|
||||
auto tempstr = test(sst).filename(component_type::TemporaryStatistics);
|
||||
tests::touch_file(tempstr.native()).get();
|
||||
auto tempstat = fs::canonical(tempstr);
|
||||
auto statstr = test(sst).filename(component_type::Statistics);
|
||||
|
||||
with_sstable_directory(env, [&] (sharded<sstables::sstable_directory>& sstdir_ok) {
|
||||
auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok, {});
|
||||
BOOST_REQUIRE_NO_THROW(expect_ok.get());
|
||||
const auto& dir = env.local().tempdir();
|
||||
lister::scan_dir(dir.path(), lister::dir_entry_types::of<directory_entry_type::regular>(), [tempstat] (fs::path parent_dir, directory_entry de) {
|
||||
BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
BOOST_REQUIRE(!file_exists(tempstr.native()).get());
|
||||
BOOST_REQUIRE(file_exists(statstr.native()).get()); // sanity check that we didn't miss the directory itself
|
||||
});
|
||||
|
||||
remove_file(test(sst).filename(sstables::component_type::Statistics).native()).get();
|
||||
remove_file(statstr.native()).get();
|
||||
|
||||
with_sstable_directory(env, [] (sharded<sstables::sstable_directory>& sstdir_fatal) {
|
||||
auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal, {});
|
||||
|
||||
@@ -692,36 +692,48 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
|
||||
BOOST_REQUIRE(b.empty());
|
||||
};
|
||||
|
||||
{
|
||||
auto in = make_is();
|
||||
expect(in, buf1);
|
||||
expect(in, buf2);
|
||||
expect_eof(in);
|
||||
}
|
||||
|
||||
in = make_is();
|
||||
{
|
||||
auto in = make_is();
|
||||
in.skip(0).get();
|
||||
expect(in, buf1);
|
||||
expect(in, buf2);
|
||||
expect_eof(in);
|
||||
}
|
||||
|
||||
in = make_is();
|
||||
{
|
||||
auto in = make_is();
|
||||
expect(in, buf1);
|
||||
in.skip(0).get();
|
||||
expect(in, buf2);
|
||||
expect_eof(in);
|
||||
}
|
||||
|
||||
in = make_is();
|
||||
{
|
||||
auto in = make_is();
|
||||
expect(in, buf1);
|
||||
in.skip(opts.buffer_size).get();
|
||||
expect_eof(in);
|
||||
}
|
||||
|
||||
in = make_is();
|
||||
{
|
||||
auto in = make_is();
|
||||
in.skip(opts.buffer_size * 2).get();
|
||||
expect_eof(in);
|
||||
}
|
||||
|
||||
in = make_is();
|
||||
{
|
||||
auto in = make_is();
|
||||
in.skip(opts.buffer_size).get();
|
||||
in.skip(opts.buffer_size).get();
|
||||
expect_eof(in);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2142,6 +2142,30 @@ SEASTAR_THREAD_TEST_CASE(test_per_shard_count_respected_with_rack_list) {
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Reproduces https://github.com/scylladb/scylladb/issues/26768
|
||||
SEASTAR_THREAD_TEST_CASE(test_replacing_last_node_in_rack_with_rack_list_rf) {
|
||||
cql_test_config cfg{};
|
||||
cfg.db_config->tablets_initial_scale_factor.set(10);
|
||||
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
|
||||
auto rack1 = topo.rack();
|
||||
auto rack2 = topo.start_new_rack();
|
||||
auto dc = topo.dc();
|
||||
|
||||
auto host1 = topo.add_node(node_state::normal, 1, rack1);
|
||||
auto host2 = topo.add_node(node_state::normal, 1, rack2);
|
||||
|
||||
auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack, rack2.rack}}});
|
||||
auto table = add_table(e, ks_name).get();
|
||||
|
||||
topo.set_node_state(host2, node_state::left);
|
||||
|
||||
rebalance_tablets(e);
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_shrinks_respecting_rack_allocation) {
|
||||
cql_test_config cfg{};
|
||||
cfg.db_config->tablets_per_shard_goal.set(10);
|
||||
@@ -3681,6 +3705,143 @@ SEASTAR_THREAD_TEST_CASE(test_creating_lots_of_tables_doesnt_overflow_metadata)
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile) {
|
||||
auto cfg = tablet_cql_test_config();
|
||||
|
||||
// This test checks the correctness of the load_stats reconciliation algorithm.
|
||||
// We only attempt to reconcile tablet_sizes after a merge or a split.
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
auto dc = topo.dc();
|
||||
const size_t tablet_count = 16;
|
||||
|
||||
auto host = topo.add_node(node_state::normal, 4);
|
||||
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
|
||||
|
||||
sstring table_name = "table_1";
|
||||
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
|
||||
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
|
||||
|
||||
auto& stm = e.shared_token_metadata().local();
|
||||
token_metadata_ptr old_tmptr = stm.get();
|
||||
auto& tmap = stm.get()->tablets().get_tablet_map(table);
|
||||
|
||||
auto set_tablet_count = [&] (size_t new_tablet_count) {
|
||||
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
|
||||
tablet_map new_tmap(new_tablet_count);
|
||||
tmeta.set_tablet_map(table, std::move(new_tmap));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
};
|
||||
|
||||
// This checks if the tablet sizes have been correctly reconciled after a merge
|
||||
{
|
||||
locator::load_stats stats;
|
||||
locator::tablet_load_stats& tls = stats.tablet_stats[host];
|
||||
for (size_t i = 0; i < tablet_count; ++i) {
|
||||
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
|
||||
tls.tablet_sizes[table][range] = i;
|
||||
}
|
||||
|
||||
size_t tablet_count_after_merge = tablet_count / 2;
|
||||
set_tablet_count(tablet_count_after_merge);
|
||||
|
||||
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
|
||||
BOOST_REQUIRE(reconciled_stats_ptr);
|
||||
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_merge);
|
||||
|
||||
locator::tablet_map tmap_after_merge(tablet_count_after_merge);
|
||||
for (size_t i = 0; i < tablet_count_after_merge; ++i) {
|
||||
dht::token_range trange {tmap_after_merge.get_token_range(locator::tablet_id{i})};
|
||||
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
|
||||
uint64_t expected_sum = 0;
|
||||
for (uint64_t i_sum = 0; i_sum < 2; ++i_sum) {
|
||||
expected_sum += i * 2 + i_sum;
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, expected_sum);
|
||||
}
|
||||
}
|
||||
|
||||
// This checks if the tablet sizes have been correctly reconciled after a split
|
||||
{
|
||||
locator::load_stats stats;
|
||||
locator::tablet_load_stats& tls = stats.tablet_stats[host];
|
||||
for (size_t i = 0; i < tablet_count; ++i) {
|
||||
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
|
||||
tls.tablet_sizes[table][range] = i * 2;
|
||||
}
|
||||
|
||||
size_t tablet_count_after_split = tablet_count * 2;
|
||||
set_tablet_count(tablet_count_after_split);
|
||||
|
||||
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
|
||||
BOOST_REQUIRE(reconciled_stats_ptr);
|
||||
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_split);
|
||||
|
||||
locator::tablet_map tmap_after_split(tablet_count_after_split);
|
||||
for (size_t i = 0; i < tablet_count_after_split; ++i) {
|
||||
dht::token_range trange {tmap_after_split.get_token_range(locator::tablet_id{i})};
|
||||
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
|
||||
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, i / 2);
|
||||
}
|
||||
}
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile_tablet_not_found) {
|
||||
auto cfg = tablet_cql_test_config();
|
||||
|
||||
// This test checks if the reconcile tablet algorithm returns nullptr when it
|
||||
// can't find all the tablet sizes in load_stats
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
topology_builder topo(e);
|
||||
auto dc = topo.dc();
|
||||
const size_t tablet_count = 16;
|
||||
|
||||
auto host = topo.add_node(node_state::normal, 4);
|
||||
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
|
||||
|
||||
sstring table_name = "table_1";
|
||||
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
|
||||
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
|
||||
|
||||
auto& stm = e.shared_token_metadata().local();
|
||||
auto& tmap = stm.get()->tablets().get_tablet_map(table);
|
||||
|
||||
locator::load_stats stats;
|
||||
locator::tablet_load_stats& tls = stats.tablet_stats[host];
|
||||
// Add all tablet sizes except the last one. This will cause reconcile to return a nullptr
|
||||
for (size_t i = 0; i < tablet_count - 1; ++i) {
|
||||
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
|
||||
tls.tablet_sizes[table][range] = i;
|
||||
}
|
||||
|
||||
token_metadata_ptr old_tm { stm.get() };
|
||||
|
||||
auto set_tablet_count = [&] (size_t new_tablet_count) {
|
||||
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
|
||||
tablet_map new_tmap(new_tablet_count);
|
||||
tmeta.set_tablet_map(table, std::move(new_tmap));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
};
|
||||
|
||||
// Test if merge reconcile detects a missing sibling tablet in load_stats
|
||||
set_tablet_count(tablet_count / 2);
|
||||
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
|
||||
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
|
||||
|
||||
// Test if split reconcile detects a missing tablet in load_stats
|
||||
set_tablet_count(tablet_count * 2);
|
||||
reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
|
||||
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_tablet_id_and_range_side) {
|
||||
static constexpr size_t tablet_count = 128;
|
||||
locator::tablet_map tmap(tablet_count);
|
||||
|
||||
@@ -11,6 +11,7 @@ import logging
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from functools import cached_property
|
||||
from functools import wraps
|
||||
from typing import List, Dict, Callable
|
||||
@@ -147,13 +148,24 @@ class Worker:
|
||||
update.serial_consistency_level = ConsistencyLevel.LOCAL_SERIAL
|
||||
try:
|
||||
res = await self.cql.run_async(update)
|
||||
applied = bool(res and res[0].applied)
|
||||
assert applied, f"LWT not applied: pk={pk} s{self.worker_id} new={new_val} guard={guard_vals} prev={prev_val}"
|
||||
except (WriteTimeout, OperationTimedOut, ReadTimeout) as e:
|
||||
if not is_uncertainty_timeout(e):
|
||||
raise
|
||||
applied = await self.verify_update_through_select(pk, new_val, prev_val)
|
||||
|
||||
else:
|
||||
applied = bool(res and res[0].applied)
|
||||
if not applied:
|
||||
logger.error(
|
||||
"LWT_NOT_APPLIED pk=%r worker=s%d new=%r guard=%r prev=%r ts_ms=%d",
|
||||
pk,
|
||||
self.worker_id,
|
||||
new_val,
|
||||
guard_vals,
|
||||
prev_val,
|
||||
)
|
||||
raise AssertionError(
|
||||
f"LWT not applied: pk={pk} s{self.worker_id} new={new_val} guard={guard_vals} prev={prev_val}"
|
||||
)
|
||||
if applied:
|
||||
self.on_applied(pk, self.worker_id, new_val)
|
||||
self.success_counts[pk] += 1
|
||||
@@ -189,7 +201,7 @@ class BaseLWTTester:
|
||||
self.pk_to_token: Dict[int, int] = {}
|
||||
self.migrations = 0
|
||||
self.phase = "warmup" # "warmup" -> "migrating" -> "post"
|
||||
self.phase_ops = {"warmup": 0, "migrating": 0, "post": 0}
|
||||
self.phase_ops = defaultdict(int)
|
||||
|
||||
def _get_lower_bound(self, pk: int, col_idx: int) -> int:
|
||||
return self.lb_counts[pk][col_idx]
|
||||
|
||||
224
test/cluster/lwt/test_lwt_during_tablets_resize.py
Normal file
224
test/cluster/lwt/test_lwt_during_tablets_resize.py
Normal file
@@ -0,0 +1,224 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
|
||||
import pytest
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.lwt.lwt_common import (
|
||||
BaseLWTTester,
|
||||
wait_for_tablet_count,
|
||||
DEFAULT_WORKERS,
|
||||
DEFAULT_NUM_KEYS,
|
||||
)
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.tablets import get_tablet_count
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Test constants
|
||||
TARGET_RESIZE_COUNT = 20
|
||||
WARMUP_LWT_CNT = 100
|
||||
POST_LWT_CNT = 100
|
||||
PHASE_WARMUP = "warmup"
|
||||
PHASE_POST = "post"
|
||||
PHASE_RESIZE = "resize"
|
||||
MIN_TABLETS = 1
|
||||
MAX_TABLETS = 20
|
||||
RESIZE_TIMEOUT = 240
|
||||
|
||||
|
||||
def powers_of_two_in_range(lo: int, hi: int):
|
||||
if lo > hi or hi < 1:
|
||||
return []
|
||||
lo = max(1, lo)
|
||||
start_e = (lo - 1).bit_length()
|
||||
end_e = hi.bit_length()
|
||||
return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]
|
||||
|
||||
|
||||
async def run_random_resizes(
|
||||
stop_event_: asyncio.Event,
|
||||
manager: ManagerClient,
|
||||
servers,
|
||||
tester: BaseLWTTester,
|
||||
ks: str,
|
||||
table: str,
|
||||
target_steps: int = TARGET_RESIZE_COUNT,
|
||||
pause_range=(0.5, 2.0)
|
||||
):
|
||||
"""
|
||||
Perform randomized tablet count changes (splits/merges) until target resize count is reached
|
||||
or stop_event_ is set. Returns a dict with simple stats.
|
||||
"""
|
||||
split_count = 0
|
||||
merge_count = 0
|
||||
current_resize_count = 0
|
||||
pow2_targets = powers_of_two_in_range(MIN_TABLETS, MAX_TABLETS)
|
||||
|
||||
while not stop_event_.is_set() and current_resize_count < target_steps:
|
||||
current_count = await get_tablet_count(manager, servers[0], ks, table)
|
||||
candidates = [t for t in pow2_targets if t != current_count]
|
||||
target_cnt = random.choice(candidates)
|
||||
|
||||
direction = "split" if target_cnt > current_count else "merge"
|
||||
logger.info(
|
||||
"[%s] starting: %s.%s tablet_count %d -> target %d",
|
||||
direction.upper(),
|
||||
ks,
|
||||
table,
|
||||
current_count,
|
||||
target_cnt,
|
||||
)
|
||||
|
||||
# Apply resize
|
||||
await tester.cql.run_async(
|
||||
f"ALTER TABLE {ks}.{table} WITH tablets = {{'min_tablet_count': {target_cnt}}}"
|
||||
)
|
||||
|
||||
count_after_resize = await wait_for_tablet_count(
|
||||
manager, servers[0], tester.ks, tester.tbl,
|
||||
predicate=(
|
||||
(lambda c, tgt=target_cnt: c >= tgt)
|
||||
if direction == "split"
|
||||
else (lambda c, tgt=target_cnt: c <= tgt)
|
||||
),
|
||||
target=target_cnt,
|
||||
timeout_s=RESIZE_TIMEOUT
|
||||
)
|
||||
|
||||
if direction == "split":
|
||||
logger.info(
|
||||
"[SPLIT] converged: %s.%s tablet_count %d -> %d (target %d)",
|
||||
ks,
|
||||
table,
|
||||
current_count,
|
||||
count_after_resize,
|
||||
target_cnt,
|
||||
)
|
||||
assert count_after_resize >= current_count, (
|
||||
f"Tablet count expected to be increased during split (was {current_count}, now {count_after_resize})"
|
||||
)
|
||||
split_count += 1
|
||||
else:
|
||||
logger.info(
|
||||
"[MERGE] converged: %s.%s tablet_count %d -> %d (target %d)",
|
||||
ks,
|
||||
table,
|
||||
current_count,
|
||||
count_after_resize,
|
||||
target_cnt,
|
||||
)
|
||||
assert count_after_resize <= current_count, (
|
||||
f"Tablet count expected to be decreased during merge (was {current_count}, now {count_after_resize})"
|
||||
)
|
||||
merge_count += 1
|
||||
|
||||
current_resize_count += 1
|
||||
await asyncio.sleep(random.uniform(*pause_range))
|
||||
|
||||
return {
|
||||
"steps_done": current_resize_count,
|
||||
"seen_split": split_count,
|
||||
"seen_merge": merge_count,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode("release", "error injections are not supported in release mode")
|
||||
@skip_mode("debug", "debug mode is too slow for this test")
|
||||
async def test_multi_column_lwt_during_split_merge(manager: ManagerClient):
|
||||
"""
|
||||
Test scenario:
|
||||
1. Start N servers with tablets enabled
|
||||
2. Create keyspace/table
|
||||
3. Insert rows, precompute pk->token
|
||||
4. Start LWT workers
|
||||
5. Run randomized tablet resizing in parallel
|
||||
6. Stop workers and verify consistency
|
||||
"""
|
||||
cfg = {
|
||||
"enable_tablets": True,
|
||||
"tablet_load_stats_refresh_interval_in_seconds": 1,
|
||||
"target-tablet-size-in-bytes": 1024 * 16,
|
||||
}
|
||||
properties = [
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
]
|
||||
cmdline = [
|
||||
'--logger-log-level', 'paxos=trace'
|
||||
]
|
||||
servers = await manager.servers_add(6, config=cfg, cmdline=cmdline, property_file=properties)
|
||||
|
||||
async with new_test_keyspace(
|
||||
manager,
|
||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} "
|
||||
"AND tablets = {'initial': 1}",
|
||||
) as ks:
|
||||
stop_event_ = asyncio.Event()
|
||||
table = "lwt_split_merge_table"
|
||||
tester = BaseLWTTester(
|
||||
manager,
|
||||
ks,
|
||||
table,
|
||||
num_workers=DEFAULT_WORKERS,
|
||||
num_keys=DEFAULT_NUM_KEYS,
|
||||
)
|
||||
|
||||
await tester.create_schema()
|
||||
await tester.initialize_rows()
|
||||
await tester.start_workers(stop_event_)
|
||||
|
||||
try:
|
||||
# Phase 1: warmup LWT (100 applied CAS)
|
||||
tester.set_phase(PHASE_WARMUP)
|
||||
logger.info("LWT warmup: waiting for %d applied CAS", WARMUP_LWT_CNT)
|
||||
await tester.wait_for_phase_ops(stop_event_, PHASE_WARMUP, WARMUP_LWT_CNT, timeout=180, poll=0.2)
|
||||
logger.info("LWT warmup complete: %d ops", tester.get_phase_ops(PHASE_WARMUP))
|
||||
|
||||
# Phase 2: randomized resizes with LWT running
|
||||
logger.info(f"LWT during split/merge phase starting")
|
||||
tester.set_phase(PHASE_RESIZE)
|
||||
|
||||
resize_stats = await run_random_resizes(
|
||||
stop_event_=stop_event_,
|
||||
manager=manager,
|
||||
servers=servers,
|
||||
tester=tester,
|
||||
ks=ks,
|
||||
table=table,
|
||||
target_steps=TARGET_RESIZE_COUNT,
|
||||
)
|
||||
logger.info("LWT resize complete: %d ops", tester.get_phase_ops(PHASE_RESIZE))
|
||||
|
||||
# Phase 3: post resize LWT (100 applied CAS)
|
||||
tester.set_phase(PHASE_POST)
|
||||
logger.info("LWT post resize: waiting for %d applied CAS", POST_LWT_CNT)
|
||||
await tester.wait_for_phase_ops(stop_event_, PHASE_POST, POST_LWT_CNT, timeout=180, poll=0.2)
|
||||
logger.info("LWT post resize complete: %d ops", tester.get_phase_ops(PHASE_POST))
|
||||
|
||||
logger.info(
|
||||
"Randomized resize complete: steps_done=%d, seen_split=%s, seen_merge=%s, ops=%d",
|
||||
resize_stats["steps_done"],
|
||||
resize_stats["seen_split"],
|
||||
resize_stats["seen_merge"],
|
||||
sum(tester.phase_ops.values()),
|
||||
)
|
||||
|
||||
finally:
|
||||
await tester.stop_workers()
|
||||
|
||||
await tester.verify_consistency()
|
||||
logger.info("Multi-column LWT during randomized split/merge test completed successfully")
|
||||
@@ -6,6 +6,7 @@
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient, wait_for_cql_and_get_hosts
|
||||
from test.pylib.tablets import get_tablet_replica
|
||||
@@ -193,3 +194,45 @@ async def test_interrupt_view_build_shard_registration(manager: ManagerClient):
|
||||
assert len(res) == n_partitions
|
||||
res = await cql.run_async(f"SELECT * FROM {ks}.mv")
|
||||
assert len(res) == n_partitions
|
||||
|
||||
# The test verifies that when a reshard happens when building multiple views,
|
||||
# which have different progress, we won't mistakenly decide that a view is built
|
||||
# even if a build step is empty due to resharding.
|
||||
# Reproduces https://github.com/scylladb/scylladb/issues/26523
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_empty_build_step_after_reshard(manager: ManagerClient):
|
||||
server = await manager.server_add(cmdline=['--smp', '1', '--logger-log-level', 'view=debug'])
|
||||
partitions = random.sample(range(1000), 129) # need more than 128 to allow the first build step to finish and save the progress
|
||||
logger.info(f"Using partitions: {partitions}")
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async(f"CREATE KEYSPACE ks WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets={{'enabled':false}}")
|
||||
await cql.run_async(f"CREATE TABLE ks.test (p int, c int, PRIMARY KEY(p,c));")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO ks.test (p, c) VALUES ({k}, {k+1});") for k in partitions])
|
||||
|
||||
# Create first materialized view and wait until building starts. The base table has enough partitions for 2 build steps.
|
||||
# Allow the first build step to finish and save progress. In the second step there's only one partition left to build, which will land only on one
|
||||
# of the shards after resharding.
|
||||
await manager.api.enable_injection(server.ip_addr, "delay_finishing_build_step", one_shot=False)
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv AS SELECT p, c FROM ks.test WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
|
||||
async def progress_saved():
|
||||
rows = await cql.run_async(f"SELECT * FROM system.scylla_views_builds_in_progress WHERE keyspace_name = 'ks' AND view_name = 'mv'")
|
||||
return len(rows) > 0 or None
|
||||
await wait_for(progress_saved, time.time() + 60)
|
||||
await manager.api.enable_injection(server.ip_addr, "dont_start_build_step", one_shot=False)
|
||||
await manager.api.message_injection(server.ip_addr, "delay_finishing_build_step")
|
||||
|
||||
# Create second materialized view and immediately restart the server to cause resharding. The new view will effectively start building after the restart.
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv2 AS SELECT p, c FROM ks.test WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
|
||||
await manager.server_stop_gracefully(server.server_id)
|
||||
await manager.server_start(server.server_id, cmdline_options_override=['--smp', '2', '--logger-log-level', 'view=debug'])
|
||||
cql = await reconnect_driver(manager)
|
||||
await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
|
||||
await wait_for_view(cql, 'mv', 1)
|
||||
await wait_for_view(cql, 'mv2', 1)
|
||||
|
||||
# Verify that no rows are missing
|
||||
base_rows = await cql.run_async(f"SELECT * FROM ks.test")
|
||||
mv_rows = await cql.run_async(f"SELECT * FROM ks.mv")
|
||||
mv2_rows = await cql.run_async(f"SELECT * FROM ks.mv2")
|
||||
assert len(base_rows) == len(mv_rows) == len(mv2_rows) == 129
|
||||
|
||||
94
test/cluster/test_counters_with_tablets.py
Normal file
94
test/cluster/test_counters_with_tablets.py
Normal file
@@ -0,0 +1,94 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.tablets import get_tablet_replica
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("migration_type", ["internode", "intranode"])
|
||||
async def test_counter_updates_during_tablet_migration(manager: ManagerClient, migration_type: str):
|
||||
"""
|
||||
Test that counter updates remain consistent during tablet migrations.
|
||||
|
||||
This test performs concurrent counter increment operations on a single partition
|
||||
while simultaneously triggering a tablet migration, either between nodes or
|
||||
between shards on the same node.
|
||||
|
||||
The test verifies that counter consistency is maintained throughout the migration
|
||||
process: no counter updates are lost, and the final counter value matches the total
|
||||
number of increments performed.
|
||||
"""
|
||||
|
||||
if migration_type == "intranode":
|
||||
node_count = 1
|
||||
else:
|
||||
node_count = 3
|
||||
|
||||
cmdline = ['--smp', '2', '--logger-log-level', 'raft_topology=debug', '--logger-log-level', 'storage_service=debug']
|
||||
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline)
|
||||
cql = manager.get_cql()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets={'initial': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.counters (pk int PRIMARY KEY, c counter)")
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
pk = 1 # Single partition key for all updates
|
||||
|
||||
async def do_counter_updates():
|
||||
"""Continuously update a single counter during migration"""
|
||||
update_count = 0
|
||||
|
||||
while not stop_event.is_set():
|
||||
await asyncio.gather(*[cql.run_async(f"UPDATE {ks}.counters SET c = c + 1 WHERE pk = {pk}") for _ in range(100)])
|
||||
update_count += 100
|
||||
|
||||
return update_count
|
||||
|
||||
async def do_migration():
|
||||
"""Perform the specified type of tablet migration"""
|
||||
try:
|
||||
tablet_token = 0 # the single tablet
|
||||
replica = await get_tablet_replica(manager, servers[0], ks, 'counters', tablet_token)
|
||||
src_host = replica[0]
|
||||
src_shard = replica[1]
|
||||
|
||||
if migration_type == "internode":
|
||||
# Find a different node to migrate to
|
||||
all_host_ids = [await manager.get_host_id(server.server_id) for server in servers]
|
||||
dst_host = next(host_id for host_id in all_host_ids if host_id != src_host)
|
||||
dst_shard = 0
|
||||
else: # migration_type == "intranode"
|
||||
# Move to a different shard on the same node
|
||||
dst_host = src_host
|
||||
dst_shard = 1 - src_shard # Switch between shard 0 and 1
|
||||
|
||||
await manager.api.move_tablet(servers[0].ip_addr, ks, "counters", src_host, src_shard, dst_host, dst_shard, tablet_token)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
# Run counter updates and migration concurrently
|
||||
update_task = asyncio.create_task(do_counter_updates())
|
||||
await asyncio.sleep(0.5)
|
||||
await do_migration()
|
||||
total_updates = await update_task
|
||||
logger.info("Completed %d counter updates during migration", total_updates)
|
||||
|
||||
# Verify no increments were lost - counter value should equal number of updates
|
||||
result = await cql.run_async(f"SELECT c FROM {ks}.counters WHERE pk = {pk}")
|
||||
actual_count = result[0].c
|
||||
|
||||
assert actual_count == total_updates, f"Counter value mismatch: expected {total_updates}, got {actual_count}"
|
||||
@@ -92,11 +92,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
Column("pk", IntType),
|
||||
Column('int_c', IntType)
|
||||
])
|
||||
if not tablets_enabled: # issue #18180
|
||||
await random_tables.add_table(name='t2', pks=1, columns=[
|
||||
Column("pk", IntType),
|
||||
Column('counter_c', CounterType)
|
||||
])
|
||||
await random_tables.add_table(name='t2', pks=1, columns=[
|
||||
Column("pk", IntType),
|
||||
Column('counter_c', CounterType)
|
||||
])
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async(f"USE {random_tables.keyspace}")
|
||||
|
||||
@@ -126,10 +125,9 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
with pytest.raises(WriteFailure, match="stale topology exception"):
|
||||
await cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host2)
|
||||
|
||||
if not tablets_enabled: # issue #18180
|
||||
logger.info(f"trying to write through host2 to counter column [{host2}]")
|
||||
with pytest.raises(WriteFailure, match="stale topology exception"):
|
||||
await cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)
|
||||
logger.info(f"trying to write through host2 to counter column [{host2}]")
|
||||
with pytest.raises(WriteFailure, match="stale topology exception"):
|
||||
await cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)
|
||||
|
||||
random_tables.drop_all()
|
||||
|
||||
|
||||
@@ -157,6 +157,9 @@ def get_incremental_repair_sst_skipped_bytes(server):
|
||||
def get_incremental_repair_sst_read_bytes(server):
|
||||
return get_metrics(server, "scylla_repair_inc_sst_read_bytes")
|
||||
|
||||
def get_repair_tablet_time_ms(server):
|
||||
return get_metrics(server, "scylla_repair_tablet_time_ms")
|
||||
|
||||
async def get_sstables_for_server(manager, server, ks):
|
||||
node_workdir = await manager.server_get_workdir(server.server_id)
|
||||
sstables = get_sstables(node_workdir, ks, 'test')
|
||||
@@ -671,3 +674,18 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
assert skip1 == skip2
|
||||
assert read1 < read2
|
||||
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
time1 = 0
|
||||
time2 = 0
|
||||
|
||||
for s in servers:
|
||||
time1 += get_repair_tablet_time_ms(s)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
for s in servers:
|
||||
time2 += get_repair_tablet_time_ms(s)
|
||||
|
||||
assert time1 == 0
|
||||
assert time2 > 0
|
||||
|
||||
@@ -24,7 +24,7 @@ async def validate_status_operation(result: str, live_eps: list, down_eps: list,
|
||||
assert lines[i] == "=" * dc_line_len
|
||||
|
||||
i += 1
|
||||
assert lines[i] == "Status=Up/Down"
|
||||
assert lines[i].startswith("Status=Up/Down")
|
||||
|
||||
i += 1
|
||||
assert lines[i] == "|/ State=Normal/Leaving/Joining/Moving"
|
||||
@@ -47,7 +47,10 @@ async def validate_status_operation(result: str, live_eps: list, down_eps: list,
|
||||
|
||||
assert ep in (live_eps + down_eps)
|
||||
|
||||
assert status_state[0] == ('U' if ep in live_eps else 'D')
|
||||
if ep in live_eps:
|
||||
assert status_state[0] == 'U'
|
||||
else:
|
||||
assert status_state[0] in ['D', 'X']
|
||||
|
||||
if ep in joining:
|
||||
assert status_state[1] == 'J'
|
||||
|
||||
@@ -22,7 +22,7 @@ class Measurement(NamedTuple):
|
||||
metric_error_threshold: int = 1000
|
||||
|
||||
class DB(NamedTuple):
|
||||
ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' } AND tablets = { 'enabled' : false }"
|
||||
ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' }"
|
||||
tbl_schema: str = "p int, c int, PRIMARY KEY (p)"
|
||||
tbl_opts: str = ""
|
||||
prep_stmt_gen: Callable[[str], str] | None = None
|
||||
|
||||
@@ -4,10 +4,14 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import os
|
||||
import time
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
|
||||
from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
@@ -23,25 +27,6 @@ def yaml_to_cmdline(config):
|
||||
return cmdline
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
||||
async def test_dict_compression_not_allowed(manager: ManagerClient, cfg_source: str):
|
||||
config = {
|
||||
'sstable_compression_dictionaries_allow_in_ddl': False,
|
||||
'sstable_compression_user_table_options': {
|
||||
'sstable_compression': 'ZstdWithDictsCompressor',
|
||||
'chunk_length_in_kb': 4,
|
||||
'compression_level': 10
|
||||
}
|
||||
}
|
||||
expected_error = 'Invalid sstable_compression_user_table_options: sstable_compression ZstdWithDictsCompressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`'
|
||||
|
||||
if cfg_source == 'yaml':
|
||||
await manager.server_add(config=config, expected_error=expected_error)
|
||||
else:
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
|
||||
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
|
||||
@@ -105,4 +90,59 @@ async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source
|
||||
if cfg_source == 'yaml':
|
||||
await manager.server_add(config=config, expected_error=expected_error)
|
||||
else:
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
|
||||
"""
|
||||
Check that the default SSTable compression algorithm is:
|
||||
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
|
||||
* LZ4WithDictsCompressor if SSTABLE_COMPRESSION_DICTS is enabled.
|
||||
|
||||
- Start a 2-node cluster running a version where dictionary compression is not supported (2025.1).
|
||||
- Create a table. Ensure that it uses the LZ4Compressor.
|
||||
- Upgrade one node.
|
||||
- Create a second table. Ensure that it still uses the LZ4Compressor.
|
||||
- Upgrade the second node.
|
||||
- Wait for SSTABLE_COMPRESSION_DICTS to be enabled.
|
||||
- Create a third table. Ensure that it uses the new LZ4WithDictsCompressor.
|
||||
"""
|
||||
async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
|
||||
"""Helper to create a table and verify its compression algorithm."""
|
||||
logger.info(f"Creating table {table_name} ({context})")
|
||||
await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")
|
||||
|
||||
logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
|
||||
result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'")
|
||||
actual_compression = result[0].compression.get("sstable_compression")
|
||||
logger.info(f"Actual compression for {table_name}: {actual_compression}")
|
||||
|
||||
assert actual_compression == expected_compression, \
|
||||
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
|
||||
|
||||
new_exe = os.getenv("SCYLLA")
|
||||
assert new_exe
|
||||
|
||||
logger.info("Starting servers with version 2025.1")
|
||||
servers = await manager.servers_add(2, version=scylla_2025_1)
|
||||
|
||||
logger.info("Creating a test keyspace")
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
|
||||
|
||||
logger.info("Upgrading server 0")
|
||||
await manager.server_change_version(servers[0].server_id, new_exe)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
|
||||
|
||||
logger.info("Upgrading server 1")
|
||||
await manager.server_change_version(servers[1].server_id, new_exe)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
|
||||
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60) for host in hosts))
|
||||
|
||||
await create_table_and_check_compression(cql, "test_ks", "table_after_upgrade", "LZ4WithDictsCompressor", "after upgrade and feature enabled")
|
||||
@@ -8,14 +8,10 @@ import logging
|
||||
import pytest
|
||||
import itertools
|
||||
import time
|
||||
import contextlib
|
||||
import typing
|
||||
from test.pylib.manager_client import ManagerClient, ServerInfo
|
||||
from test.pylib.rest_client import read_barrier, ScyllaMetrics, HTTPError
|
||||
from test.pylib.rest_client import read_barrier, ScyllaMetrics
|
||||
from cassandra.cluster import ConsistencyLevel, Session as CassandraSession
|
||||
from cassandra.policies import FallthroughRetryPolicy, ConstantReconnectionPolicy
|
||||
from cassandra.protocol import ServerError
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -494,83 +490,3 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
|
||||
for algo in nondict_algorithms:
|
||||
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
|
||||
assert (await get_compressor_names(no_compression)) == set()
|
||||
|
||||
async def test_sstable_compression_dictionaries_allow_in_ddl(manager: ManagerClient):
|
||||
"""
|
||||
Tests the sstable_compression_dictionaries_allow_in_ddl option.
|
||||
When it's disabled, ALTER and CREATE statements should not be allowed
|
||||
to configure tables to use compression dictionaries for sstables.
|
||||
"""
|
||||
# Bootstrap cluster and configure server
|
||||
logger.info("Bootstrapping cluster")
|
||||
|
||||
servers = (await manager.servers_add(1, cmdline=[
|
||||
*common_debug_cli_options,
|
||||
"--sstable-compression-dictionaries-allow-in-ddl=false",
|
||||
], auto_rack_dc="dc1"))
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def with_expect_server_error(msg):
|
||||
try:
|
||||
yield
|
||||
except ServerError as e:
|
||||
if e.message != msg:
|
||||
raise
|
||||
else:
|
||||
raise Exception('Expected a ServerError, got no exceptions')
|
||||
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
await cql.run_async("""
|
||||
CREATE KEYSPACE test
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
""")
|
||||
|
||||
for new_algo in ['LZ4WithDicts', 'ZstdWithDicts']:
|
||||
logger.info(f"Tested algorithm: {new_algo}")
|
||||
table_name = f"test.{new_algo}"
|
||||
|
||||
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents CREATE with dict compression")
|
||||
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Enable the config option")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
|
||||
|
||||
logger.info("CREATE the table with dict compression")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Disable compression on the table")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': ''}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Disable the config option again")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
|
||||
|
||||
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents ALTER with dict compression")
|
||||
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
|
||||
logger.info("Enable the config option again")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
|
||||
|
||||
logger.info("ALTER the table with dict compression")
|
||||
await cql.run_async(SimpleStatement(f'''
|
||||
ALTER TABLE {table_name}
|
||||
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
|
||||
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
|
||||
logger.info("Enable the config option again")
|
||||
|
||||
logger.info("Disable the config option for the next test")
|
||||
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import uuid
|
||||
|
||||
from cassandra.protocol import ConfigurationException, InvalidRequest, SyntaxException
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
from test.cluster.test_tablets2 import safe_rolling_restart
|
||||
@@ -921,6 +923,45 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
|
||||
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
|
||||
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_excludenode(manager: ManagerClient):
|
||||
"""
|
||||
Verifies recovery scenario involving marking the node as excluded using excludenode.
|
||||
|
||||
1. Create a cluster with 3 racks, 1 node in rack1, 2 nodes in rack2, and 1 in rack3.
|
||||
2. The keyspace is initially replicated to rack1 and rack2.
|
||||
3. We down one node in rack2, which should cause unavailability.
|
||||
4. We mark the node as excluded using excludenode. This unblocks the next ALTER
|
||||
5. We add rack3 to RF of the keyspace. This wouldn't succeed without marking the node as excluded.
|
||||
6. We verify that downed node can be removed successfully, while there are still tablets on it. That's
|
||||
why we need two nodes in rack2.
|
||||
"""
|
||||
servers = await manager.servers_add(servers_num=3, auto_rack_dc='dc1')
|
||||
await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack2'})
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', "
|
||||
"'dc1': ['rack1', 'rack2']} AND tablets = { 'initial': 8 }") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
live_node = servers[0]
|
||||
node_to_remove = servers[1]
|
||||
with pytest.raises(Exception, match="Cannot mark host .* as excluded because it's alive"):
|
||||
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
|
||||
|
||||
with pytest.raises(Exception, match=".* does not belong to this cluster"):
|
||||
await manager.api.exclude_node(live_node.ip_addr, hosts=[str(uuid.uuid4())])
|
||||
|
||||
await manager.server_stop(node_to_remove.server_id)
|
||||
await manager.others_not_see_server(node_to_remove.ip_addr)
|
||||
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
|
||||
|
||||
# Check that tablets can be rebuilt in a new rack with rack2 down.
|
||||
await cql.run_async(f"ALTER KEYSPACE {ks} WITH REPLICATION = {{ 'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2', 'rack3']}}")
|
||||
|
||||
# Check that removenode succeeds on the node which is excluded
|
||||
await manager.remove_node(live_node.server_id, server_id=node_to_remove.server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("with_zero_token_node", [False, True])
|
||||
async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_token_node: bool):
|
||||
|
||||
@@ -1727,7 +1727,6 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
|
||||
cmdline = [
|
||||
'--logger-log-level', 'storage_service=debug',
|
||||
'--logger-log-level', 'table=debug',
|
||||
'--smp', '1',
|
||||
]
|
||||
servers = [await manager.server_add(config={
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
|
||||
@@ -534,7 +534,8 @@ async def test_view_building_while_tablet_streaming_fail(manager: ManagerClient)
|
||||
|
||||
tablet_token = 0 # Doesn't matter since there is one tablet
|
||||
replica = await get_tablet_replica(manager, servers[0], ks, 'tab', tablet_token)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_fail", one_shot=True)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_fail", one_shot=False)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_move_to_cleanup", one_shot=False)
|
||||
await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, VIEW_BUILDING_WORKER_PAUSE_BUILD_RANGE_TASK) for s in servers))
|
||||
await manager.api.move_tablet(servers[0].ip_addr, ks, "tab", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
-- Error messages contain a keyspace name. Make the output stable.
|
||||
CREATE KEYSPACE ks
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
AND TABLETS = {'enabled': false};
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
|
||||
USE ks;
|
||||
|
||||
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
> -- Error messages contain a keyspace name. Make the output stable.
|
||||
> CREATE KEYSPACE ks
|
||||
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
> AND TABLETS = {'enabled': false};
|
||||
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
|
||||
OK
|
||||
> USE ks;
|
||||
OK
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
-- Error messages contain a keyspace name. Make the output stable.
|
||||
CREATE KEYSPACE ks
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
AND TABLETS = {'enabled': false};
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
|
||||
USE ks;
|
||||
|
||||
CREATE TABLE ks.tbl_cnt (pk int PRIMARY KEY, c1 counter);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
> -- Error messages contain a keyspace name. Make the output stable.
|
||||
> CREATE KEYSPACE ks
|
||||
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
|
||||
> AND TABLETS = {'enabled': false};
|
||||
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
|
||||
OK
|
||||
> USE ks;
|
||||
OK
|
||||
|
||||
@@ -44,39 +44,21 @@ def testNonCounterInCounterBatch(cql, test_keyspace):
|
||||
with pytest.raises(InvalidRequest):
|
||||
sendBatch(cql, test_keyspace, BatchType.COUNTER, False, True, False)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testNonCounterInLoggedBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.LOGGED, False, True, False)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testNonCounterInUnLoggedBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, False, True, False)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterInCounterBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.COUNTER, True, False, False)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterInUnLoggedBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, True, False, False)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testTableWithClusteringInLoggedBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.LOGGED, False, False, True)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testTableWithClusteringInUnLoggedBatch(cql, test_keyspace):
|
||||
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, False, False, True)
|
||||
|
||||
|
||||
@@ -242,9 +242,6 @@ def testCastsWithReverseOrder(cql, test_keyspace):
|
||||
# row("2.0"))
|
||||
|
||||
# Reproduces #14501:
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterCastsInSelectionClause(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a int primary key, b counter)") as table:
|
||||
execute(cql, table, "UPDATE %s SET b = b + 2 WHERE a = 1")
|
||||
|
||||
@@ -12,9 +12,6 @@ from cassandra.query import UNSET_VALUE
|
||||
|
||||
# Test for the validation bug of CASSANDRA-4706,
|
||||
# migrated from cql_tests.py:TestCQL.validate_counter_regular_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testRegularCounters(cql, test_keyspace):
|
||||
# The Cassandra and Scylla error messages are different: Cassandra says
|
||||
# "Cannot mix counter and non counter columns in the same table", Scylla
|
||||
@@ -31,9 +28,6 @@ def testRegularCounters(cql, test_keyspace):
|
||||
"CREATE TABLE %s (id bigint PRIMARY KEY, count counter, things set<text>)")
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.collection_counter_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCountersOnCollections(cql, test_keyspace):
|
||||
assert_invalid_throw(cql, test_keyspace + "." + unique_name(),
|
||||
InvalidRequest,
|
||||
@@ -47,9 +41,6 @@ def testCountersOnCollections(cql, test_keyspace):
|
||||
InvalidRequest,
|
||||
"CREATE TABLE %s (k int PRIMARY KEY, m map<text, counter>)")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterUpdatesWithUnset(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter)") as table:
|
||||
# set up
|
||||
@@ -67,9 +58,6 @@ def testCounterUpdatesWithUnset(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT c FROM %s WHERE k = 10"), [1]) # no change to the counter value
|
||||
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterFiltering(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a counter)") as table:
|
||||
for i in range(10):
|
||||
@@ -97,9 +85,6 @@ def testCounterFiltering(cql, test_keyspace):
|
||||
assert_rows_ignoring_order(execute(cql, table, "SELECT * FROM %s WHERE a = ? ALLOW FILTERING", 6),
|
||||
[6, 6], [10, 6])
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterFilteringWithNull(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a counter, b counter)") as table:
|
||||
execute(cql, table, "UPDATE %s SET a = a + ? WHERE k = ?", 1, 1)
|
||||
@@ -120,9 +105,6 @@ def testCounterFilteringWithNull(cql, test_keyspace):
|
||||
"SELECT * FROM %s WHERE b = null ALLOW FILTERING")
|
||||
|
||||
# Test for the validation bug of CASSANDRA-9395.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testProhibitReversedCounterAsPartOfPrimaryKey(cql, test_keyspace):
|
||||
# The Cassandra message and Scylla message differ slightly -
|
||||
# counter type is not supported for PRIMARY KEY column 'a'"
|
||||
@@ -134,9 +116,6 @@ def testProhibitReversedCounterAsPartOfPrimaryKey(cql, test_keyspace):
|
||||
"CREATE TABLE %s (a counter, b int, PRIMARY KEY (b, a)) WITH CLUSTERING ORDER BY (a desc);")
|
||||
|
||||
# Check that a counter batch works as intended
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterBatch(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(userid int, url text, total counter, PRIMARY KEY (userid, url))") as table:
|
||||
# Ensure we handle updates to the same CQL row in the same partition properly
|
||||
|
||||
@@ -282,9 +282,6 @@ def testIndexOnCompoundRowKey(cql, test_keyspace):
|
||||
["t", 1, 4, 3])
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.secondary_index_counters()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testIndexOnCountersInvalid(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter)") as table:
|
||||
assert_invalid(cql, table, "CREATE INDEX ON %s(c)")
|
||||
|
||||
@@ -126,9 +126,6 @@ def testCounters(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT total FROM %s WHERE userid = 1 AND url = 'http://foo.com'"),
|
||||
[1])
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterFiltering(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a COUNTER) WITH COMPACT STORAGE") as table:
|
||||
for i in range(10):
|
||||
@@ -171,9 +168,6 @@ def testCounterFiltering(cql, test_keyspace):
|
||||
[10, 6])
|
||||
|
||||
# Test for the bug of CASSANDRA-11726.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterAndColumnSelection(cql, test_keyspace):
|
||||
for compactStorageClause in ["", " WITH COMPACT STORAGE"]:
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter) " + compactStorageClause) as table:
|
||||
@@ -188,9 +182,6 @@ def testCounterAndColumnSelection(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT k FROM %s"), [0])
|
||||
|
||||
# Check that a counter batch works as intended
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testCounterBatch(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(userid int, url text, total counter, PRIMARY KEY (userid, url)) WITH COMPACT STORAGE") as table:
|
||||
# Ensure we handle updates to the same CQL row in the same partition properly
|
||||
|
||||
@@ -1649,9 +1649,6 @@ def executeFilteringOnly(cql, table, statement):
|
||||
assert_invalid(cql, table, statement)
|
||||
return execute_without_paging(cql, table, statement + " ALLOW FILTERING")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testAllowFilteringOnPartitionKeyWithCounters(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a int, b int, c int, cnt counter, PRIMARY KEY ((a, b), c))") as table:
|
||||
execute(cql, table, "UPDATE %s SET cnt = cnt + ? WHERE a = ? AND b = ? AND c = ?", 14, 11, 12, 13)
|
||||
@@ -1975,9 +1972,6 @@ def testCustomIndexWithFiltering(cql, test_keyspace):
|
||||
assert_rows(executeFilteringOnly(cql, table, "SELECT * FROM %s WHERE c = 'b' AND d = 4"),
|
||||
["c", 3, "b", 4])
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def testFilteringWithCounters(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a int, b int, c int, cnt counter, PRIMARY KEY (a, b, c))") as table:
|
||||
execute(cql, table, "UPDATE %s SET cnt = cnt + ? WHERE a = ? AND b = ? AND c = ?", 14, 11, 12, 13)
|
||||
|
||||
@@ -119,9 +119,6 @@ def test_ignore_cast_to_the_same_type(cql, table1):
|
||||
assert cql.execute(f"SELECT CAST(p as int) FROM {table1} WHERE p={p}").one()._fields[0] == "p"
|
||||
|
||||
# Test casting a counter to various other types. Reproduces #14501.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def test_cast_from_counter(cql, table2):
|
||||
p = unique_key_int()
|
||||
# Set the counter to 1000 in two increments, to make it less trivial to
|
||||
@@ -156,9 +153,6 @@ def test_cast_from_counter(cql, table2):
|
||||
# "text" type. Since "varchar" is just an alias for "text", casting
|
||||
# to varchar should work too, but in Cassandra it doesn't so this test
|
||||
# is marked a Cassandra bug.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def test_cast_from_counter_to_varchar(cql, table2, cassandra_bug):
|
||||
p = unique_key_int()
|
||||
cql.execute(f'UPDATE {table2} SET c = c + 1000 WHERE p = {p}')
|
||||
|
||||
@@ -25,9 +25,6 @@ def table2(cql, test_keyspace):
|
||||
# Test that the function counterasblob() exists and works as expected -
|
||||
# same as bigintasblob on the same number (a counter is a 64-bit number).
|
||||
# Reproduces #14742
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def test_counter_to_blob(cql, table1, table2):
|
||||
p = unique_key_int()
|
||||
cql.execute(f'UPDATE {table1} SET i = 1000 WHERE p = {p}')
|
||||
@@ -72,9 +69,6 @@ def test_blobascounter_wrong_size(cql, table1):
|
||||
# Drop a table while there is a counter update operation in progress.
|
||||
# Verify the table waits for the operation to complete before it's destroyed.
|
||||
# Reproduces scylladb/scylla-enterprise#4475
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
|
||||
indirect=True)
|
||||
def test_counter_update_while_table_dropped(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, c counter") as table, \
|
||||
scylla_inject_error(cql, "apply_counter_update_delay_5s", one_shot=True):
|
||||
|
||||
@@ -190,10 +190,7 @@ def test_desc_scylla_keyspace(scylla_only, cql, random_seed):
|
||||
|
||||
# Test that `DESC TABLE {tbl}` contains appropriate create statement for table
|
||||
# This test compares the content of `system_schema.tables` and `system_schema.columns` tables.
|
||||
def test_desc_table(cql, test_keyspace, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_desc_table(cql, test_keyspace, random_seed):
|
||||
with new_random_table(cql, test_keyspace) as tbl:
|
||||
desc = cql.execute(f"DESC TABLE {tbl}")
|
||||
desc_stmt = desc.one().create_statement
|
||||
@@ -243,10 +240,7 @@ def test_desc_table(cql, test_keyspace, random_seed, has_tablets):
|
||||
|
||||
# This test compares the content of `system_schema.tables` and `system_schema.columns` tables
|
||||
# when providing tablet options to CREATE TABLE.
|
||||
def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed):
|
||||
tablet_options = {
|
||||
'min_tablet_count': '100',
|
||||
'min_per_shard_tablet_count': '0.8', # Verify that a floating point value works for this hint
|
||||
@@ -269,10 +263,7 @@ def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed, has_tab
|
||||
# Test that `DESC TABLE {tbl}` contains appropriate create statement for table
|
||||
# This test compares the content of `system_schema.scylla_tables` tables, thus the test
|
||||
# is `scylla_only`.
|
||||
def test_desc_scylla_table(scylla_only, cql, test_keyspace, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_desc_scylla_table(scylla_only, cql, test_keyspace, random_seed):
|
||||
with new_random_table(cql, test_keyspace) as tbl:
|
||||
desc = cql.execute(f"DESC TABLE {tbl}")
|
||||
desc_stmt = desc.one().create_statement
|
||||
@@ -426,10 +417,7 @@ def test_desc_table_internals(cql, test_keyspace):
|
||||
assert f"ALTER TABLE {tbl} ADD b int" in desc_internals
|
||||
|
||||
# Test that `DESC KEYSPACE {ks}` contains not only keyspace create statement but also for its elements
|
||||
def test_desc_keyspace_elements(cql, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_desc_keyspace_elements(cql, random_seed):
|
||||
with new_random_keyspace(cql) as ks:
|
||||
with new_random_type(cql, ks) as udt:
|
||||
with new_random_table(cql, ks, [udt]) as tbl:
|
||||
@@ -449,10 +437,7 @@ def test_desc_keyspace_elements(cql, random_seed, has_tablets):
|
||||
|
||||
# Test that `DESC SCHEMA` contains all information for user created keyspaces
|
||||
# and `DESC FULL SCHEMA` contains also information for system keyspaces
|
||||
def test_desc_schema(cql, test_keyspace, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_desc_schema(cql, test_keyspace, random_seed):
|
||||
with new_random_keyspace(cql) as ks:
|
||||
with new_random_table(cql, test_keyspace) as tbl1, new_random_table(cql, ks) as tbl2:
|
||||
desc = cql.execute("DESC SCHEMA")
|
||||
@@ -690,10 +675,7 @@ def test_view_desc_in_table_desc(cql, test_keyspace, cassandra_bug):
|
||||
# keyspace, table, view, index, UDT, UDF, UDA
|
||||
|
||||
# Cassandra compatibility require us to be able generic describe: keyspace, table, view, index.
|
||||
def test_generic_desc(cql, random_seed, has_tablets):
|
||||
if has_tablets: # issue #18180
|
||||
global counter_table_chance
|
||||
counter_table_chance = 0
|
||||
def test_generic_desc(cql, random_seed):
|
||||
with new_random_keyspace(cql) as ks:
|
||||
with new_random_table(cql, ks) as t1, new_test_table(cql, ks, "a int primary key, b int, c int") as tbl:
|
||||
cql.execute(f"CREATE INDEX idx ON {tbl}(b)")
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user