Compare commits
152 Commits
copilot/fi
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db0e209a9b | ||
|
|
4361e41728 | ||
|
|
1857721717 | ||
|
|
9d997458d7 | ||
|
|
8e7652d986 | ||
|
|
8e9ab7618e | ||
|
|
1288adb3a6 | ||
|
|
092be62fca | ||
|
|
c70656c0db | ||
|
|
bb142bdc10 | ||
|
|
b52055b7a1 | ||
|
|
5c7a9e89ea | ||
|
|
2e55f9521b | ||
|
|
64a0efe297 | ||
|
|
3b801f3d80 | ||
|
|
bafe2bbbbc | ||
|
|
1f5082d1ce | ||
|
|
8f7a6fd5eb | ||
|
|
0656a73c52 | ||
|
|
527124d363 | ||
|
|
0b30c111d6 | ||
|
|
260c9972b0 | ||
|
|
1c04ce3415 | ||
|
|
af8df987f5 | ||
|
|
09d59de736 | ||
|
|
ce04e2cb7d | ||
|
|
3367ffa14f | ||
|
|
ad1e5718ae | ||
|
|
27bf0fdf60 | ||
|
|
9459a58116 | ||
|
|
59f97d0b71 | ||
|
|
0a07c2cb19 | ||
|
|
080c55a115 | ||
|
|
93de570e33 | ||
|
|
680bfa9ab7 | ||
|
|
ed58815199 | ||
|
|
9c8812a154 | ||
|
|
aac49601c6 | ||
|
|
89364d3576 | ||
|
|
68ea778b6b | ||
|
|
087f739bf9 | ||
|
|
332b776e87 | ||
|
|
ff0e7ac853 | ||
|
|
5319759bdb | ||
|
|
55d9d5e7c2 | ||
|
|
7f08d0a6cf | ||
|
|
c406e1dd17 | ||
|
|
e85ab70054 | ||
|
|
41f8f6b571 | ||
|
|
31e4bb1bc3 | ||
|
|
be94aab207 | ||
|
|
a5be65785c | ||
|
|
5720dd52b8 | ||
|
|
aa2021888c | ||
|
|
a09c1b355e | ||
|
|
67e0c8e4b0 | ||
|
|
03d57bae80 | ||
|
|
76560ca095 | ||
|
|
8a11535a12 | ||
|
|
d727a086c5 | ||
|
|
d1274f01aa | ||
|
|
aa2065fe2e | ||
|
|
5c7eb2ac61 | ||
|
|
0621a8aee5 | ||
|
|
10db3f7c85 | ||
|
|
f6dde0aa4b | ||
|
|
207c273b29 | ||
|
|
6df48aacd7 | ||
|
|
45341ca246 | ||
|
|
1efb2eb174 | ||
|
|
320ef84367 | ||
|
|
01658f9fcb | ||
|
|
e56f14b9c5 | ||
|
|
5865dad0c9 | ||
|
|
388dfbe3ee | ||
|
|
92a603699e | ||
|
|
d998d9d418 | ||
|
|
6f6b3a26c4 | ||
|
|
4eb427976b | ||
|
|
94d49da8ec | ||
|
|
f9bc211966 | ||
|
|
4aff338282 | ||
|
|
8b7dce8334 | ||
|
|
2afd323838 | ||
|
|
f2f415a742 | ||
|
|
5c2d8bd273 | ||
|
|
45b9675d28 | ||
|
|
f1e1c7db4c | ||
|
|
5e1f32b3d4 | ||
|
|
99f2dd02bf | ||
|
|
76a6a059c8 | ||
|
|
6ff4910d96 | ||
|
|
d213953d0a | ||
|
|
f5e76d0fcb | ||
|
|
2819b8b755 | ||
|
|
245d27347b | ||
|
|
323a7b8c55 | ||
|
|
cd0bb11eef | ||
|
|
95d4206585 | ||
|
|
2bc0c9c45b | ||
|
|
de5a13db28 | ||
|
|
dc3c6c3090 | ||
|
|
83babc20e3 | ||
|
|
04b9e98ef8 | ||
|
|
de8c2a8196 | ||
|
|
dd2e8a2105 | ||
|
|
90fd618967 | ||
|
|
da8bd30a5b | ||
|
|
4e9a42f343 | ||
|
|
6db7481c7a | ||
|
|
62a5d4f932 | ||
|
|
f5319b06ae | ||
|
|
c191c31682 | ||
|
|
a4fd7019e3 | ||
|
|
e18072d4b8 | ||
|
|
7353aa5aa5 | ||
|
|
ec0b31b193 | ||
|
|
b5c3e2465f | ||
|
|
3cae4a21ab | ||
|
|
5c6335e029 | ||
|
|
de4975d181 | ||
|
|
1f73e18eaf | ||
|
|
931f9ca3db | ||
|
|
3775e8e49a | ||
|
|
f4d9513e0f | ||
|
|
5eeb1e3e76 | ||
|
|
989aa0b237 | ||
|
|
eba0a2cf72 | ||
|
|
3a9eb9b65f | ||
|
|
f75541b7b3 | ||
|
|
879db5855d | ||
|
|
22d3ee5670 | ||
|
|
2bdf792f8e | ||
|
|
2e2d1f17bb | ||
|
|
e9aba62cc5 | ||
|
|
a7d0cf6dd0 | ||
|
|
6e94c075e3 | ||
|
|
f90ca413a0 | ||
|
|
5e0f5f4b44 | ||
|
|
5d32fef3ae | ||
|
|
1b5c46a796 | ||
|
|
f2c5874fa9 | ||
|
|
4049dae0b2 | ||
|
|
8b83294c0f | ||
|
|
5930726b38 | ||
|
|
664cdd3d99 | ||
|
|
4ea6c51fb1 | ||
|
|
eb9babfd4a | ||
|
|
558f460517 | ||
|
|
a9f4024c1b | ||
|
|
6969918d31 | ||
|
|
d69edfcd34 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.4.0-dev
|
||||
VERSION=2025.4.0-rc4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -136,6 +136,7 @@ future<> controller::start_server() {
|
||||
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds,
|
||||
_config.alternator_enforce_authorization,
|
||||
_config.alternator_warn_authorization,
|
||||
&_memory_limiter.local().get_semaphore(),
|
||||
_config.max_concurrent_requests_per_shard);
|
||||
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "cdc/cdc_options.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "mutation/tombstone.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
@@ -243,7 +244,8 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
_cdc_metadata(cdc_metadata),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
|
||||
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
|
||||
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
|
||||
_ssg(ssg),
|
||||
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
|
||||
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
|
||||
@@ -879,15 +881,37 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
co_return rjson::print(std::move(response));
|
||||
}
|
||||
|
||||
// This function increments the authorization_failures counter, and may also
|
||||
// log a warn-level message and/or throw an access_denied exception, depending
|
||||
// on what enforce_authorization and warn_authorization are set to.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authorization_error must explicitly return after calling this function.
|
||||
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
|
||||
stats.authorization_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("alternator_warn_authorization=true: {}", msg);
|
||||
}
|
||||
throw api_error::access_denied(std::move(msg));
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization,
|
||||
const service::client_state& client_state,
|
||||
const schema_ptr& schema,
|
||||
auth::permission permission_to_check) {
|
||||
if (!enforce_authorization) {
|
||||
auth::permission permission_to_check,
|
||||
alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
// Unfortunately, the fix for issue #23218 did not modify the function
|
||||
@@ -902,31 +926,33 @@ future<> verify_permission(
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(fmt::format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
|
||||
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
if (!client_state.user() || !client_state.user()->name ||
|
||||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
|
||||
sstring username = "<anonymous>";
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
// Using exceptions for errors makes this function faster in the
|
||||
// success path (when the operation is allowed).
|
||||
throw api_error::access_denied(format(
|
||||
"{} access on table {}.{} is denied to role {}",
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"{} access on table {}.{} is denied to role {}, client address {}",
|
||||
auth::permissions::to_string(permission_to_check),
|
||||
schema->ks_name(), schema->cf_name(), username));
|
||||
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
|
||||
}
|
||||
}
|
||||
|
||||
// Similar to verify_permission() above, but just for CREATE operations.
|
||||
// Those do not operate on any specific table, so require permissions on
|
||||
// ALL KEYSPACES instead of any specific table.
|
||||
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
|
||||
if (!enforce_authorization) {
|
||||
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
|
||||
if (!enforce_authorization && !warn_authorization) {
|
||||
co_return;
|
||||
}
|
||||
auto resource = auth::resource(auth::resource_kind::data);
|
||||
@@ -935,7 +961,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli
|
||||
if (client_state.user() && client_state.user()->name) {
|
||||
username = client_state.user()->name.value();
|
||||
}
|
||||
throw api_error::access_denied(format(
|
||||
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
|
||||
"CREATE access on ALL KEYSPACES is denied to role {}", username));
|
||||
}
|
||||
}
|
||||
@@ -952,7 +978,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
@@ -1290,7 +1316,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
|
||||
if (tags->Size() < 1) {
|
||||
co_return api_error::validation("The number of tags must be at least 1") ;
|
||||
}
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
|
||||
});
|
||||
@@ -1311,7 +1337,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
|
||||
|
||||
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
|
||||
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
|
||||
});
|
||||
@@ -1496,8 +1522,25 @@ bytes extract_from_attrs_column_computation::compute_value(const schema&, const
|
||||
on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value called without row");
|
||||
}
|
||||
|
||||
// Because `CreateTable` request creates GSI/LSI together with the base table (so the base table is empty),
|
||||
// we can skip view building process and immediately mark the view as built on all nodes.
|
||||
//
|
||||
// However, we can do this only for tablet-based views because `view_building_worker` will automatically propagate
|
||||
// this information to `system.built_views` table (see `view_building_worker::update_built_views()`).
|
||||
// For vnode-based views, `view_builder` will process the view and mark it as built.
|
||||
static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out, std::vector<schema_ptr> schemas, api::timestamp_type ts, service::storage_proxy& sp) {
|
||||
auto token_metadata = sp.get_token_metadata_ptr();
|
||||
for (auto& schema: schemas) {
|
||||
if (schema->is_view()) {
|
||||
for (auto& host_id: token_metadata->get_topology().get_all_host_ids()) {
|
||||
auto view_status_mut = co_await sp.system_keyspace().make_view_build_status_mutation(ts, {schema->ks_name(), schema->cf_name()}, host_id, db::view::build_status::SUCCESS);
|
||||
out.push_back(std::move(view_status_mut));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
@@ -1703,7 +1746,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, client_state);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1754,6 +1797,9 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
schemas.push_back(view_builder.build());
|
||||
}
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
|
||||
if (ksm->uses_tablets()) {
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
|
||||
}
|
||||
|
||||
// If a role is allowed to create a table, we must give it permissions to
|
||||
// use (and eventually delete) the specific table it just created (and
|
||||
@@ -1800,9 +1846,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1855,7 +1901,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
verify_billing_mode(request);
|
||||
}
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
schema_ptr schema;
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -2026,7 +2072,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
|
||||
for (view_ptr view : new_views) {
|
||||
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
|
||||
@@ -2789,7 +2835,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -2892,7 +2938,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -3163,7 +3209,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
|
||||
}
|
||||
for (const auto& b : mutation_builders) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
|
||||
}
|
||||
// If alternator_force_read_before_write is true we will first get the previous item size
|
||||
// and only then do send the mutation.
|
||||
@@ -3636,16 +3682,16 @@ future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schem
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
|
||||
uint64_t& rcu_half_units) {
|
||||
noncopyable_function<void(uint64_t)> item_callback) {
|
||||
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
||||
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
||||
auto result_set = builder.build();
|
||||
std::vector<rjson::value> ret;
|
||||
for (auto& result_row : result_set->rows()) {
|
||||
rjson::value item = rjson::empty_object();
|
||||
rcu_consumed_capacity_counter consumed_capacity;
|
||||
describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes);
|
||||
rcu_half_units += consumed_capacity.get_half_units();
|
||||
uint64_t item_length_in_bytes = 0;
|
||||
describe_single_item(*selection, result_row, *attrs_to_get, item, &item_length_in_bytes);
|
||||
item_callback(item_length_in_bytes);
|
||||
ret.push_back(std::move(item));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
@@ -4365,7 +4411,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
|
||||
auto cas_shard = op->shard_for_execute(needs_read_before_write);
|
||||
|
||||
@@ -4475,7 +4521,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
|
||||
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
|
||||
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
|
||||
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
service::storage_proxy::coordinator_query_result qr =
|
||||
co_await _proxy.query(
|
||||
schema, std::move(command), std::move(partition_ranges), cl,
|
||||
@@ -4584,7 +4630,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
std::vector<std::vector<uint64_t>> responses_sizes;
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
@@ -4604,7 +4649,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
|
||||
for (const table_requests& tr : requests) {
|
||||
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
@@ -4612,11 +4657,10 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
responses_sizes.resize(requests.size());
|
||||
size_t responses_sizes_pos = 0;
|
||||
for (const auto& rs : requests) {
|
||||
responses_sizes[responses_sizes_pos].resize(rs.requests.size());
|
||||
size_t pos = 0;
|
||||
std::vector<uint64_t> consumed_rcu_half_units_per_table(requests.size());
|
||||
for (size_t i = 0; i < requests.size(); i++) {
|
||||
const table_requests& rs = requests[i];
|
||||
bool is_quorum = rs.cl == db::consistency_level::LOCAL_QUORUM;
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->api_operations.batch_get_item_histogram.add(rs.requests.size());
|
||||
for (const auto &r : rs.requests) {
|
||||
@@ -4639,16 +4683,17 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
command->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
const auto item_callback = [is_quorum, &rcus_per_table = consumed_rcu_half_units_per_table[i]](uint64_t size) {
|
||||
rcus_per_table += rcu_consumed_capacity_counter::get_half_units(size, is_quorum);
|
||||
};
|
||||
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
|
||||
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
|
||||
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, item_callback = std::move(item_callback)] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
|
||||
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size);
|
||||
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), std::move(item_callback));
|
||||
});
|
||||
pos++;
|
||||
response_futures.push_back(std::move(f));
|
||||
}
|
||||
responses_sizes_pos++;
|
||||
}
|
||||
|
||||
// Wait for all requests to complete, and then return the response.
|
||||
@@ -4660,14 +4705,11 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Responses", rjson::empty_object());
|
||||
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
|
||||
size_t rcu_half_units;
|
||||
auto fut_it = response_futures.begin();
|
||||
responses_sizes_pos = 0;
|
||||
rjson::value consumed_capacity = rjson::empty_array();
|
||||
for (const auto& rs : requests) {
|
||||
for (size_t i = 0; i < requests.size(); i++) {
|
||||
const table_requests& rs = requests[i];
|
||||
std::string table = table_name(*rs.schema);
|
||||
size_t pos = 0;
|
||||
rcu_half_units = 0;
|
||||
for (const auto &r : rs.requests) {
|
||||
auto& pk = r.first;
|
||||
auto& cks = r.second;
|
||||
@@ -4682,7 +4724,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
for (rjson::value& json : results) {
|
||||
rjson::push_back(response["Responses"][table], std::move(json));
|
||||
}
|
||||
rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM);
|
||||
} catch(...) {
|
||||
eptr = std::current_exception();
|
||||
// This read of potentially several rows in one partition,
|
||||
@@ -4706,8 +4747,8 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
|
||||
}
|
||||
}
|
||||
pos++;
|
||||
}
|
||||
uint64_t rcu_half_units = consumed_rcu_half_units_per_table[i];
|
||||
_stats.rcu_half_units_total += rcu_half_units;
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->rcu_half_units_total += rcu_half_units;
|
||||
@@ -4717,7 +4758,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::add(entry, "CapacityUnits", rcu_half_units*0.5);
|
||||
rjson::push_back(consumed_capacity, std::move(entry));
|
||||
}
|
||||
responses_sizes_pos++;
|
||||
}
|
||||
|
||||
if (should_add_rcu) {
|
||||
@@ -5069,10 +5109,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
filter filter,
|
||||
query::partition_slice::option_set custom_opts,
|
||||
service::client_state& client_state,
|
||||
cql3::cql_stats& cql_stats,
|
||||
alternator::stats& stats,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit,
|
||||
bool enforce_authorization) {
|
||||
bool enforce_authorization,
|
||||
bool warn_authorization) {
|
||||
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
|
||||
|
||||
tracing::trace(trace_state, "Performing a database query");
|
||||
@@ -5099,7 +5140,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
|
||||
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
|
||||
|
||||
auto regular_columns =
|
||||
table_schema->regular_columns() | std::views::transform(&column_definition::id)
|
||||
@@ -5134,10 +5175,10 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
if (paging_state) {
|
||||
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
|
||||
}
|
||||
if (has_filter){
|
||||
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
if (has_filter) {
|
||||
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
|
||||
// update our "filtered_row_matched_total" for all the rows matched, despited the filter
|
||||
cql_stats.filtered_rows_matched_total += size;
|
||||
stats.cql_stats.filtered_rows_matched_total += size;
|
||||
}
|
||||
if (opt_items) {
|
||||
if (opt_items->size() >= max_items_for_rapidjson_array) {
|
||||
@@ -5261,7 +5302,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
||||
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
|
||||
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
|
||||
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
|
||||
@@ -5742,7 +5783,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
||||
query::partition_slice::option_set opts;
|
||||
opts.set_if<query::partition_slice::option::reversed>(!forward);
|
||||
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
|
||||
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
|
||||
@@ -139,6 +139,7 @@ class executor : public peering_sharded_service<executor> {
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
cdc::metadata& _cdc_metadata;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
// An smp_service_group to be used for limiting the concurrency when
|
||||
// forwarding Alternator request between shards - if necessary for LWT.
|
||||
smp_service_group _ssg;
|
||||
@@ -228,12 +229,15 @@ public:
|
||||
const std::optional<attrs_to_get>&,
|
||||
uint64_t* = nullptr);
|
||||
|
||||
// Converts a multi-row selection result to JSON compatible with DynamoDB.
|
||||
// For each row, this method calls item_callback, which takes the size of
|
||||
// the item as the parameter.
|
||||
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice&& slice,
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
|
||||
uint64_t& rcu_half_units);
|
||||
noncopyable_function<void(uint64_t)> item_callback = {});
|
||||
|
||||
static void describe_single_item(const cql3::selection::selection&,
|
||||
const std::vector<managed_bytes_opt>&,
|
||||
@@ -261,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
|
||||
// SELECT, DROP, etc.) on the given table. When permission is denied an
|
||||
// appropriate user-readable api_error::access_denied is thrown.
|
||||
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
|
||||
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
|
||||
|
||||
/**
|
||||
* Make return type for serializing the object "streamed",
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "utils/aws_sigv4.hh"
|
||||
#include "client_data.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
@@ -270,24 +271,57 @@ protected:
|
||||
}
|
||||
};
|
||||
|
||||
// This function increments the authentication_failures counter, and may also
|
||||
// log a warn-level message and/or throw an exception, depending on what
|
||||
// enforce_authorization and warn_authorization are set to.
|
||||
// The username and client address are only used for logging purposes -
|
||||
// they are not included in the error message returned to the client, since
|
||||
// the client knows who it is.
|
||||
// Note that if enforce_authorization is false, this function will return
|
||||
// without throwing. So a caller that doesn't want to continue after an
|
||||
// authentication_error must explicitly return after calling this function.
|
||||
template<typename Exception>
|
||||
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
|
||||
stats.authentication_failures++;
|
||||
if (enforce_authorization) {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
throw std::move(e);
|
||||
} else {
|
||||
if (warn_authorization) {
|
||||
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
|
||||
if (!_enforce_authorization) {
|
||||
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
|
||||
slogger.debug("Skipping authorization");
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto host_it = req._headers.find("Host");
|
||||
if (host_it == req._headers.end()) {
|
||||
throw api_error::invalid_signature("Host header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature("Host header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
auto authorization_it = req._headers.find("Authorization");
|
||||
if (authorization_it == req._headers.end()) {
|
||||
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string host = host_it->second;
|
||||
std::string_view authorization_header = authorization_it->second;
|
||||
auto pos = authorization_header.find_first_of(' ');
|
||||
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
|
||||
"", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
authorization_header.remove_prefix(pos+1);
|
||||
std::string credential;
|
||||
@@ -322,7 +356,9 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
|
||||
std::vector<std::string_view> credential_split = split(credential, '/');
|
||||
if (credential_split.size() != 5) {
|
||||
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
|
||||
return make_ready_future<std::string>();
|
||||
}
|
||||
std::string user(credential_split[0]);
|
||||
std::string datestamp(credential_split[1]);
|
||||
@@ -346,7 +382,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
|
||||
return get_key_from_roles(proxy, as, std::move(username));
|
||||
};
|
||||
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
|
||||
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
|
||||
user = std::move(user),
|
||||
host = std::move(host),
|
||||
datestamp = std::move(datestamp),
|
||||
@@ -354,18 +390,32 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
signed_headers_map = std::move(signed_headers_map),
|
||||
region = std::move(region),
|
||||
service = std::move(service),
|
||||
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
|
||||
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
|
||||
key_cache::value_ptr key_ptr(nullptr);
|
||||
try {
|
||||
key_ptr = key_ptr_fut.get();
|
||||
} catch (const api_error& e) {
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
e, user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
std::string signature;
|
||||
try {
|
||||
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
|
||||
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
|
||||
} catch (const std::exception& e) {
|
||||
throw api_error::invalid_signature(e.what());
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
|
||||
if (signature != std::string_view(user_signature)) {
|
||||
_key_cache.remove(user);
|
||||
throw api_error::unrecognized_client("The security token included in the request is invalid.");
|
||||
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
|
||||
api_error::unrecognized_client("wrong signature"),
|
||||
user, req.get_client_address());
|
||||
return std::string();
|
||||
}
|
||||
return user;
|
||||
});
|
||||
@@ -597,9 +647,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
}
|
||||
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
_memory_limiter = memory_limiter;
|
||||
_enforce_authorization = std::move(enforce_authorization);
|
||||
_warn_authorization = std::move(warn_authorization);
|
||||
_max_concurrent_requests = std::move(max_concurrent_requests);
|
||||
if (!port && !https_port) {
|
||||
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
|
||||
|
||||
@@ -43,6 +43,7 @@ class server : public peering_sharded_service<server> {
|
||||
|
||||
key_cache _key_cache;
|
||||
utils::updateable_value<bool> _enforce_authorization;
|
||||
utils::updateable_value<bool> _warn_authorization;
|
||||
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
|
||||
named_gate _pending_requests;
|
||||
// In some places we will need a CQL updateable_timeout_config object even
|
||||
@@ -94,7 +95,8 @@ public:
|
||||
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
|
||||
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
future<> stop();
|
||||
// get_client_data() is called (on each shard separately) when the virtual
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
|
||||
@@ -176,6 +176,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
|
||||
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
|
||||
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
|
||||
});
|
||||
|
||||
// Only register the following metrics for the global metrics, not per-table
|
||||
if (!has_table) {
|
||||
metrics.add_group("alternator", {
|
||||
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
|
||||
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
|
||||
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {
|
||||
|
||||
@@ -79,6 +79,17 @@ public:
|
||||
utils::estimated_histogram batch_get_item_histogram{22}; // a histogram that covers the range 1 - 100
|
||||
utils::estimated_histogram batch_write_item_histogram{22}; // a histogram that covers the range 1 - 100
|
||||
} api_operations;
|
||||
// Count of authentication and authorization failures, counted if either
|
||||
// alternator_enforce_authorization or alternator_warn_authorization are
|
||||
// set to true. If both are false, no authentication or authorization
|
||||
// checks are performed, so failures are not recognized or counted.
|
||||
// "authentication" failure means the request was not signed with a valid
|
||||
// user and key combination. "authorization" failure means the request was
|
||||
// authenticated to a valid user - but this user did not have permissions
|
||||
// to perform the operation (considering RBAC settings and the user's
|
||||
// superuser status).
|
||||
uint64_t authentication_failures = 0;
|
||||
uint64_t authorization_failures = 0;
|
||||
// Miscellaneous event counters
|
||||
uint64_t total_operations = 0;
|
||||
uint64_t unsupported_operations = 0;
|
||||
|
||||
@@ -828,7 +828,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
||||
|
||||
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
|
||||
partition_key pk = iter.shard.id.to_partition_key(*schema);
|
||||
|
||||
@@ -94,7 +94,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
}
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
if (enabled) {
|
||||
if (tags_map.contains(TTL_TAG_KEY)) {
|
||||
|
||||
@@ -2924,7 +2924,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -233,9 +233,9 @@ future<role_set> ldap_role_manager::query_granted(std::string_view grantee_name,
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map>
|
||||
ldap_role_manager::query_all_directly_granted() {
|
||||
ldap_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
role_to_directly_granted_map result;
|
||||
auto roles = co_await query_all();
|
||||
auto roles = co_await query_all(qs);
|
||||
for (auto& role: roles) {
|
||||
auto granted_set = co_await query_granted(role, recursive_role_query::no);
|
||||
for (auto& granted: granted_set) {
|
||||
@@ -247,8 +247,8 @@ ldap_role_manager::query_all_directly_granted() {
|
||||
co_return result;
|
||||
}
|
||||
|
||||
future<role_set> ldap_role_manager::query_all() {
|
||||
return _std_mgr.query_all();
|
||||
future<role_set> ldap_role_manager::query_all(::service::query_state& qs) {
|
||||
return _std_mgr.query_all(qs);
|
||||
}
|
||||
|
||||
future<> ldap_role_manager::create_role(std::string_view role_name) {
|
||||
@@ -311,12 +311,12 @@ future<bool> ldap_role_manager::can_login(std::string_view role_name) {
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> ldap_role_manager::get_attribute(
|
||||
std::string_view role_name, std::string_view attribute_name) {
|
||||
return _std_mgr.get_attribute(role_name, attribute_name);
|
||||
std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return _std_mgr.get_attribute(role_name, attribute_name, qs);
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name) {
|
||||
return _std_mgr.query_attribute_for_all(attribute_name);
|
||||
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return _std_mgr.query_attribute_for_all(attribute_name, qs);
|
||||
}
|
||||
|
||||
future<> ldap_role_manager::set_attribute(
|
||||
|
||||
@@ -75,9 +75,9 @@ class ldap_role_manager : public role_manager {
|
||||
|
||||
future<role_set> query_granted(std::string_view, recursive_role_query) override;
|
||||
|
||||
future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
future<role_set> query_all() override;
|
||||
future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
future<bool> exists(std::string_view) override;
|
||||
|
||||
@@ -85,9 +85,9 @@ class ldap_role_manager : public role_manager {
|
||||
|
||||
future<bool> can_login(std::string_view) override;
|
||||
|
||||
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view) override;
|
||||
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view, ::service::query_state&) override;
|
||||
|
||||
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view) override;
|
||||
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view, ::service::query_state&) override;
|
||||
|
||||
future<> set_attribute(std::string_view, std::string_view, std::string_view, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -78,11 +78,11 @@ future<role_set> maintenance_socket_role_manager::query_granted(std::string_view
|
||||
return operation_not_supported_exception<role_set>("QUERY GRANTED");
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted() {
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
|
||||
}
|
||||
|
||||
future<role_set> maintenance_socket_role_manager::query_all() {
|
||||
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_set>("QUERY ALL");
|
||||
}
|
||||
|
||||
@@ -98,11 +98,11 @@ future<bool> maintenance_socket_role_manager::can_login(std::string_view role_na
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name) {
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
|
||||
}
|
||||
|
||||
|
||||
@@ -53,9 +53,9 @@ public:
|
||||
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
|
||||
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
virtual future<role_set> query_all() override;
|
||||
virtual future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) override;
|
||||
|
||||
@@ -63,9 +63,9 @@ public:
|
||||
|
||||
virtual future<bool> can_login(std::string_view role_name) override;
|
||||
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -36,7 +36,8 @@ static const std::unordered_map<sstring, auth::permission> permission_names({
|
||||
{"MODIFY", auth::permission::MODIFY},
|
||||
{"AUTHORIZE", auth::permission::AUTHORIZE},
|
||||
{"DESCRIBE", auth::permission::DESCRIBE},
|
||||
{"EXECUTE", auth::permission::EXECUTE}});
|
||||
{"EXECUTE", auth::permission::EXECUTE},
|
||||
{"VECTOR_SEARCH_INDEXING", auth::permission::VECTOR_SEARCH_INDEXING}});
|
||||
|
||||
const sstring& auth::permissions::to_string(permission p) {
|
||||
for (auto& v : permission_names) {
|
||||
|
||||
@@ -33,6 +33,7 @@ enum class permission {
|
||||
// data access
|
||||
SELECT, // required for SELECT.
|
||||
MODIFY, // required for INSERT, UPDATE, DELETE, TRUNCATE.
|
||||
VECTOR_SEARCH_INDEXING, // required for SELECT from tables with vector indexes if SELECT permission is not granted.
|
||||
|
||||
// permission management
|
||||
AUTHORIZE, // required for GRANT and REVOKE.
|
||||
@@ -54,7 +55,8 @@ typedef enum_set<
|
||||
permission::MODIFY,
|
||||
permission::AUTHORIZE,
|
||||
permission::DESCRIBE,
|
||||
permission::EXECUTE>> permission_set;
|
||||
permission::EXECUTE,
|
||||
permission::VECTOR_SEARCH_INDEXING>> permission_set;
|
||||
|
||||
bool operator<(const permission_set&, const permission_set&);
|
||||
|
||||
|
||||
@@ -41,22 +41,26 @@ static const std::unordered_map<resource_kind, std::size_t> max_parts{
|
||||
{resource_kind::functions, 2}};
|
||||
|
||||
static permission_set applicable_permissions(const data_resource_view& dv) {
|
||||
if (dv.table()) {
|
||||
return permission_set::of<
|
||||
|
||||
// We only support VECTOR_SEARCH_INDEXING permission for ALL KEYSPACES.
|
||||
|
||||
auto set = permission_set::of<
|
||||
permission::ALTER,
|
||||
permission::DROP,
|
||||
permission::SELECT,
|
||||
permission::MODIFY,
|
||||
permission::AUTHORIZE>();
|
||||
|
||||
if (!dv.table()) {
|
||||
set.add(permission_set::of<permission::CREATE>());
|
||||
}
|
||||
|
||||
return permission_set::of<
|
||||
permission::CREATE,
|
||||
permission::ALTER,
|
||||
permission::DROP,
|
||||
permission::SELECT,
|
||||
permission::MODIFY,
|
||||
permission::AUTHORIZE>();
|
||||
if (!dv.table() && !dv.keyspace()) {
|
||||
set.add(permission_set::of<permission::VECTOR_SEARCH_INDEXING>());
|
||||
}
|
||||
|
||||
return set;
|
||||
|
||||
}
|
||||
|
||||
static permission_set applicable_permissions(const role_resource_view& rv) {
|
||||
|
||||
@@ -17,12 +17,17 @@
|
||||
#include <seastar/core/format.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "auth/resource.hh"
|
||||
#include "cql3/description.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace service {
|
||||
class query_state;
|
||||
};
|
||||
|
||||
namespace auth {
|
||||
|
||||
struct role_config final {
|
||||
@@ -167,9 +172,9 @@ public:
|
||||
/// (role2, role3)
|
||||
/// }
|
||||
///
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() = 0;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
virtual future<role_set> query_all() = 0;
|
||||
virtual future<role_set> query_all(::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) = 0;
|
||||
|
||||
@@ -186,12 +191,12 @@ public:
|
||||
///
|
||||
/// \returns the value of the named attribute, if one is set.
|
||||
///
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) = 0;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
///
|
||||
/// \returns a mapping of each role's value for the named attribute, if one is set for the role.
|
||||
///
|
||||
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name) = 0;
|
||||
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
/// Sets `attribute_name` with `attribute_value` for `role_name`.
|
||||
/// \returns an exceptional future with nonexistant_role if the role does not exist.
|
||||
|
||||
@@ -231,6 +231,17 @@ struct command_desc {
|
||||
} type_ = type::OTHER;
|
||||
};
|
||||
|
||||
/// Similar to command_desc, but used in cases where multiple permissions allow the access to the resource.
|
||||
struct command_desc_with_permission_set {
|
||||
permission_set permission;
|
||||
const ::auth::resource& resource;
|
||||
enum class type {
|
||||
ALTER_WITH_OPTS,
|
||||
ALTER_SYSTEM_WITH_ALLOWED_OPTS,
|
||||
OTHER
|
||||
} type_ = type::OTHER;
|
||||
};
|
||||
|
||||
///
|
||||
/// Protected resources cannot be modified even if the performer has permissions to do so.
|
||||
///
|
||||
|
||||
@@ -663,21 +663,30 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
|
||||
});
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted() {
|
||||
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{}",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
qs,
|
||||
cql3::query_processor::cache_internal::yes);
|
||||
|
||||
role_to_directly_granted_map roles_map;
|
||||
co_await _qp.query_internal(query, [&roles_map] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
|
||||
roles_map.insert({row.get_as<sstring>("member"), row.get_as<sstring>("role")});
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
std::transform(
|
||||
results->begin(),
|
||||
results->end(),
|
||||
std::inserter(roles_map, roles_map.begin()),
|
||||
[] (const cql3::untyped_result_set_row& row) {
|
||||
return std::make_pair(row.get_as<sstring>("member"), row.get_as<sstring>("role")); }
|
||||
);
|
||||
|
||||
co_return roles_map;
|
||||
}
|
||||
|
||||
future<role_set> standard_role_manager::query_all() {
|
||||
future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT {} FROM {}.{}",
|
||||
meta::roles_table::role_col_name,
|
||||
get_auth_ks_name(_qp),
|
||||
@@ -695,7 +704,7 @@ future<role_set> standard_role_manager::query_all() {
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_query_state(),
|
||||
qs,
|
||||
cql3::query_processor::cache_internal::yes);
|
||||
|
||||
role_set roles;
|
||||
@@ -727,11 +736,11 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) {
|
||||
});
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
const auto result_set = co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
|
||||
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
|
||||
if (!result_set->empty()) {
|
||||
const cql3::untyped_result_set_row &row = result_set->one();
|
||||
co_return std::optional<sstring>(row.get_as<sstring>("value"));
|
||||
@@ -739,11 +748,11 @@ future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_
|
||||
co_return std::optional<sstring>{};
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) {
|
||||
return query_all().then([this, attribute_name] (role_set roles) {
|
||||
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) {
|
||||
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) {
|
||||
return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
|
||||
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return query_all(qs).then([this, attribute_name, &qs] (role_set roles) {
|
||||
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles), &qs] (attribute_vals &role_to_att_val) {
|
||||
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name, &qs] (sstring role) {
|
||||
return get_attribute(role, attribute_name, qs).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
|
||||
if (att_val) {
|
||||
role_to_att_val.emplace(std::move(role), std::move(*att_val));
|
||||
}
|
||||
@@ -788,7 +797,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
|
||||
future<std::vector<cql3::description>> standard_role_manager::describe_role_grants() {
|
||||
std::vector<cql3::description> result{};
|
||||
|
||||
const auto grants = co_await query_all_directly_granted();
|
||||
const auto grants = co_await query_all_directly_granted(internal_distributed_query_state());
|
||||
result.reserve(grants.size());
|
||||
|
||||
for (const auto& [grantee_role, granted_role] : grants) {
|
||||
|
||||
@@ -66,9 +66,9 @@ public:
|
||||
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
|
||||
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
virtual future<role_set> query_all() override;
|
||||
virtual future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) override;
|
||||
|
||||
@@ -76,9 +76,9 @@ public:
|
||||
|
||||
virtual future<bool> can_login(std::string_view role_name) override;
|
||||
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -1209,6 +1209,23 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
|
||||
co_return std::move(m);
|
||||
}
|
||||
|
||||
// Builds the cdc_streams_state mutation for `table` from an explicit stream set.
//
// The partition is keyed by the table's uuid. The static "timestamp" cell is set
// to `stream_ts`, and each id in `stream_ids` contributes one clustering row, keyed
// by the int64 form of the stream's token, whose "stream_id" cell holds the id bytes.
// Every cell is written with write timestamp `ts`.
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
    auto state_schema = db::system_keyspace::cdc_streams_state();
    auto table_pk = partition_key::from_single_value(*state_schema,
            data_value(table.uuid()).serialize_nonnull());

    mutation state_mut(state_schema, std::move(table_pk));
    state_mut.set_static_cell("timestamp", stream_ts, ts);

    for (const auto& stream : stream_ids) {
        auto row_key = clustering_key::from_singular(*state_schema, dht::token::to_int64(stream.token()));
        state_mut.set_cell(row_key, "stream_id", data_value(stream.to_bytes()), ts);
        // the stream set can be large; give the reactor a chance to run between rows
        co_await coroutine::maybe_yield();
    }

    co_return std::move(state_mut);
}
|
||||
|
||||
utils::chunked_vector<mutation>
|
||||
make_drop_table_streams_mutations(table_id table, api::timestamp_type ts) {
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
@@ -1235,32 +1252,50 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
|
||||
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
|
||||
}
|
||||
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
|
||||
if (tables) {
|
||||
for (auto table : *tables) {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
} else {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
table_streams new_table_map;
|
||||
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
|
||||
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
|
||||
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
|
||||
};
|
||||
|
||||
append_stream(base_ts, std::move(base_stream_set));
|
||||
// if we already have a loaded streams map, and the base timestamp is unchanged, then read
|
||||
// the history entries starting from the latest one we have and append it to the existing map.
|
||||
// we can do it because we only append new rows with higher timestamps to the history table.
|
||||
std::optional<std::reference_wrapper<const committed_stream_set>> from_streams;
|
||||
std::optional<db_clock::time_point> from_ts;
|
||||
const auto& all_streams = _cdc_metadata.get_all_tablet_streams();
|
||||
if (auto it = all_streams.find(table); it != all_streams.end()) {
|
||||
const auto& current_map = *it->second;
|
||||
if (current_map.cbegin()->second.ts == base_ts) {
|
||||
const auto& latest_entry = current_map.crbegin()->second;
|
||||
from_streams = std::cref(latest_entry);
|
||||
from_ts = latest_entry.ts;
|
||||
}
|
||||
}
|
||||
|
||||
co_await _sys_ks.local().read_cdc_streams_history(table, [&] (table_id tid, db_clock::time_point ts, cdc_stream_diff diff) -> future<> {
|
||||
const auto& prev_stream_set = std::crbegin(new_table_map)->second.streams;
|
||||
if (!from_ts) {
|
||||
append_stream(base_ts, std::move(base_stream_set));
|
||||
}
|
||||
|
||||
co_await _sys_ks.local().read_cdc_streams_history(table, from_ts, [&] (table_id tid, db_clock::time_point ts, cdc_stream_diff diff) -> future<> {
|
||||
const auto& prev_stream_set = new_table_map.empty() ?
|
||||
from_streams->get().streams : std::crbegin(new_table_map)->second.streams;
|
||||
|
||||
append_stream(ts, co_await cdc::metadata::construct_next_stream_set(
|
||||
prev_stream_set, std::move(diff.opened_streams), diff.closed_streams));
|
||||
@@ -1272,7 +1307,11 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
|
||||
new_table_map_copy[ts] = entry;
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
svc._cdc_metadata.load_tablet_streams_map(table, std::move(new_table_map_copy));
|
||||
if (!from_ts) {
|
||||
svc._cdc_metadata.load_tablet_streams_map(table, std::move(new_table_map_copy));
|
||||
} else {
|
||||
svc._cdc_metadata.append_tablet_streams_map(table, std::move(new_table_map_copy));
|
||||
}
|
||||
}));
|
||||
|
||||
tables_to_process.erase(table);
|
||||
@@ -1306,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
|
||||
}
|
||||
}
|
||||
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
|
||||
auto table_it = all_tables.find(table);
|
||||
if (table_it == all_tables.end()) {
|
||||
@@ -1363,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<cdc::stream_id> new_streams;
|
||||
new_streams.reserve(new_tablet_map.tablet_count());
|
||||
utils::chunked_vector<cdc::stream_id> new_streams;
|
||||
co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
|
||||
for (auto tid : new_tablet_map.tablet_ids()) {
|
||||
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -1386,4 +1425,113 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
muts.emplace_back(std::move(mut));
|
||||
}
|
||||
|
||||
// Produces the two mutations that rebase a table's CDC stream metadata onto `base_ts`:
//
//  1. cdc_streams_state: a partition tombstone at `ts - 1`, followed by the new
//     static "timestamp" cell and one "stream_id" row per entry of `base_stream_set`,
//     all written at `ts`.
//  2. cdc_streams_history: a range tombstone at `ts` covering every clustering key
//     up to and including `base_ts`.
//
// The returned vector holds the state mutation first and the history mutation second.
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
    utils::chunked_vector<mutation> result;
    result.reserve(2);

    const auto deletion_time = gc_clock::now();
    // one tick below `ts`, so the cells written at `ts` in the same mutation are not shadowed
    const auto partition_tombstone_ts = ts - 1;

    {
        // write the new base stream set to cdc_streams_state
        auto state_schema = db::system_keyspace::cdc_streams_state();
        auto state_pk = partition_key::from_single_value(*state_schema,
                data_value(table.uuid()).serialize_nonnull());

        mutation state_mut(state_schema, std::move(state_pk));
        state_mut.partition().apply(tombstone(partition_tombstone_ts, deletion_time));
        state_mut.set_static_cell("timestamp", data_value(base_ts), ts);

        for (const auto& stream : base_stream_set) {
            co_await coroutine::maybe_yield();
            auto row_key = clustering_key::from_singular(*state_schema, dht::token::to_int64(stream.token()));
            state_mut.set_cell(row_key, "stream_id", data_value(stream.to_bytes()), ts);
        }

        result.emplace_back(std::move(state_mut));
    }

    {
        // remove all entries from cdc_streams_history up to the new base
        auto history_schema = db::system_keyspace::cdc_streams_history();
        auto history_pk = partition_key::from_single_value(*history_schema,
                data_value(table.uuid()).serialize_nonnull());

        mutation history_mut(history_schema, std::move(history_pk));

        auto last_removed_key = clustering_key_prefix::from_single_value(*history_schema, timestamp_type->decompose(base_ts));
        auto removed_range = query::clustering_range::make_ending_with({std::move(last_removed_key), true});
        const auto [range_start, range_end] = bound_view::from_range(removed_range);

        history_mut.partition().apply_delete(*history_schema,
                range_tombstone{range_start, range_end, tombstone{ts, deletion_time}});

        result.emplace_back(std::move(history_mut));
    }

    co_return std::move(result);
}
|
||||
|
||||
// Picks the entry of `streams_map` that should become the new base after garbage
// collection: the latest entry whose timestamp is at least `ttl` in the past, reached
// by walking forward from the first entry while the following entry is also that old.
//
// Every entry before the returned iterator was superseded more than `ttl` ago and can
// be dropped. Returns begin() when there is nothing to collect (which is also end()
// for an empty map).
table_streams::const_iterator get_new_base_for_gc(const table_streams& streams_map, std::chrono::seconds ttl) {
    const auto cutoff = db_clock::now() - ttl;

    auto base = streams_map.begin();
    if (base == streams_map.end()) {
        return base;
    }

    // advance the base while its successor is itself old enough to have made it obsolete
    for (auto successor = std::next(base);
            successor != streams_map.end() && successor->second.ts <= cutoff;
            ++successor) {
        base = successor;
    }

    return base;
}
|
||||
|
||||
// Computes the mutations that garbage-collect obsolete CDC stream generations of `table`.
//
// `ttl` is how long a superseded generation must be kept; when not provided, the CDC TTL
// of the base table is used, and a CDC TTL of 0 means "no ttl", so nothing is collected.
// `ts` is the write timestamp given to the generated mutations.
//
// Returns an empty vector when there is nothing to collect; otherwise the result of
// get_cdc_stream_gc_mutations() for the chosen new base generation.
// Throws whatever `at()` / `find_schema()` throw when `table` is unknown.
future<utils::chunked_vector<mutation>> generation_service::garbage_collect_cdc_streams_for_table(table_id table, std::optional<std::chrono::seconds> ttl, api::timestamp_type ts) {
    // Hold our own reference to the streams map: this coroutine suspends below while
    // still using an iterator and a stream set that live inside the map, and the entry
    // in _cdc_metadata may be replaced or removed in the meantime.
    const auto streams_ptr = _cdc_metadata.get_all_tablet_streams().at(table);
    const auto& streams_map = *streams_ptr;

    // if TTL is not provided by the caller then use the table's CDC TTL
    auto base_schema = cdc::get_base_table(_db, *_db.find_schema(table));
    ttl = ttl.or_else([&] -> std::optional<std::chrono::seconds> {
        auto ttl_seconds = base_schema->cdc_options().ttl();
        if (ttl_seconds > 0) {
            return std::chrono::seconds(ttl_seconds);
        } else {
            // ttl=0 means no ttl
            return std::nullopt;
        }
    });
    if (!ttl) {
        co_return utils::chunked_vector<mutation>{};
    }

    auto new_base_it = get_new_base_for_gc(streams_map, *ttl);
    if (new_base_it == streams_map.begin() || new_base_it == streams_map.end()) {
        // nothing to gc
        co_return utils::chunked_vector<mutation>{};
    }

    for (auto it = streams_map.begin(); it != new_base_it; ++it) {
        cdc_log.info("Garbage collecting CDC stream metadata for table {}: removing generation {} because it is older than the CDC TTL of {} seconds",
                table, it->second.ts, *ttl);
    }

    // `streams_ptr` keeps the referenced stream set alive until the awaited call completes
    co_return co_await get_cdc_stream_gc_mutations(table, new_base_it->second.ts, new_base_it->second.streams, ts);
}
|
||||
|
||||
// Runs CDC stream garbage collection for every table that has tablet-based CDC streams
// and appends the resulting mutations to `muts`.
//
// Each table is collected with its own CDC TTL (no override is passed), and all
// generated mutations use write timestamp `ts`.
future<> generation_service::garbage_collect_cdc_streams(utils::chunked_vector<canonical_mutation>& muts, api::timestamp_type ts) {
    const auto cdc_tables = _cdc_metadata.get_tables_with_cdc_tablet_streams();

    for (const auto& tid : cdc_tables) {
        co_await coroutine::maybe_yield();

        auto gc_muts = co_await garbage_collect_cdc_streams_for_table(tid, std::nullopt, ts);
        for (auto& gc_mut : gc_muts) {
            muts.emplace_back(std::move(gc_mut));
        }
    }
}
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
|
||||
|
||||
struct committed_stream_set {
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
|
||||
struct cdc_stream_diff {
|
||||
std::vector<stream_id> closed_streams;
|
||||
std::vector<stream_id> opened_streams;
|
||||
utils::chunked_vector<stream_id> closed_streams;
|
||||
utils::chunked_vector<stream_id> opened_streams;
|
||||
};
|
||||
|
||||
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
|
||||
@@ -220,8 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
|
||||
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
|
||||
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
|
||||
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
|
||||
|
||||
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
|
||||
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -149,10 +149,13 @@ public:
|
||||
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);
|
||||
|
||||
future<utils::chunked_vector<mutation>> garbage_collect_cdc_streams_for_table(table_id table, std::optional<std::chrono::seconds> ttl, api::timestamp_type ts);
|
||||
future<> garbage_collect_cdc_streams(utils::chunked_vector<canonical_mutation>& muts, api::timestamp_type ts);
|
||||
|
||||
private:
|
||||
/* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose)
|
||||
* and start using it for CDC log writes if it's not obsolete.
|
||||
|
||||
@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
|
||||
}
|
||||
|
||||
static cdc::stream_id get_stream(
|
||||
const std::vector<cdc::stream_id>& streams,
|
||||
const utils::chunked_vector<cdc::stream_id>& streams,
|
||||
dht::token tok) {
|
||||
if (streams.empty()) {
|
||||
on_internal_error(cdc_log, "get_stream: streams empty");
|
||||
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
|
||||
return ret;
|
||||
}
|
||||
|
||||
const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
auto now = api::new_timestamp();
|
||||
if (ts > now + get_generation_leeway().count()) {
|
||||
throw exceptions::invalid_request_exception(seastar::format(
|
||||
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
|
||||
return !it->second;
|
||||
}
|
||||
|
||||
future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed) {
|
||||
future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed) {
|
||||
|
||||
if (closed.size() == prev_stream_set.size()) {
|
||||
// all previous streams are closed, so the next stream set is just the opened streams.
|
||||
@@ -273,8 +273,8 @@ future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
// streams and removing the closed streams. we assume each stream set is
|
||||
// sorted by token, and the result is sorted as well.
|
||||
|
||||
std::vector<cdc::stream_id> next_stream_set;
|
||||
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
|
||||
utils::chunked_vector<cdc::stream_id> next_stream_set;
|
||||
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
|
||||
|
||||
auto next_prev = prev_stream_set.begin();
|
||||
auto next_closed = closed.begin();
|
||||
@@ -306,6 +306,10 @@ void cdc::metadata::load_tablet_streams_map(table_id tid, table_streams new_tabl
|
||||
_tablet_streams[tid] = make_lw_shared(std::move(new_table_map));
|
||||
}
|
||||
|
||||
// Merges the entries of `new_table_map` into the streams map already loaded for `tid`.
//
// The existing map object is extended in place rather than replaced. Entries whose
// timestamp key is already present are left untouched (std::map::insert semantics).
//
// A streams map must already be loaded for `tid`; appending to a missing one is reported
// through on_internal_error() instead of default-inserting a null pointer with
// operator[] and dereferencing it.
void cdc::metadata::append_tablet_streams_map(table_id tid, table_streams new_table_map) {
    const auto existing = _tablet_streams.find(tid);
    if (existing == _tablet_streams.end() || !existing->second) {
        on_internal_error(cdc_log, seastar::format(
                "append_tablet_streams_map: no streams map loaded for table {}", tid));
    }

    existing->second->insert(
            std::make_move_iterator(new_table_map.begin()),
            std::make_move_iterator(new_table_map.end()));
}
|
||||
|
||||
// Drops this object's streams-map entry for `tid`. Erasing by key means an unknown
// `tid` is a no-op.
void cdc::metadata::remove_tablet_streams_map(table_id tid) {
|
||||
_tablet_streams.erase(tid);
|
||||
}
|
||||
@@ -314,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
|
||||
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
|
||||
}
|
||||
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
|
||||
std::vector<stream_id> closed, opened;
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
|
||||
utils::chunked_vector<stream_id> closed, opened;
|
||||
|
||||
auto before_it = before.begin();
|
||||
auto after_it = after.begin();
|
||||
|
||||
@@ -37,7 +37,9 @@ class metadata final {
|
||||
using container_t = std::map<api::timestamp_type, std::optional<topology_description>>;
|
||||
container_t _gens;
|
||||
|
||||
using table_streams_ptr = lw_shared_ptr<const table_streams>;
|
||||
// per-table streams map for tables in tablets-based keyspaces.
|
||||
// the streams map is shared with the virtual tables reader, hence we can only insert new entries to it, not erase.
|
||||
using table_streams_ptr = lw_shared_ptr<table_streams>;
|
||||
using tablet_streams_map = std::unordered_map<table_id, table_streams_ptr>;
|
||||
|
||||
tablet_streams_map _tablet_streams;
|
||||
@@ -47,7 +49,7 @@ class metadata final {
|
||||
|
||||
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
|
||||
|
||||
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
|
||||
public:
|
||||
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
|
||||
@@ -100,6 +102,7 @@ public:
|
||||
bool prepare(db_clock::time_point ts);
|
||||
|
||||
void load_tablet_streams_map(table_id tid, table_streams new_table_map);
|
||||
void append_tablet_streams_map(table_id tid, table_streams new_table_map);
|
||||
void remove_tablet_streams_map(table_id tid);
|
||||
|
||||
const tablet_streams_map& get_all_tablet_streams() const {
|
||||
@@ -108,14 +111,14 @@ public:
|
||||
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
static future<std::vector<stream_id>> construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed);
|
||||
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed);
|
||||
|
||||
static future<cdc_stream_diff> generate_stream_diff(
|
||||
const std::vector<stream_id>& before,
|
||||
const std::vector<stream_id>& after);
|
||||
const utils::chunked_vector<stream_id>& before,
|
||||
const utils::chunked_vector<stream_id>& after);
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -1506,7 +1506,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
|
||||
co_return;
|
||||
}
|
||||
auto num_runs_for_compaction = [&, this] -> future<size_t> {
|
||||
auto& cs = t.get_compaction_strategy();
|
||||
auto cs = t.get_compaction_strategy();
|
||||
auto desc = co_await cs.get_sstables_for_compaction(t, get_strategy_control());
|
||||
co_return std::ranges::size(desc.sstables
|
||||
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
|
||||
|
||||
@@ -804,9 +804,9 @@ compaction_strategy_state compaction_strategy_state::make(const compaction_strat
|
||||
case compaction_strategy_type::incremental:
|
||||
return compaction_strategy_state(default_empty_state{});
|
||||
case compaction_strategy_type::leveled:
|
||||
return compaction_strategy_state(leveled_compaction_strategy_state{});
|
||||
return compaction_strategy_state(seastar::make_shared<leveled_compaction_strategy_state>());
|
||||
case compaction_strategy_type::time_window:
|
||||
return compaction_strategy_state(time_window_compaction_strategy_state{});
|
||||
return compaction_strategy_state(seastar::make_shared<time_window_compaction_strategy_state>());
|
||||
default:
|
||||
throw std::runtime_error("strategy not supported");
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ namespace compaction {
|
||||
class compaction_strategy_state {
|
||||
public:
|
||||
struct default_empty_state {};
|
||||
using states_variant = std::variant<default_empty_state, leveled_compaction_strategy_state, time_window_compaction_strategy_state>;
|
||||
using states_variant = std::variant<default_empty_state, leveled_compaction_strategy_state_ptr, time_window_compaction_strategy_state_ptr>;
|
||||
private:
|
||||
states_variant _state;
|
||||
public:
|
||||
|
||||
@@ -14,12 +14,12 @@
|
||||
|
||||
namespace compaction {
|
||||
|
||||
leveled_compaction_strategy_state& leveled_compaction_strategy::get_state(compaction_group_view& table_s) const {
|
||||
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state>();
|
||||
leveled_compaction_strategy_state_ptr leveled_compaction_strategy::get_state(compaction_group_view& table_s) const {
|
||||
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state_ptr>();
|
||||
}
|
||||
|
||||
future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
auto& state = get_state(table_s);
|
||||
auto state = get_state(table_s);
|
||||
auto candidates = co_await control.candidates(table_s);
|
||||
// NOTE: leveled_manifest creation may be slightly expensive, so later on,
|
||||
// we may want to store it in the strategy itself. However, the sstable
|
||||
@@ -27,10 +27,10 @@ future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_comp
|
||||
// sstable in it may be marked for deletion after compacted.
|
||||
// Currently, we create a new manifest whenever it's time for compaction.
|
||||
leveled_manifest manifest = leveled_manifest::create(table_s, candidates, _max_sstable_size_in_mb, _stcs_options);
|
||||
if (!state.last_compacted_keys) {
|
||||
generate_last_compacted_keys(state, manifest);
|
||||
if (!state->last_compacted_keys) {
|
||||
generate_last_compacted_keys(*state, manifest);
|
||||
}
|
||||
auto candidate = manifest.get_compaction_candidates(*state.last_compacted_keys, state.compaction_counter);
|
||||
auto candidate = manifest.get_compaction_candidates(*state->last_compacted_keys, state->compaction_counter);
|
||||
|
||||
if (!candidate.sstables.empty()) {
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
@@ -78,12 +78,12 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(comp
|
||||
}
|
||||
|
||||
void leveled_compaction_strategy::notify_completion(compaction_group_view& table_s, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
||||
auto& state = get_state(table_s);
|
||||
auto state = get_state(table_s);
|
||||
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
|
||||
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
|
||||
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
|
||||
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
|
||||
if (removed.empty() || added.empty() || !state.last_compacted_keys) {
|
||||
if (removed.empty() || added.empty() || !state->last_compacted_keys) {
|
||||
return;
|
||||
}
|
||||
auto min_level = std::numeric_limits<uint32_t>::max();
|
||||
@@ -99,16 +99,16 @@ void leveled_compaction_strategy::notify_completion(compaction_group_view& table
|
||||
}
|
||||
target_level = std::max(target_level, int(candidate->get_sstable_level()));
|
||||
}
|
||||
state.last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();
|
||||
state->last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();
|
||||
|
||||
for (int i = leveled_manifest::MAX_LEVELS - 1; i > 0; i--) {
|
||||
state.compaction_counter[i]++;
|
||||
state->compaction_counter[i]++;
|
||||
}
|
||||
state.compaction_counter[target_level] = 0;
|
||||
state->compaction_counter[target_level] = 0;
|
||||
|
||||
if (leveled_manifest::logger.level() == logging::log_level::debug) {
|
||||
for (auto j = 0U; j < state.compaction_counter.size(); j++) {
|
||||
leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, state.compaction_counter[j]);
|
||||
for (auto j = 0U; j < state->compaction_counter.size(); j++) {
|
||||
leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, state->compaction_counter[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ struct leveled_compaction_strategy_state {
|
||||
leveled_compaction_strategy_state();
|
||||
};
|
||||
|
||||
using leveled_compaction_strategy_state_ptr = seastar::shared_ptr<leveled_compaction_strategy_state>;
|
||||
|
||||
class leveled_compaction_strategy : public compaction_strategy_impl {
|
||||
static constexpr int32_t DEFAULT_MAX_SSTABLE_SIZE_IN_MB = 160;
|
||||
static constexpr auto SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
|
||||
@@ -45,7 +47,7 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
|
||||
private:
|
||||
int32_t calculate_max_sstable_size_in_mb(std::optional<sstring> option_value) const;
|
||||
|
||||
leveled_compaction_strategy_state& get_state(compaction_group_view& table_s) const;
|
||||
leveled_compaction_strategy_state_ptr get_state(compaction_group_view& table_s) const;
|
||||
public:
|
||||
static unsigned ideal_level_for_input(const std::vector<sstables::shared_sstable>& input, uint64_t max_sstable_size);
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstable_set_impl.hh"
|
||||
#include "compaction_strategy_state.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
#include <ranges>
|
||||
|
||||
@@ -22,8 +23,8 @@ extern logging::logger clogger;
|
||||
|
||||
using timestamp_type = api::timestamp_type;
|
||||
|
||||
time_window_compaction_strategy_state& time_window_compaction_strategy::get_state(compaction_group_view& table_s) const {
|
||||
return table_s.get_compaction_strategy_state().get<time_window_compaction_strategy_state>();
|
||||
time_window_compaction_strategy_state_ptr time_window_compaction_strategy::get_state(compaction_group_view& table_s) const {
|
||||
return table_s.get_compaction_strategy_state().get<time_window_compaction_strategy_state_ptr>();
|
||||
}
|
||||
|
||||
const std::unordered_map<sstring, std::chrono::seconds> time_window_compaction_strategy_options::valid_window_units = {
|
||||
@@ -335,7 +336,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<sstables::shared_
|
||||
|
||||
future<compaction_descriptor>
|
||||
time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
auto& state = get_state(table_s);
|
||||
auto state = get_state(table_s);
|
||||
auto compaction_time = gc_clock::now();
|
||||
auto candidates = co_await control.candidates(table_s);
|
||||
|
||||
@@ -344,7 +345,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
}
|
||||
|
||||
auto now = db_clock::now();
|
||||
if (now - state.last_expired_check > _options.expired_sstable_check_frequency) {
|
||||
if (now - state->last_expired_check > _options.expired_sstable_check_frequency) {
|
||||
clogger.debug("[{}] TWCS expired check sufficiently far in the past, checking for fully expired SSTables", fmt::ptr(this));
|
||||
|
||||
// Find fully expired SSTables. Those will be included no matter what.
|
||||
@@ -356,12 +357,14 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
// Keep checking for fully_expired_sstables until we don't find
|
||||
// any among the candidates, meaning they are either already compacted
|
||||
// or registered for compaction.
|
||||
state.last_expired_check = now;
|
||||
state->last_expired_check = now;
|
||||
} else {
|
||||
clogger.debug("[{}] TWCS skipping check for fully expired SSTables", fmt::ptr(this));
|
||||
}
|
||||
|
||||
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
|
||||
co_await utils::get_local_injector().inject("twcs_get_sstables_for_compaction", utils::wait_for_message(30s));
|
||||
|
||||
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time, *state);
|
||||
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
|
||||
co_return compaction_descriptor(std::move(compaction_candidates));
|
||||
}
|
||||
@@ -384,8 +387,8 @@ time_window_compaction_strategy::compaction_mode(const time_window_compaction_st
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
time_window_compaction_strategy::get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control,
|
||||
std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time) {
|
||||
auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables);
|
||||
std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time, time_window_compaction_strategy_state& state) {
|
||||
auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables, state);
|
||||
|
||||
if (!most_interesting.empty()) {
|
||||
return most_interesting;
|
||||
@@ -410,14 +413,14 @@ time_window_compaction_strategy::get_next_non_expired_sstables(compaction_group_
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
time_window_compaction_strategy::get_compaction_candidates(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidate_sstables) {
|
||||
auto& state = get_state(table_s);
|
||||
time_window_compaction_strategy::get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
|
||||
std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state) {
|
||||
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
|
||||
// Update the highest window seen, if necessary
|
||||
state.highest_window_seen = std::max(state.highest_window_seen, max_timestamp);
|
||||
|
||||
return newest_bucket(table_s, control, std::move(buckets), table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold(),
|
||||
state.highest_window_seen);
|
||||
state.highest_window_seen, state);
|
||||
}
|
||||
|
||||
timestamp_type
|
||||
@@ -465,8 +468,7 @@ namespace compaction {
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
time_window_compaction_strategy::newest_bucket(compaction_group_view& table_s, strategy_control& control, std::map<timestamp_type, std::vector<sstables::shared_sstable>> buckets,
|
||||
int min_threshold, int max_threshold, timestamp_type now) {
|
||||
auto& state = get_state(table_s);
|
||||
int min_threshold, int max_threshold, timestamp_type now, time_window_compaction_strategy_state& state) {
|
||||
clogger.debug("time_window_compaction_strategy::newest_bucket:\n now {}\n{}", now, buckets);
|
||||
|
||||
for (auto&& [key, bucket] : buckets | std::views::reverse) {
|
||||
@@ -517,7 +519,7 @@ time_window_compaction_strategy::trim_to_threshold(std::vector<sstables::shared_
|
||||
}
|
||||
|
||||
future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
|
||||
auto& state = get_state(table_s);
|
||||
auto state = get_state(table_s);
|
||||
auto min_threshold = table_s.min_compaction_threshold();
|
||||
auto max_threshold = table_s.schema()->max_compaction_threshold();
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
@@ -526,7 +528,7 @@ future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(c
|
||||
|
||||
int64_t n = 0;
|
||||
for (auto& [bucket_key, bucket] : buckets) {
|
||||
switch (compaction_mode(state, bucket, bucket_key, max_timestamp, min_threshold)) {
|
||||
switch (compaction_mode(*state, bucket, bucket_key, max_timestamp, min_threshold)) {
|
||||
case bucket_compaction_mode::size_tiered:
|
||||
n += size_tiered_compaction_strategy::estimated_pending_compactions(bucket, min_threshold, max_threshold, _stcs_options);
|
||||
break;
|
||||
|
||||
@@ -67,6 +67,8 @@ struct time_window_compaction_strategy_state {
|
||||
std::unordered_set<api::timestamp_type> recent_active_windows;
|
||||
};
|
||||
|
||||
using time_window_compaction_strategy_state_ptr = seastar::shared_ptr<time_window_compaction_strategy_state>;
|
||||
|
||||
class time_window_compaction_strategy : public compaction_strategy_impl {
|
||||
time_window_compaction_strategy_options _options;
|
||||
size_tiered_compaction_strategy_options _stcs_options;
|
||||
@@ -87,7 +89,7 @@ public:
|
||||
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
private:
|
||||
time_window_compaction_strategy_state& get_state(compaction_group_view& table_s) const;
|
||||
time_window_compaction_strategy_state_ptr get_state(compaction_group_view& table_s) const;
|
||||
|
||||
static api::timestamp_type
|
||||
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
|
||||
@@ -110,9 +112,11 @@ private:
|
||||
compaction_mode(const time_window_compaction_strategy_state&, const bucket_t& bucket, api::timestamp_type bucket_key, api::timestamp_type now, size_t min_threshold) const;
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time);
|
||||
get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> non_expiring_sstables,
|
||||
gc_clock::time_point compaction_time, time_window_compaction_strategy_state& state);
|
||||
|
||||
std::vector<sstables::shared_sstable> get_compaction_candidates(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidate_sstables);
|
||||
std::vector<sstables::shared_sstable> get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
|
||||
std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state);
|
||||
public:
|
||||
// Find the lowest timestamp for window of given size
|
||||
static api::timestamp_type
|
||||
@@ -126,7 +130,7 @@ public:
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
newest_bucket(compaction_group_view& table_s, strategy_control& control, std::map<api::timestamp_type, std::vector<sstables::shared_sstable>> buckets,
|
||||
int min_threshold, int max_threshold, api::timestamp_type now);
|
||||
int min_threshold, int max_threshold, api::timestamp_type now, time_window_compaction_strategy_state& state);
|
||||
|
||||
static std::vector<sstables::shared_sstable>
|
||||
trim_to_threshold(std::vector<sstables::shared_sstable> bucket, int max_threshold);
|
||||
|
||||
@@ -1078,7 +1078,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/s3/client.cc',
|
||||
'utils/s3/retryable_http_client.cc',
|
||||
'utils/s3/retry_strategy.cc',
|
||||
'utils/s3/s3_retry_strategy.cc',
|
||||
'utils/s3/credentials_providers/aws_credentials_provider.cc',
|
||||
'utils/s3/credentials_providers/environment_aws_credentials_provider.cc',
|
||||
'utils/s3/credentials_providers/instance_profile_credentials_provider.cc',
|
||||
|
||||
@@ -1224,7 +1224,7 @@ listPermissionsStatement returns [std::unique_ptr<list_permissions_statement> st
|
||||
;
|
||||
|
||||
permission returns [auth::permission perm = auth::permission{}]
|
||||
: p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE | K_DESCRIBE | K_EXECUTE)
|
||||
: p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE | K_DESCRIBE | K_EXECUTE | K_VECTOR_SEARCH_INDEXING)
|
||||
{ $perm = auth::permissions::from_string($p.text); }
|
||||
;
|
||||
|
||||
@@ -2398,6 +2398,8 @@ K_EXECUTE: E X E C U T E;
|
||||
|
||||
K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
|
||||
|
||||
K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
|
||||
|
||||
// Case-insensitive alpha characters
|
||||
fragment A: ('a'|'A');
|
||||
fragment B: ('b'|'B');
|
||||
|
||||
@@ -136,9 +136,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
|
||||
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
|
||||
}
|
||||
compression_parameters cp(*compression_options);
|
||||
cp.validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
|
||||
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
|
||||
cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
|
||||
}
|
||||
|
||||
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "create_index_statement.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "types/types.hh"
|
||||
@@ -92,9 +94,13 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
|
||||
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
|
||||
}
|
||||
|
||||
if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
|
||||
throw exceptions::invalid_request_exception(format("Secondary indexes are not supported on base tables with tablets (keyspace '{}')", keyspace()));
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
}
|
||||
|
||||
validate_for_local_index(*schema);
|
||||
|
||||
std::vector<::shared_ptr<index_target>> targets;
|
||||
|
||||
@@ -113,8 +113,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
|
||||
if (rs->uses_tablets()) {
|
||||
warnings.push_back(
|
||||
"Tables in this keyspace will be replicated using Tablets "
|
||||
"and will not support Materialized Views, Secondary Indexes and counters features. "
|
||||
"To use Materialized Views, Secondary Indexes or counters, drop this keyspace and re-create it "
|
||||
"and will not support counters features. To use counters, drop this keyspace and re-create it "
|
||||
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
|
||||
if (ksm->initial_tablets().value()) {
|
||||
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
|
||||
|
||||
@@ -152,9 +152,13 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
|
||||
|
||||
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
|
||||
|
||||
if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
|
||||
throw exceptions::invalid_request_exception(format("Materialized views are not supported on base tables with tablets"));
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
}
|
||||
|
||||
if (schema->is_counter()) {
|
||||
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include "index/vector_index.hh"
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
#include "service/qos/qos_common.hh"
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
@@ -245,7 +246,8 @@ future<> select_statement::check_access(query_processor& qp, const service::clie
|
||||
auto& cf_name = s->is_view()
|
||||
? s->view_info()->base_name()
|
||||
: (cdc ? cdc->cf_name() : column_family());
|
||||
co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT);
|
||||
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*_schema);
|
||||
co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT, auth::command_desc::type::OTHER, is_vector_indexed);
|
||||
} catch (const data_dictionary::no_such_column_family& e) {
|
||||
// Will be validated afterwards.
|
||||
co_return;
|
||||
|
||||
11
db/config.cc
11
db/config.cc
@@ -1318,15 +1318,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
|
||||
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
|
||||
"Server-global user table compression options. If enabled, all user tables"
|
||||
"will be compressed using the provided options, unless overridden"
|
||||
"by compression options in the table schema. The available options are:\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
|
||||
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
|
||||
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
|
||||
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
|
||||
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
|
||||
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
|
||||
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
|
||||
@@ -1426,7 +1426,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
|
||||
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
|
||||
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
|
||||
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
|
||||
@@ -1756,7 +1757,7 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
|
||||
{"broadcast-tables", feature::BROADCAST_TABLES},
|
||||
{"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
|
||||
{"tablets", feature::UNUSED},
|
||||
{"views-with-tablets", feature::VIEWS_WITH_TABLETS}
|
||||
{"views-with-tablets", feature::UNUSED}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -136,8 +136,7 @@ struct experimental_features_t {
|
||||
UDF,
|
||||
ALTERNATOR_STREAMS,
|
||||
BROADCAST_TABLES,
|
||||
KEYSPACE_STORAGE_OPTIONS,
|
||||
VIEWS_WITH_TABLETS
|
||||
KEYSPACE_STORAGE_OPTIONS
|
||||
};
|
||||
static std::map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
@@ -478,6 +477,7 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<bool> alternator_warn_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
named_value<uint32_t> alternator_streams_time_window_s;
|
||||
named_value<uint32_t> alternator_timeout_in_ms;
|
||||
|
||||
@@ -2463,14 +2463,14 @@ future<bool> system_keyspace::cdc_is_rewritten() {
|
||||
}
|
||||
|
||||
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) {
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
|
||||
static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
|
||||
static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);
|
||||
|
||||
struct cur_t {
|
||||
table_id tid;
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
std::optional<cur_t> cur;
|
||||
|
||||
@@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
if (cur) {
|
||||
co_await f(cur->tid, cur->ts, std::move(cur->streams));
|
||||
}
|
||||
cur = { tid, ts, std::vector<cdc::stream_id>() };
|
||||
cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
|
||||
}
|
||||
cur->streams.push_back(std::move(stream_id));
|
||||
|
||||
@@ -2499,9 +2499,10 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
}
|
||||
}
|
||||
|
||||
future<> system_keyspace::read_cdc_streams_history(table_id table,
|
||||
future<> system_keyspace::read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from,
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f) {
|
||||
static const sstring query = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
|
||||
static const sstring query_all = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
|
||||
static const sstring query_from = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ? AND timestamp > ?", NAME, CDC_STREAMS_HISTORY);
|
||||
|
||||
struct cur_t {
|
||||
table_id tid;
|
||||
@@ -2510,7 +2511,11 @@ future<> system_keyspace::read_cdc_streams_history(table_id table,
|
||||
};
|
||||
std::optional<cur_t> cur;
|
||||
|
||||
co_await _qp.query_internal(query, db::consistency_level::ONE, {table.uuid()}, 1000, [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
|
||||
co_await _qp.query_internal(from ? query_from : query_all,
|
||||
db::consistency_level::ONE,
|
||||
from ? data_value_list{table.uuid(), *from} : data_value_list{table.uuid()},
|
||||
1000,
|
||||
[&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
|
||||
auto tid = table_id(row.get_as<utils::UUID>("table_id"));
|
||||
auto ts = row.get_as<db_clock::time_point>("timestamp");
|
||||
auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));
|
||||
|
||||
@@ -601,8 +601,8 @@ public:
|
||||
future<bool> cdc_is_rewritten();
|
||||
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_history(table_id table, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
|
||||
|
||||
// Load Raft Group 0 id from scylla.local
|
||||
future<utils::UUID> get_raft_group0_id();
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <flat_map>
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/view/base_info.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "db/view/view_consumer.hh"
|
||||
@@ -3305,15 +3306,6 @@ public:
|
||||
_step.base->schema()->cf_name(), _step.current_token(), view_names);
|
||||
}
|
||||
if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
|
||||
if (_step.current_key.key().is_empty()) {
|
||||
// consumer got end-of-stream without consuming a single partition
|
||||
vlogger.debug("Reader didn't produce anything, marking views as built");
|
||||
while (!_step.build_status.empty()) {
|
||||
_built_views.views.push_back(std::move(_step.build_status.back()));
|
||||
_step.build_status.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
// before going back to the minimum token, advance current_key to the end
|
||||
// and check for built views in that range.
|
||||
_step.current_key = { _step.prange.end().value_or(dht::ring_position::max()).value().token(), partition_key::make_empty()};
|
||||
@@ -3332,6 +3324,7 @@ public:
|
||||
|
||||
// Called in the context of a seastar::thread.
|
||||
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
inject_failure("dont_start_build_step");
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto compaction_state = make_lw_shared<compact_for_query_state>(
|
||||
*step.reader.schema(),
|
||||
@@ -3365,6 +3358,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
|
||||
}).get();
|
||||
utils::get_local_injector().inject("delay_finishing_build_step", utils::wait_for_message(60s)).get();
|
||||
}
|
||||
|
||||
future<> view_builder::mark_as_built(view_ptr view) {
|
||||
@@ -3715,5 +3709,22 @@ sstring build_status_to_sstring(build_status status) {
|
||||
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
|
||||
}
|
||||
|
||||
// Checks whether materialized views / secondary indexes may be created in `keyspace_name`.
//
// Does nothing when the keyspace's replication strategy does not use tablets.
// For a tablet-based keyspace, throws std::logic_error unless both the
// `views_with_tablets` cluster feature and the `rf_rack_valid_keyspaces`
// configuration option are enabled. The exception message is written so that
// callers can forward it to the end user.
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
|
||||
const bool tablet_views_enabled = db.features().views_with_tablets;
|
||||
// Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
|
||||
// sure that all tablet-based keyspaces are RF-rack-valid. We check that
|
||||
// at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
|
||||
const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
|
||||
// Both the cluster feature and the configuration option are required for views on tablets.
const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
|
||||
|
||||
// The lookup result is used unconditionally, so the keyspace is expected to exist.
const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
|
||||
|
||||
// Only tablet-based keyspaces are restricted.
if (!required_config && uses_tablets) {
|
||||
throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
|
||||
"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
|
||||
"that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
@@ -309,6 +309,18 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
bool use_tablets_basic_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats);
|
||||
|
||||
/// Verify that the provided keyspace is eligible for storing materialized views.
|
||||
///
|
||||
/// Result:
|
||||
/// * If the keyspace is eligible, no effect.
|
||||
/// * If the keyspace is not eligible, an exception is thrown. Its type is not specified,
|
||||
/// and the user of this function cannot make any assumption about it. The carried exception
|
||||
/// message will be worded in a way that can be directly passed on to the end user.
|
||||
///
|
||||
/// Preconditions:
|
||||
/// * The provided `keyspace_name` must correspond to an existing keyspace.
|
||||
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -127,8 +127,9 @@ view_building_worker::view_building_worker(replica::database& db, db::system_key
|
||||
init_messaging_service();
|
||||
}
|
||||
|
||||
void view_building_worker::start_background_fibers() {
|
||||
future<> view_building_worker::init() {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
co_await discover_existing_staging_sstables();
|
||||
_staging_sstables_registrator = run_staging_sstables_registrator();
|
||||
_view_building_state_observer = run_view_building_state_observer();
|
||||
_mnotifier.register_listener(this);
|
||||
@@ -195,8 +196,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
}
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
co_await discover_existing_staging_sstables();
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
@@ -310,7 +309,10 @@ std::unordered_map<table_id, std::vector<view_building_worker::staging_sstable_t
|
||||
return;
|
||||
}
|
||||
|
||||
auto& tablet_map = _db.get_token_metadata().tablets().get_tablet_map(table_id);
|
||||
// scylladb/scylladb#26403: Make sure to access the tablets map via the effective replication map of the table object.
|
||||
// The token metadata object pointed to by the database (`_db.get_token_metadata()`) may not contain
|
||||
// the tablets map of the currently processed table yet. After #24414 is fixed, this should not matter anymore.
|
||||
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
|
||||
auto sstables = table->get_sstables();
|
||||
for (auto sstable: *sstables) {
|
||||
if (!sstable->requires_view_building()) {
|
||||
@@ -340,6 +342,7 @@ future<> view_building_worker::run_view_building_state_observer() {
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
_state.some_batch_finished = false;
|
||||
try {
|
||||
vbw_logger.trace("view_building_state_observer() iteration");
|
||||
auto read_apply_mutex_holder = co_await _group0_client.hold_read_apply_mutex(_as);
|
||||
@@ -349,7 +352,12 @@ future<> view_building_worker::run_view_building_state_observer() {
|
||||
_as.check();
|
||||
|
||||
read_apply_mutex_holder.return_all();
|
||||
co_await _vb_state_machine.event.wait();
|
||||
|
||||
// A batch could have finished its work while the worker was
|
||||
// updating the state. In that case we should do another iteration.
|
||||
if (!_state.some_batch_finished) {
|
||||
co_await _vb_state_machine.event.wait();
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
} catch (broken_condition_variable&) {
|
||||
} catch (...) {
|
||||
@@ -657,6 +665,7 @@ future<> view_building_worker::local_state::clear_state() {
|
||||
finished_tasks.clear();
|
||||
aborted_tasks.clear();
|
||||
state_updated_cv.broadcast();
|
||||
some_batch_finished = false;
|
||||
vbw_logger.debug("View building worker state was cleared.");
|
||||
}
|
||||
|
||||
@@ -676,6 +685,7 @@ void view_building_worker::batch::start() {
|
||||
return do_work();
|
||||
}).finally([this] () {
|
||||
state = batch_state::finished;
|
||||
_vbw.local()._state.some_batch_finished = true;
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ class view_building_worker : public seastar::peering_sharded_service<view_buildi
|
||||
std::unordered_set<utils::UUID> finished_tasks;
|
||||
std::unordered_set<utils::UUID> aborted_tasks;
|
||||
|
||||
bool some_batch_finished = false;
|
||||
condition_variable state_updated_cv;
|
||||
|
||||
// Clears completed/aborted tasks and creates batches (without starting them) for started tasks.
|
||||
@@ -166,7 +167,7 @@ public:
|
||||
view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier,
|
||||
service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms,
|
||||
view_building_state_machine& vbsm);
|
||||
void start_background_fibers();
|
||||
future<> init();
|
||||
|
||||
future<> register_staging_sstable_tasks(std::vector<sstables::shared_sstable> ssts, table_id table_id);
|
||||
|
||||
|
||||
@@ -1278,7 +1278,7 @@ public:
|
||||
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
|
||||
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
|
||||
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await emit_stream_set(ts, cdc::stream_state::current, current);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
|
||||
|
||||
@@ -204,7 +204,7 @@ ring_position_range_sharder::next(const schema& s) {
|
||||
return ring_position_range_and_shard{std::move(_range), shard};
|
||||
}
|
||||
|
||||
ring_position_range_vector_sharder::ring_position_range_vector_sharder(const sharder& sharder, dht::partition_range_vector ranges)
|
||||
ring_position_range_vector_sharder::ring_position_range_vector_sharder(const sharder& sharder, utils::chunked_vector<dht::partition_range> ranges)
|
||||
: _ranges(std::move(ranges))
|
||||
, _sharder(sharder)
|
||||
, _current_range(_ranges.begin()) {
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "dht/ring_position.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "utils/interval.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
#include <vector>
|
||||
|
||||
@@ -89,7 +90,7 @@ struct ring_position_range_and_shard_and_element : ring_position_range_and_shard
|
||||
//
|
||||
// During migration uses a view on shard routing for reads.
|
||||
class ring_position_range_vector_sharder {
|
||||
using vec_type = dht::partition_range_vector;
|
||||
using vec_type = utils::chunked_vector<dht::partition_range>;
|
||||
vec_type _ranges;
|
||||
const sharder& _sharder;
|
||||
vec_type::iterator _current_range;
|
||||
@@ -104,7 +105,7 @@ public:
|
||||
// Initializes the `ring_position_range_vector_sharder` with the ranges to be processed.
|
||||
// Input ranges should be non-overlapping (although nothing bad will happen if they do
|
||||
// overlap).
|
||||
ring_position_range_vector_sharder(const sharder& sharder, dht::partition_range_vector ranges);
|
||||
ring_position_range_vector_sharder(const sharder& sharder, utils::chunked_vector<dht::partition_range> ranges);
|
||||
// Fetches the next range-shard mapping. When the input range is exhausted, std::nullopt is
|
||||
// returned. Within an input range, results are contiguous and non-overlapping (but since input
|
||||
// ranges usually are discontiguous, overall the results are not contiguous). Together, the results
|
||||
|
||||
24
dist/common/scripts/scylla_io_setup
vendored
24
dist/common/scripts/scylla_io_setup
vendored
@@ -131,6 +131,28 @@ def configure_iotune_open_fd_limit(shards_count):
|
||||
logging.error(f"Required FDs count: {precalculated_fds_count}, default limit: {fd_limits}!")
|
||||
sys.exit(1)
|
||||
|
||||
def force_random_request_size_of_4k():
    """
    Work around a wrong physical sector size reported on some AWS instance types.

    It is a known bug that on i4i, i7i, i8g, i8ge instances, the disk controller reports the wrong
    physical sector size as 512 bytes, but the actual physical sector size is 4096 bytes. This
    function helps us work around that issue until AWS manages to get a fix for it.

    Returns:
        4096 if it detects it's running on one of the affected instance types (based on the
        DMI product name); otherwise None, and IOTune will use the physical sector size
        reported by the disk.
    """
    path="/sys/devices/virtual/dmi/id/product_name"

    try:
        with open(path, "r") as f:
            instance_type = f.read().strip()
    except FileNotFoundError:
        logging.warning(f"Couldn't find {path}. Falling back to IOTune using the physical sector size reported by disk.")
        return None
    except OSError as e:
        # Any other failure to read the file (e.g. PermissionError) must not abort the
        # I/O setup: this is only a best-effort workaround, so fall back the same way.
        logging.warning(f"Couldn't read {path}: {e}. Falling back to IOTune using the physical sector size reported by disk.")
        return None

    # str.startswith accepts a tuple of prefixes. "i8g" already covers "i8ge"; both are
    # listed so the tuple mirrors the documented set of affected instance families.
    prefixes = ("i7i", "i4i", "i8g", "i8ge")
    if instance_type.startswith(prefixes):
        return 4096
    return None
|
||||
|
||||
|
||||
def run_iotune():
|
||||
if "SCYLLA_CONF" in os.environ:
|
||||
conf_dir = os.environ["SCYLLA_CONF"]
|
||||
@@ -173,6 +195,8 @@ def run_iotune():
|
||||
|
||||
configure_iotune_open_fd_limit(cpudata.nr_shards())
|
||||
|
||||
if (reqsize := force_random_request_size_of_4k()):
|
||||
iotune_args += ["--random-write-io-buffer-size", f"{reqsize}"]
|
||||
try:
|
||||
subprocess.check_call([bindir() + "/iotune",
|
||||
"--format", "envfile",
|
||||
|
||||
27
dist/common/scripts/scylla_raid_setup
vendored
27
dist/common/scripts/scylla_raid_setup
vendored
@@ -17,6 +17,7 @@ import stat
|
||||
import logging
|
||||
import pyudev
|
||||
import psutil
|
||||
import platform
|
||||
from pathlib import Path
|
||||
from scylla_util import *
|
||||
from subprocess import run, SubprocessError
|
||||
@@ -102,6 +103,21 @@ def is_selinux_enabled():
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_kernel_version_at_least(major, minor):
    """Check if the Linux kernel version is at least major.minor.

    Only the leading "<major>.<minor>" digits of platform.release() are considered, so
    release strings such as "5.15.0-56-generic", "6.1-rc3" or "5.12+" are all handled.

    Returns:
        True if the running kernel is at least major.minor; False if it is older or if
        the release string cannot be parsed.
    """
    import re  # local import: only this helper needs it

    # Extract major.minor from version string like "5.15.0-56-generic"
    match = re.match(r'(\d+)\.(\d+)', platform.release())
    if match is None:
        # If we can't parse the version, assume older kernel for safety
        return False
    return (int(match.group(1)), int(match.group(2))) >= (major, minor)
|
||||
|
||||
if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
print('Requires root permission.')
|
||||
@@ -231,8 +247,17 @@ if __name__ == '__main__':
|
||||
# see https://git.kernel.org/pub/scm/fs/xfs/xfsprogs-dev.git/tree/mkfs/xfs_mkfs.c .
|
||||
# and it also cannot be smaller than the sector size.
|
||||
block_size = max(1024, sector_size)
|
||||
|
||||
run('udevadm settle', shell=True, check=True)
|
||||
run(f'mkfs.xfs -b size={block_size} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
|
||||
|
||||
# On Linux 5.12+, sub-block overwrites are supported well, so keep the default block
|
||||
# size, which will play better with the SSD.
|
||||
if is_kernel_version_at_least(5, 12):
|
||||
block_size_opt = ""
|
||||
else:
|
||||
block_size_opt = f"-b size={block_size}"
|
||||
|
||||
run(f'mkfs.xfs {block_size_opt} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
|
||||
run('udevadm settle', shell=True, check=True)
|
||||
|
||||
if is_debian_variant():
|
||||
|
||||
17
docs/_static/data/os-support.json
vendored
17
docs/_static/data/os-support.json
vendored
@@ -1,16 +1,25 @@
|
||||
{
|
||||
"Linux Distributions": {
|
||||
"Ubuntu": ["22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Debian": ["11", "12"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9", "10"],
|
||||
"Amazon Linux": ["2023"]
|
||||
},
|
||||
"ScyllaDB Versions": [
|
||||
{
|
||||
"version": "ScyllaDB 2025.4",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["22.04", "24.04"],
|
||||
"Debian": ["11", "12"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9", "10"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"version": "ScyllaDB 2025.3",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Debian": ["11", "12"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9", "10"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
@@ -19,7 +28,7 @@
|
||||
"version": "ScyllaDB 2025.2",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Debian": ["11", "12"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
@@ -28,7 +37,7 @@
|
||||
"version": "ScyllaDB 2025.1",
|
||||
"supported_OS": {
|
||||
"Ubuntu": ["22.04", "24.04"],
|
||||
"Debian": ["11"],
|
||||
"Debian": ["11", "12"],
|
||||
"Rocky / CentOS / RHEL": ["8", "9"],
|
||||
"Amazon Linux": ["2023"]
|
||||
}
|
||||
|
||||
@@ -109,6 +109,32 @@ to do what, configure the following in ScyllaDB's configuration:
|
||||
alternator_enforce_authorization: true
|
||||
```
|
||||
|
||||
Note: switching `alternator_enforce_authorization` from `false` to `true`
|
||||
before the client application has the proper secret keys and permission
|
||||
tables set up will cause the application's requests to immediately fail.
|
||||
Therefore, we recommend to begin by keeping `alternator_enforce_authorization`
|
||||
set to `false` and setting `alternator_warn_authorization` to `true`.
|
||||
This setting will continue to allow all requests without failing on
|
||||
authentication or authorization errors - but will _count_ would-be
|
||||
authentication and authorization failures in the two metrics:
|
||||
|
||||
* `scylla_alternator_authentication_failures`
|
||||
* `scylla_alternator_authorization_failures`
|
||||
|
||||
`alternator_warn_authorization=true` also generates a WARN-level log message
|
||||
on each authentication or authorization failure. Each of these log messages
|
||||
includes the string `alternator_enforce_authorization=true`, and information
|
||||
that can help pinpoint the source of the error - such as the username
|
||||
involved in the attempt, and the address of the client sending the request.
|
||||
|
||||
When you see that both metrics are not increasing (or, alternatively, that no
|
||||
more log messages appear), you can be sure that the application is properly
|
||||
set up and can finally set `alternator_enforce_authorization` to `true`.
|
||||
You can leave `alternator_warn_authorization` set or unset, depending on
|
||||
whether or not you want to see log messages when requests fail on
|
||||
authentication/authorization (in any case, the metric counts these failures,
|
||||
and the client will also get the error).
|
||||
|
||||
Alternator implements the same [signature protocol](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)
|
||||
as DynamoDB and the rest of AWS. Clients use, as usual, an access key ID and
|
||||
a secret access key to prove their identity and the authenticity of their
|
||||
|
||||
@@ -202,12 +202,9 @@ enabled. If you plan to use any of the features listed below, CREATE your keyspa
|
||||
:ref:`with tablets disabled <tablets-enable-tablets>`.
|
||||
|
||||
* Counters
|
||||
* Materialized Views (MV) ``*``
|
||||
* Secondary indexes (SI, as it depends on MV) ``*``
|
||||
|
||||
``*`` You can enable experimental support for MV and SI using
|
||||
the ``--experimental-features=views-with-tablets`` configuration option.
|
||||
See :ref:`Views with tablets <admin-views-with-tablets>` for details.
|
||||
To enable materialized views and secondary indexes for tablet keyspaces, use
|
||||
the ``--rf-rack-valid-keyspaces`` option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.
|
||||
|
||||
Resharding in keyspaces with tablets enabled has the following limitations:
|
||||
|
||||
|
||||
@@ -263,3 +263,16 @@ To list all available CDC streams for a tablets-based keyspace:
|
||||
...
|
||||
|
||||
Query all streams to read the entire CDC log.
|
||||
|
||||
Garbage collection of CDC streams metadata
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
For tablets-based keyspaces, Scylla periodically performs garbage collection of old CDC streams metadata in the ``system.cdc_timestamps`` and ``system.cdc_streams`` tables.
|
||||
This process removes information about streams that are no longer needed, helping to prevent unbounded growth of the metadata tables.
|
||||
|
||||
The garbage collection process runs periodically in the background and examines streams that have been closed.
|
||||
It removes information about a stream if the stream's close timestamp is older than the configured TTL of the CDC table.
|
||||
Since the stream has been closed for longer than the TTL, this means that all rows in this stream have also exceeded their TTL and expired, unless the table's TTL was altered to a smaller value after some rows have been written.
|
||||
|
||||
.. warning::
|
||||
When altering the TTL of a CDC table to a smaller value, you can lose information about streams that still contain live rows. Make sure to read all the information you need from the ``system.cdc_timestamps`` and ``system.cdc_streams`` tables before performing such alterations.
|
||||
|
||||
@@ -14,6 +14,7 @@ See :doc:`OS Support by Platform and Version </getting-started/os-support/>`.
|
||||
|
||||
Install ScyllaDB with Web Installer
|
||||
---------------------------------------
|
||||
|
||||
To install ScyllaDB with Web Installer, run:
|
||||
|
||||
.. code:: console
|
||||
@@ -27,7 +28,13 @@ You can run the command with the ``-h`` or ``--help`` flag to print information
|
||||
Installing a Non-default Version
|
||||
---------------------------------------
|
||||
|
||||
You can install a version other than the default.
|
||||
You can install a version other than the default. To get the list of supported
|
||||
release versions, run:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --list-active-releases
|
||||
|
||||
|
||||
Versions 2025.1 and Later
|
||||
==============================
|
||||
|
||||
@@ -341,17 +341,13 @@ credentials and endpoint.
|
||||
Views with Tablets
|
||||
------------------
|
||||
|
||||
By default, Materialized Views (MV) and Secondary Indexes (SI)
|
||||
are disabled in keyspaces that use tablets.
|
||||
|
||||
Support for MV and SI with tablets is experimental and must be explicitly
|
||||
enabled in the ``scylla.yaml`` configuration file by specifying
|
||||
the ``views-with-tablets`` option:
|
||||
Materialized Views (MV) and Secondary Indexes (SI) are enabled in keyspaces that use tablets
|
||||
only when :term:`RF-rack-valid keyspaces <RF-rack-valid keyspace>` are enforced. That can be
|
||||
done in the ``scylla.yaml`` configuration file by specifying:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
experimental_features:
|
||||
- views-with-tablets
|
||||
rf_rack_valid_keyspaces: true
|
||||
|
||||
|
||||
Monitoring
|
||||
|
||||
@@ -53,7 +53,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
|
||||
|
||||
nodetool cluster repair --tablet-tokens 1,10474535988
|
||||
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
|
||||
|
||||
For example:
|
||||
|
||||
|
||||
@@ -29,16 +29,27 @@ Load and Stream
|
||||
|
||||
.. code::
|
||||
|
||||
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las] [--scope <scope>]
|
||||
nodetool refresh <my_keyspace> <my_table> [(--load-and-stream | -las) [[(--primary-replica-only | -pro)] | [--scope <scope>]]]
|
||||
|
||||
The Load and Stream feature extends nodetool refresh.
|
||||
|
||||
The ``--load-and-stream`` option loads arbitrary sstables into the cluster by reading the sstable data and streaming each partition to the replica(s) that owns it. In addition, the ``--scope`` and ``--primary-replica-only`` options are applied to filter the set of target replicas for each partition. For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. One can copy the sstables from the old cluster to any of the new nodes and trigger refresh with load and stream.
|
||||
|
||||
|
||||
|
||||
|
||||
The Load and Stream feature extends nodetool refresh. The new ``-las`` option loads arbitrary sstables that do not belong to a node into the cluster. It loads the sstables from the disk and calculates the data's owning nodes, and streams automatically.
|
||||
For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. We can copy the sstables from the old cluster to any of the new nodes and trigger the load and stream process.
|
||||
|
||||
Load and Stream make restores and migrations much easier:
|
||||
|
||||
* You can place sstables from any node on any node
|
||||
* No need to run nodetool cleanup to remove unused data
|
||||
|
||||
With --primary-replica-only (or -pro) option, only the primary replica of each partition in an sstable will be used as the target.
|
||||
--primary-replica-only must be applied together with --load-and-stream.
|
||||
--primary-replica-only cannot be used with --scope, they are mutually exclusive.
|
||||
--primary-replica-only requires repair to be run after the load and stream operation is completed.
|
||||
|
||||
|
||||
Scope
|
||||
-----
|
||||
|
||||
|
||||
@@ -38,14 +38,14 @@ Manual Dictionary Training
|
||||
|
||||
You can manually trigger dictionary training using the REST API::
|
||||
|
||||
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&table=mytable"
|
||||
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&cf=mytable"
|
||||
|
||||
Estimating Compression Ratios
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To choose the best compression configuration, you can estimate compression ratios using the REST API::
|
||||
|
||||
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&table=mytable"
|
||||
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&cf=mytable"
|
||||
|
||||
This will return a report with estimated compression ratios for various combinations of compression
|
||||
parameters (algorithm, chunk size, zstd level, dictionary).
|
||||
|
||||
@@ -387,6 +387,7 @@ The full set of available permissions is:
|
||||
- ``MODIFY``
|
||||
- ``AUTHORIZE``
|
||||
- ``DESCRIBE``
|
||||
- ``VECTOR_SEARCH_INDEXING``
|
||||
|
||||
..
|
||||
- ``EXECUTE``
|
||||
@@ -458,6 +459,9 @@ permissions can be granted on which types of resources, and which statements are
|
||||
* - ``DESCRIBE``
|
||||
- ``ALL ROLES``
|
||||
- ``LIST ROLES`` on all roles or only roles granted to another specified role
|
||||
* - ``VECTOR_SEARCH_INDEXING``
|
||||
- ``ALL KEYSPACES``
|
||||
- ``SELECT`` on all tables with vector search indexes
|
||||
|
||||
.. _grant-permission-statement:
|
||||
|
||||
|
||||
@@ -204,6 +204,10 @@ public:
|
||||
return bool(_mask & mask_for(e));
|
||||
}
|
||||
|
||||
/// Returns true if the bitwise AND of this set's mask and `other`'s mask is non-zero,
/// i.e. the two sets have at least one mask bit in common; false otherwise.
bool intersects(const enum_set& other) const {
|
||||
return bool(_mask & other._mask);
|
||||
}
|
||||
|
||||
template<enum_type e>
|
||||
void remove() {
|
||||
_mask &= ~mask_for<e>();
|
||||
|
||||
@@ -76,7 +76,7 @@ struct repair_row_level_start_response {
|
||||
|
||||
namespace locator {
|
||||
enum class tablet_repair_incremental_mode : uint8_t {
|
||||
regular,
|
||||
incremental,
|
||||
full,
|
||||
disabled,
|
||||
};
|
||||
|
||||
3
init.cc
3
init.cc
@@ -99,9 +99,6 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
|
||||
disabled.insert("KEYSPACE_STORAGE_OPTIONS"s);
|
||||
}
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::VIEWS_WITH_TABLETS)) {
|
||||
disabled.insert("VIEWS_WITH_TABLETS"s);
|
||||
}
|
||||
if (cfg.force_gossip_topology_changes()) {
|
||||
if (cfg.enable_tablets_by_default()) {
|
||||
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");
|
||||
|
||||
@@ -754,7 +754,7 @@ tablet_task_type tablet_task_type_from_string(const sstring& name) {
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<locator::tablet_repair_incremental_mode, sstring> tablet_repair_incremental_mode_to_name = {
|
||||
{locator::tablet_repair_incremental_mode::disabled, "disabled"},
|
||||
{locator::tablet_repair_incremental_mode::regular, "regular"},
|
||||
{locator::tablet_repair_incremental_mode::incremental, "incremental"},
|
||||
{locator::tablet_repair_incremental_mode::full, "full"},
|
||||
};
|
||||
|
||||
@@ -1204,10 +1204,23 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica
|
||||
void tablet_metadata_guard::check() noexcept {
|
||||
auto erm = _table->get_effective_replication_map();
|
||||
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table);
|
||||
auto& old_tmap = _erm->get_token_metadata().tablets().get_tablet_map(_tablet.table);
|
||||
auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet);
|
||||
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage)) {
|
||||
tablet_logger.debug("tablet_metadata_guard::check: table {}.{}, tablet {}, "
|
||||
"old erm version {}, new erm version {}, old tablet map {}, new tablet map {}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(),
|
||||
_tablet,
|
||||
_erm.get()->get_token_metadata().get_version(),
|
||||
erm.get()->get_token_metadata().get_version(),
|
||||
old_tmap,
|
||||
tmap);
|
||||
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage) ||
|
||||
old_tmap.tablet_count() != tmap.tablet_count())
|
||||
{
|
||||
tablet_logger.debug("tablet_metadata_guard::check: retain the erm and abort the guard");
|
||||
_abort_source.request_abort();
|
||||
} else {
|
||||
tablet_logger.debug("tablet_metadata_guard::check: refresh the erm");
|
||||
_erm = std::move(erm);
|
||||
subscribe();
|
||||
}
|
||||
|
||||
@@ -162,11 +162,11 @@ sstring tablet_task_type_to_string(tablet_task_type);
|
||||
tablet_task_type tablet_task_type_from_string(const sstring&);
|
||||
|
||||
|
||||
// - regular (regular incremental repair): The incremental repair logic is enabled.
|
||||
// - incremental (incremental repair): The incremental repair logic is enabled.
|
||||
// Unrepaired sstables will be included for repair. Repaired sstables will be
|
||||
// skipped. The incremental repair states will be updated after repair.
|
||||
|
||||
// - full (full incremental repair): The incremental repair logic is enabled.
|
||||
// - full (full repair): The incremental repair logic is enabled.
|
||||
// Both repaired and unrepaired sstables will be included for repair. The
|
||||
// incremental repair states will be updated after repair.
|
||||
|
||||
@@ -175,12 +175,12 @@ tablet_task_type tablet_task_type_from_string(const sstring&);
|
||||
// sstables_repaired_at in system.tablets table, will not be updated after
|
||||
// repair.
|
||||
enum class tablet_repair_incremental_mode : uint8_t {
|
||||
regular,
|
||||
incremental,
|
||||
full,
|
||||
disabled,
|
||||
};
|
||||
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::regular};
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
|
||||
|
||||
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
|
||||
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
|
||||
|
||||
52
main.cc
52
main.cc
@@ -2128,7 +2128,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting sstables loader");
|
||||
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
|
||||
sst_loader.stop().get();
|
||||
});
|
||||
@@ -2208,15 +2208,55 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), token_metadata.local().get());
|
||||
|
||||
// Materialized views and secondary indexes are still restricted and require specific configuration
|
||||
// options to work. Make sure that if there are existing views or indexes, they don't violate
|
||||
// the requirements imposed on them.
|
||||
db.local().validate_tablet_views_indexes();
|
||||
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
//
|
||||
// Also, if the dictionary compression feature is not enabled, use
|
||||
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
|
||||
|
||||
gms::feature::listener_registration reg_listener;
|
||||
|
||||
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
|
||||
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
|
||||
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
|
||||
}
|
||||
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
|
||||
compression_parameters original_params{sstable_compression_options().get_options()};
|
||||
auto params = sstable_compression_options().get_options();
|
||||
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
|
||||
// Register a callback to update the default compression algorithm when the feature is enabled.
|
||||
// Precondition:
|
||||
// The callback must run inside seastar::async context:
|
||||
// - If the listener fires immediately, we are running inside seastar::async already.
|
||||
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
|
||||
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(params, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
|
||||
cfg->sstable_compression_user_table_options().validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
|
||||
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)));
|
||||
} catch (const std::exception& e) {
|
||||
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
|
||||
throw bad_configuration_error();
|
||||
@@ -2426,7 +2466,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
|
||||
checkpoint(stop_signal, "starting view building worker's background fibers");
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
view_building_worker.local().start_background_fibers();
|
||||
return view_building_worker.local().init();
|
||||
}).get();
|
||||
auto drain_view_buiding_worker = defer_verbose_shutdown("draining view building worker", [&] {
|
||||
view_building_worker.invoke_on_all(&db::view::view_building_worker::drain).get();
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:103bd12a1f0feb60d814da074b81ebafaa13059d1267ee3612c48a8bc96798b6
|
||||
size 6242980
|
||||
oid sha256:a2233592a0d28e525a0daf9a7f706929d0ae7588ffcaf7b4d76cf209245cbca1
|
||||
size 6302064
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:2cb637e741a2b9badc96f3f175f15db257b9273ea43040289de7d72657b5505a
|
||||
size 6240824
|
||||
oid sha256:ad9e9cc9364a3f01cfe3f320cde2a8dce74bf2e9ed6dfbc3118a5110f2f720f3
|
||||
size 6310124
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
#include <seastar/core/format.hh>
|
||||
|
||||
static const char scylla_product_str[] = SCYLLA_PRODUCT;
|
||||
static const char scylla_version_str[] = SCYLLA_VERSION;
|
||||
static const char scylla_release_str[] = SCYLLA_RELEASE;
|
||||
static const char scylla_build_mode_str[] = SCYLLA_BUILD_MODE_STR;
|
||||
@@ -31,12 +30,9 @@ std::string scylla_build_mode()
|
||||
}
|
||||
|
||||
std::string doc_link(std::string_view url_tail) {
|
||||
const std::string_view product = scylla_product_str;
|
||||
const std::string_view version = scylla_version_str;
|
||||
|
||||
const auto prefix = product == "scylla-enterprise" ? "enterprise" : "opensource";
|
||||
|
||||
std::string branch = product == "scylla-enterprise" ? "enterprise" : "master";
|
||||
std::string branch = "master";
|
||||
if (!version.ends_with("~dev")) {
|
||||
std::vector<std::string> components;
|
||||
boost::split(components, version, boost::algorithm::is_any_of("."));
|
||||
@@ -45,7 +41,7 @@ std::string doc_link(std::string_view url_tail) {
|
||||
branch = fmt::format("branch-{}.{}", components[0], components[1]);
|
||||
}
|
||||
|
||||
return fmt::format("https://{}.docs.scylladb.com/{}/{}", prefix, branch, url_tail);
|
||||
return fmt::format("https://docs.scylladb.com/manual/{}/{}", branch, url_tail);
|
||||
}
|
||||
|
||||
// get the version number into writeable memory, so we can grep for it if we get a core dump
|
||||
|
||||
@@ -420,7 +420,7 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
rlogger.warn("repair[{}]: Skipped sending repair_flush_hints_batchlog due to nodes_down={}, continue to run repair",
|
||||
nodes_down, uuid);
|
||||
uuid, nodes_down);
|
||||
co_return std::make_tuple(hints_batchlog_flushed, flush_time);
|
||||
}
|
||||
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, &times, &req] (locator::host_id node) -> future<> {
|
||||
|
||||
@@ -196,6 +196,7 @@ struct row_level_repair_metrics {
|
||||
uint64_t rx_hashes_nr{0};
|
||||
uint64_t inc_sst_skipped_bytes{0};
|
||||
uint64_t inc_sst_read_bytes{0};
|
||||
uint64_t tablet_time_ms{0};
|
||||
row_level_repair_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
_metrics.add_group("repair", {
|
||||
@@ -219,6 +220,8 @@ struct row_level_repair_metrics {
|
||||
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
|
||||
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
|
||||
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
|
||||
sm::make_counter("tablet_time_ms", tablet_time_ms,
|
||||
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -3473,7 +3476,16 @@ future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
service::frozen_topology_guard topo_guard) {
|
||||
auto start_time = flush_time;
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
|
||||
co_return co_await repair.run();
|
||||
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
|
||||
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
|
||||
auto t = std::chrono::steady_clock::now();
|
||||
auto update_time = seastar::defer([&] {
|
||||
if (is_tablet && !is_tablet_rebuild) {
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
|
||||
_metrics.tablet_time_ms += duration.count();
|
||||
}
|
||||
});
|
||||
co_await repair.run();
|
||||
}
|
||||
|
||||
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <fmt/std.h>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "db/view/view.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
@@ -86,6 +87,7 @@
|
||||
#include "tracing/trace_keyspace_helper.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <flat_set>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace db;
|
||||
@@ -3483,6 +3485,37 @@ void database::check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces
|
||||
}
|
||||
}
|
||||
|
||||
void database::validate_tablet_views_indexes() const {
|
||||
dblog.info("Verifying that all existing materialized views are valid");
|
||||
const data_dictionary::database& db = this->as_data_dictionary();
|
||||
|
||||
std::flat_set<std::string_view> invalid_keyspaces;
|
||||
|
||||
for (const view_ptr& view : get_views()) {
|
||||
const auto& ks = view->ks_name();
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, ks);
|
||||
} catch (...) {
|
||||
invalid_keyspaces.emplace(ks);
|
||||
}
|
||||
}
|
||||
|
||||
if (invalid_keyspaces.empty()) {
|
||||
dblog.info("All existing materialized views are valid");
|
||||
return;
|
||||
}
|
||||
|
||||
// `std::flat_set` guarantees iteration in the increasing order.
|
||||
const std::string ks_list = invalid_keyspaces
|
||||
| std::views::join_with(std::string_view(", "))
|
||||
| std::ranges::to<std::string>();
|
||||
|
||||
dblog.warn("Some of the existing keyspaces violate the requirements "
|
||||
"for using materialized views or secondary indexes. Those features require enabling "
|
||||
"the configuration option `rf_rack_valid_keyspaces` and the cluster feature "
|
||||
"`VIEWS_WITH_TABLETS`. The keyspaces that violate that condition: {}", ks_list);
|
||||
}
|
||||
|
||||
utils::chunked_vector<uint64_t> compute_random_sorted_ints(uint64_t max_value, uint64_t n_values) {
|
||||
static thread_local std::minstd_rand rng{std::random_device{}()};
|
||||
std::uniform_int_distribution<uint64_t> dist(0, max_value);
|
||||
|
||||
@@ -2091,6 +2091,20 @@ public:
|
||||
// * the `locator::topology` instance corresponding to the passed `locator::token_metadata_ptr`
|
||||
// must contain a complete list of racks and data centers in the cluster.
|
||||
void check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces, const locator::token_metadata_ptr) const;
|
||||
|
||||
/// Verify that all existing materialized views are valid.
|
||||
///
|
||||
/// We consider a materialized view valid if one of the following
|
||||
/// conditions is satisfied:
|
||||
/// * it resides in a vnode-based keyspace,
|
||||
/// * it resides in a tablet-based keyspace, the cluster feature `VIEWS_WITH_TABLETS`
|
||||
/// is enabled, and the configuration option `rf_rack_valid_keyspaces` is enabled.
|
||||
///
|
||||
/// Result:
|
||||
/// * Depending on whether there are invalid materialized views, the function will
|
||||
/// log that either everything's OK, or that there are some keyspaces that violate
|
||||
/// the requirement.
|
||||
void validate_tablet_views_indexes() const;
|
||||
private:
|
||||
// SSTable sampling might require considerable amounts of memory,
|
||||
// so we want to limit the number of concurrent sampling operations.
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: c8a3515f9b...60e4b3b921
@@ -93,9 +93,9 @@ future<> service::client_state::has_function_access(const sstring& ks, const sst
|
||||
}
|
||||
|
||||
future<> service::client_state::has_column_family_access(const sstring& ks,
|
||||
const sstring& cf, auth::permission p, auth::command_desc::type t) const {
|
||||
const sstring& cf, auth::permission p, auth::command_desc::type t, std::optional<bool> is_vector_indexed) const {
|
||||
auto r = auth::make_data_resource(ks, cf);
|
||||
co_return co_await has_access(ks, {p, r, t});
|
||||
co_return co_await has_access(ks, {p, r, t}, is_vector_indexed);
|
||||
}
|
||||
|
||||
future<> service::client_state::has_schema_access(const schema& s, auth::permission p) const {
|
||||
@@ -148,7 +148,7 @@ future<> service::client_state::check_internal_table_permissions(std::string_vie
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd) const {
|
||||
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd, std::optional<bool> is_vector_indexed) const {
|
||||
if (ks.empty()) {
|
||||
throw exceptions::invalid_request_exception("You have not set a keyspace for this session");
|
||||
}
|
||||
@@ -224,10 +224,40 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
|
||||
ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
|
||||
}
|
||||
|
||||
if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
|
||||
|
||||
co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});
|
||||
|
||||
}
|
||||
|
||||
co_return co_await ensure_has_permission(cmd);
|
||||
}
|
||||
|
||||
future<bool> service::client_state::check_has_permission(auth::command_desc cmd) const {
|
||||
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc_with_permission_set& cmd) {
|
||||
return permissions.intersects(cmd.permission);
|
||||
}
|
||||
|
||||
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc& cmd) {
|
||||
return permissions.contains(cmd.permission);
|
||||
}
|
||||
|
||||
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc& cmd) const {
|
||||
return format("User {} has no {} permission on {} or any of its parents",
|
||||
*_user,
|
||||
auth::permissions::to_string(cmd.permission),
|
||||
cmd.resource);
|
||||
}
|
||||
|
||||
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc_with_permission_set& cmd) const {
|
||||
sstring perm_names = fmt::format("{}", fmt::join(auth::permissions::to_strings(cmd.permission), ", "));
|
||||
return format("User {} has none of the permissions ({}) on {} or any of its parents",
|
||||
*_user,
|
||||
perm_names,
|
||||
cmd.resource);
|
||||
}
|
||||
|
||||
template <typename Cmd>
|
||||
future<bool> service::client_state::check_has_permission(Cmd cmd) const {
|
||||
if (_is_internal) {
|
||||
co_return true;
|
||||
}
|
||||
@@ -235,27 +265,29 @@ future<bool> service::client_state::check_has_permission(auth::command_desc cmd)
|
||||
std::optional<auth::resource> parent_r = cmd.resource.parent();
|
||||
|
||||
auth::permission_set set = co_await auth::get_permissions(*_auth_service, *_user, cmd.resource);
|
||||
if (set.contains(cmd.permission)) {
|
||||
if (intersects_permissions(set, cmd)) {
|
||||
co_return true;
|
||||
}
|
||||
if (parent_r) {
|
||||
co_return co_await check_has_permission({cmd.permission, *parent_r});
|
||||
co_return co_await check_has_permission<Cmd>({cmd.permission, *parent_r});
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
template future<bool> service::client_state::check_has_permission(auth::command_desc) const;
|
||||
template future<bool> service::client_state::check_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
|
||||
|
||||
future<> service::client_state::ensure_has_permission(auth::command_desc cmd) const {
|
||||
template <typename Cmd>
|
||||
future<> service::client_state::ensure_has_permission(Cmd cmd) const {
|
||||
return check_has_permission(cmd).then([this, cmd](bool ok) {
|
||||
if (!ok) {
|
||||
return make_exception_future<>(exceptions::unauthorized_exception(
|
||||
format("User {} has no {} permission on {} or any of its parents",
|
||||
*_user,
|
||||
auth::permissions::to_string(cmd.permission),
|
||||
cmd.resource)));
|
||||
generate_authorization_error_msg(cmd)));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
template future<> service::client_state::ensure_has_permission(auth::command_desc) const;
|
||||
template future<> service::client_state::ensure_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
|
||||
|
||||
void service::client_state::set_keyspace(replica::database& db, std::string_view keyspace) {
|
||||
// Skip keyspace validation for non-authenticated users. Apparently, some client libraries
|
||||
|
||||
@@ -332,7 +332,7 @@ public:
|
||||
future<> has_all_keyspaces_access(auth::permission) const;
|
||||
future<> has_keyspace_access(const sstring&, auth::permission) const;
|
||||
future<> has_column_family_access(const sstring&, const sstring&, auth::permission,
|
||||
auth::command_desc::type = auth::command_desc::type::OTHER) const;
|
||||
auth::command_desc::type = auth::command_desc::type::OTHER, std::optional<bool> is_vector_indexed = std::nullopt) const;
|
||||
future<> has_schema_access(const schema& s, auth::permission p) const;
|
||||
future<> has_schema_access(const sstring&, const sstring&, auth::permission p) const;
|
||||
|
||||
@@ -341,11 +341,13 @@ public:
|
||||
future<> has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const;
|
||||
private:
|
||||
future<> check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const;
|
||||
future<> has_access(const sstring& keyspace, auth::command_desc) const;
|
||||
future<> has_access(const sstring& keyspace, auth::command_desc, std::optional<bool> is_vector_indexed = std::nullopt) const;
|
||||
sstring generate_authorization_error_msg(const auth::command_desc&) const;
|
||||
sstring generate_authorization_error_msg(const auth::command_desc_with_permission_set&) const;
|
||||
|
||||
public:
|
||||
future<bool> check_has_permission(auth::command_desc) const;
|
||||
future<> ensure_has_permission(auth::command_desc) const;
|
||||
template<typename Cmd = auth::command_desc> future<bool> check_has_permission(Cmd) const;
|
||||
template<typename Cmd = auth::command_desc> future<> ensure_has_permission(Cmd) const;
|
||||
future<> maybe_update_per_service_level_params();
|
||||
void update_per_service_level_params(qos::service_level_options& slo);
|
||||
|
||||
|
||||
@@ -54,29 +54,28 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
|
||||
}
|
||||
}
|
||||
|
||||
future<paxos_state::replica_guard> paxos_state::get_replica_lock(const dht::token& key,
|
||||
clock_type::time_point timeout, const dht::shard_replica_set& shards)
|
||||
future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
|
||||
clock_type::time_point timeout)
|
||||
{
|
||||
if (shards.empty()) {
|
||||
on_internal_error(logger, "empty shards");
|
||||
}
|
||||
// When a tablet is migrated between shards on the same node, during the
|
||||
// write_both_read_new state we begin switching reads to the new shard.
|
||||
// Until the corresponding global barrier completes, some requests may still
|
||||
// use write_both_read_old erm, while others already use the write_both_read_new erm.
|
||||
// To ensure mutual exclusion between these two types of requests, we must
|
||||
// acquire locks on both the old and new shards.
|
||||
// Once the global barrier completes, no requests remain on the old shard,
|
||||
// so we can safely switch to acquiring locks only on the new shard.
|
||||
auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
|
||||
std::ranges::sort(shards);
|
||||
|
||||
replica_guard replica_guard;
|
||||
replica_guard.resize(shards.size());
|
||||
|
||||
auto acquire = [&shards, &replica_guard, &key, timeout](unsigned index) {
|
||||
return smp::submit_to(shards[index], [&key, timeout] {
|
||||
auto g = make_lw_shared<guard>(_paxos_table_lock, key, timeout);
|
||||
for (size_t i = 0; i < shards.size(); ++i) {
|
||||
replica_guard[i] = co_await smp::submit_to(shards[i], [token, timeout] {
|
||||
auto g = make_lw_shared<guard>(_paxos_table_lock, token, timeout);
|
||||
return g->lock().then([g]{ return make_foreign(std::move(g)); });
|
||||
}).then([&replica_guard, index](guard_foreign_ptr g) {
|
||||
replica_guard[index] = std::move(g);
|
||||
});
|
||||
};
|
||||
|
||||
co_await acquire(0);
|
||||
if (shards.size() > 1) {
|
||||
co_await acquire(1);
|
||||
}
|
||||
|
||||
co_return replica_guard;
|
||||
}
|
||||
|
||||
@@ -86,16 +85,6 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
|
||||
co_return m;
|
||||
}
|
||||
|
||||
static dht::shard_replica_set shards_for_writes(const schema& s, dht::token token) {
|
||||
auto shards = s.table().shard_for_writes(token);
|
||||
if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
|
||||
on_internal_error(paxos_state::logger,
|
||||
format("invalid shard, this_shard_id {}, shard_for_writes {}", this_shard_id(), shards));
|
||||
}
|
||||
std::ranges::sort(shards);
|
||||
return shards;
|
||||
}
|
||||
|
||||
future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& paxos_store, tracing::trace_state_ptr tr_state, schema_ptr schema,
|
||||
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
|
||||
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
|
||||
@@ -111,8 +100,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& pa
|
||||
}
|
||||
});
|
||||
|
||||
const auto shards = shards_for_writes(*schema, token);
|
||||
auto guard = co_await get_replica_lock(token, timeout, shards);
|
||||
auto guard = co_await get_replica_lock(*schema, token, timeout);
|
||||
|
||||
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
|
||||
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
|
||||
@@ -203,8 +191,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, paxos_store& paxos_store, tr
|
||||
}
|
||||
});
|
||||
|
||||
const auto shards = shards_for_writes(*schema, token);
|
||||
auto guard = co_await get_replica_lock(token, timeout, shards);
|
||||
auto guard = co_await get_replica_lock(*schema, token, timeout);
|
||||
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
|
||||
paxos_state state = co_await paxos_store.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
@@ -283,6 +270,8 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_
|
||||
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("paxos_state_learn_after_mutate", utils::wait_for_message(5min));
|
||||
|
||||
// We don't need to lock the partition key if there is no gap between loading paxos
|
||||
// state and saving it, and here we're just blindly updating.
|
||||
co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
|
||||
|
||||
@@ -86,8 +86,8 @@ private:
|
||||
|
||||
using guard_foreign_ptr = foreign_ptr<lw_shared_ptr<guard>>;
|
||||
using replica_guard = boost::container::static_vector<guard_foreign_ptr, 2>;
|
||||
static future<replica_guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout,
|
||||
const dht::shard_replica_set& shards);
|
||||
static future<replica_guard> get_replica_lock(const schema& s, const dht::token& token,
|
||||
clock_type::time_point timeout);
|
||||
|
||||
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
|
||||
std::optional<proposal> _accepted_proposal;
|
||||
|
||||
@@ -319,7 +319,7 @@ future<> service_level_controller::update_service_levels_cache(qos::query_contex
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::auth_integration::reload_cache() {
|
||||
future<> service_level_controller::auth_integration::reload_cache(qos::query_context ctx) {
|
||||
SCYLLA_ASSERT(this_shard_id() == global_controller);
|
||||
const auto _ = _stop_gate.hold();
|
||||
|
||||
@@ -336,11 +336,12 @@ future<> service_level_controller::auth_integration::reload_cache() {
|
||||
}
|
||||
auto units = co_await get_units(_sl_controller._global_controller_db->notifications_serializer, 1);
|
||||
|
||||
auto& qs = qos_query_state(ctx);
|
||||
auto& role_manager = _auth_service.underlying_role_manager();
|
||||
const auto all_roles = co_await role_manager.query_all();
|
||||
const auto hierarchy = co_await role_manager.query_all_directly_granted();
|
||||
const auto all_roles = co_await role_manager.query_all(qs);
|
||||
const auto hierarchy = co_await role_manager.query_all_directly_granted(qs);
|
||||
// includes only roles with attached service level
|
||||
const auto attributes = co_await role_manager.query_attribute_for_all("service_level");
|
||||
const auto attributes = co_await role_manager.query_attribute_for_all("service_level", qs);
|
||||
|
||||
std::map<sstring, service_level_options> effective_sl_map;
|
||||
|
||||
@@ -403,7 +404,7 @@ future<> service_level_controller::update_cache(update_both_cache_levels update_
|
||||
}
|
||||
|
||||
if (_auth_integration) {
|
||||
co_await _auth_integration->reload_cache();
|
||||
co_await _auth_integration->reload_cache(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -173,7 +173,7 @@ public:
|
||||
future<std::vector<cql3::description>> describe_attached_service_levels();
|
||||
|
||||
/// Must be executed on shard 0.
|
||||
future<> reload_cache();
|
||||
future<> reload_cache(qos::query_context ctx);
|
||||
|
||||
void clear_cache();
|
||||
};
|
||||
|
||||
@@ -18,26 +18,23 @@ namespace service {
|
||||
|
||||
static logging::logger slogger("group0_tombstone_gc_handler");
|
||||
|
||||
raft::server* group0_server_accessor::get_server() const {
|
||||
return _raft_gr.find_server(_group0_id);
|
||||
}
|
||||
|
||||
lowres_clock::duration group0_state_id_handler::get_refresh_interval(const replica::database& db) {
|
||||
return std::chrono::milliseconds{db.get_config().group0_tombstone_gc_refresh_interval_in_ms()};
|
||||
}
|
||||
|
||||
void group0_state_id_handler::refresh() {
|
||||
auto* const group0_server = _server_accessor.get_server();
|
||||
if (!group0_server) {
|
||||
slogger.debug("Skipping due to group0 server not found");
|
||||
return;
|
||||
}
|
||||
// It's not enough to consider only the current group 0 members. In the Raft-based recovery procedure, there can be
|
||||
// nodes that haven't joined the current group 0 yet but they have belonged to a different group 0 and thus have
|
||||
// a non-empty group 0 state ID.
|
||||
//
|
||||
// Ignored nodes are permanently banned, so we can safely filter them out even if they belong to the group 0.
|
||||
const auto& group0_members = std::ranges::join_view(std::to_array({
|
||||
std::views::all(_topo_sm._topology.normal_nodes),
|
||||
std::views::all(_topo_sm._topology.transition_nodes)
|
||||
})) | std::views::keys | std::ranges::views::filter([this] (const raft::server_id& id) {
|
||||
return !_topo_sm._topology.ignored_nodes.contains(id);
|
||||
}) | std::ranges::to<std::vector>();
|
||||
|
||||
const auto& group0_members = std::invoke([&] {
|
||||
auto members = group0_server->get_configuration().current;
|
||||
members.merge(group0_server->get_configuration().previous);
|
||||
return members;
|
||||
});
|
||||
if (group0_members.empty()) {
|
||||
slogger.info("Skipping due to empty group0");
|
||||
return;
|
||||
@@ -48,10 +45,10 @@ void group0_state_id_handler::refresh() {
|
||||
std::vector<raft::server_id> group0_members_missing_endpoint;
|
||||
std::vector<raft::server_id> group0_members_missing_state_id;
|
||||
|
||||
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& member) -> std::optional<utils::UUID> {
|
||||
const auto* state_id_ptr = _gossiper.get_application_state_ptr(locator::host_id{member.addr.id.uuid()}, gms::application_state::GROUP0_STATE_ID);
|
||||
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& id) -> std::optional<utils::UUID> {
|
||||
const auto* state_id_ptr = _gossiper.get_application_state_ptr(locator::host_id{id.uuid()}, gms::application_state::GROUP0_STATE_ID);
|
||||
if (!state_id_ptr) {
|
||||
group0_members_missing_state_id.push_back(member.addr.id);
|
||||
group0_members_missing_state_id.push_back(id);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -100,11 +97,10 @@ void group0_state_id_handler::refresh() {
|
||||
gc_state.update_group0_refresh_time(tombstone_gc_time);
|
||||
}
|
||||
|
||||
group0_state_id_handler::group0_state_id_handler(
|
||||
replica::database& local_db, gms::gossiper& gossiper, group0_server_accessor server_accessor)
|
||||
: _local_db(local_db)
|
||||
group0_state_id_handler::group0_state_id_handler(topology_state_machine& topo_sm, replica::database& local_db, gms::gossiper& gossiper)
|
||||
: _topo_sm(topo_sm)
|
||||
, _local_db(local_db)
|
||||
, _gossiper(gossiper)
|
||||
, _server_accessor(server_accessor)
|
||||
, _refresh_interval(get_refresh_interval(local_db)) {
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "raft/raft.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/timer.hh>
|
||||
@@ -29,25 +30,11 @@ namespace service {
|
||||
|
||||
class raft_group_registry;
|
||||
|
||||
|
||||
class group0_server_accessor {
|
||||
raft_group_registry& _raft_gr;
|
||||
raft::group_id _group0_id;
|
||||
|
||||
public:
|
||||
group0_server_accessor(raft_group_registry& raft_gr, raft::group_id group0_id)
|
||||
: _raft_gr(raft_gr)
|
||||
, _group0_id(group0_id) {
|
||||
}
|
||||
|
||||
[[nodiscard]] raft::server* get_server() const;
|
||||
};
|
||||
|
||||
class group0_state_id_handler {
|
||||
|
||||
topology_state_machine& _topo_sm;
|
||||
replica::database& _local_db;
|
||||
gms::gossiper& _gossiper;
|
||||
group0_server_accessor _server_accessor;
|
||||
lowres_clock::duration _refresh_interval;
|
||||
|
||||
timer<> _timer;
|
||||
@@ -60,7 +47,7 @@ class group0_state_id_handler {
|
||||
void refresh();
|
||||
|
||||
public:
|
||||
group0_state_id_handler(replica::database& local_db, gms::gossiper& gossiper, group0_server_accessor server_accessor);
|
||||
group0_state_id_handler(topology_state_machine& topo_sm, replica::database& local_db, gms::gossiper& gossiper);
|
||||
|
||||
void run();
|
||||
|
||||
|
||||
@@ -54,12 +54,13 @@ namespace service {
|
||||
static logging::logger slogger("group0_raft_sm");
|
||||
|
||||
group0_state_machine::group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
|
||||
group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat,
|
||||
gms::gossiper& gossiper, gms::feature_service& feat,
|
||||
bool topology_change_enabled)
|
||||
: _client(client), _mm(mm), _sp(sp), _ss(ss)
|
||||
, _gate("group0_state_machine")
|
||||
, _topology_change_enabled(topology_change_enabled)
|
||||
, _state_id_handler(sp.local_db(), gossiper, server_accessor), _feature_service(feat)
|
||||
, _state_id_handler(ss._topology_state_machine, sp.local_db(), gossiper)
|
||||
, _feature_service(feat)
|
||||
, _topology_on_raft_support_listener(feat.supports_consistent_topology_changes.when_enabled([this] () noexcept {
|
||||
// Using features to decide whether to start fetching topology snapshots
|
||||
// or not is technically not correct because we also use features to guard
|
||||
|
||||
@@ -118,7 +118,7 @@ class group0_state_machine : public raft_state_machine {
|
||||
future<> merge_and_apply(group0_state_machine_merger& merger);
|
||||
public:
|
||||
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
|
||||
group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat, bool topology_change_enabled);
|
||||
gms::gossiper& gossiper, gms::feature_service& feat, bool topology_change_enabled);
|
||||
future<> apply(std::vector<raft::command_cref> command) override;
|
||||
future<raft::snapshot_id> take_snapshot() override;
|
||||
void drop_snapshot(raft::snapshot_id id) override;
|
||||
|
||||
@@ -497,7 +497,15 @@ future<> group0_voter_handler::update_nodes(
|
||||
};
|
||||
|
||||
// Helper for adding a single node to the nodes list
|
||||
auto add_node = [&nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
|
||||
auto add_node = [this, &nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
|
||||
// Some topology members may not belong to the new group 0 in the Raft-based recovery procedure.
|
||||
if (!group0_config.contains(id)) {
|
||||
if (!_gossiper.get_recovery_leader()) {
|
||||
rvlogger.warn("node {} in state {} is not a part of the group 0 configuration {}, ignoring",
|
||||
id, rs.state, group0_config);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const auto is_voter = group0_config.can_vote(id);
|
||||
const auto is_leader = (id == leader_id);
|
||||
nodes.emplace(id, group0_voter_calculator::node_descriptor{
|
||||
|
||||
@@ -243,7 +243,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) {
|
||||
auto state_machine = std::make_unique<group0_state_machine>(
|
||||
_client, mm, qp.proxy(), ss, group0_server_accessor{_raft_gr, gid}, _gossiper, _feat, topology_change_enabled);
|
||||
_client, mm, qp.proxy(), ss, _gossiper, _feat, topology_change_enabled);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.failure_detector(), gid, my_id);
|
||||
// Keep a reference to a specific RPC class.
|
||||
auto& rpc_ref = *rpc;
|
||||
@@ -745,7 +745,10 @@ future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service
|
||||
} else {
|
||||
// We'll disable them once we complete the upgrade procedure.
|
||||
}
|
||||
} else if (!qp.db().get_config().recovery_leader.is_set()) {
|
||||
} else if (qp.db().get_config().recovery_leader.is_set()) {
|
||||
group0_log.info("Disabling migration_manager schema pulls in the Raft-based recovery procedure");
|
||||
co_await mm.disable_schema_pulls();
|
||||
} else {
|
||||
// Scylla has bootstrapped earlier but group 0 ID is not present and we are not recovering from majority loss
|
||||
// using the Raft-based procedure. This means we're upgrading.
|
||||
// Upgrade will start through a feature listener created after we enter NORMAL state.
|
||||
|
||||
@@ -1599,6 +1599,7 @@ protected:
|
||||
service_permit _permit; // holds admission permit until operation completes
|
||||
db::per_partition_rate_limit::info _rate_limit_info;
|
||||
db::view::update_backlog _view_backlog; // max view update backlog of all participating targets
|
||||
utils::small_vector<gate::holder, 2> _holders;
|
||||
|
||||
protected:
|
||||
virtual bool waited_for(locator::host_id from) = 0;
|
||||
@@ -1630,6 +1631,11 @@ public:
|
||||
if (cancellable) {
|
||||
register_cancellable();
|
||||
}
|
||||
|
||||
attach_to(_proxy->_write_handlers_gate);
|
||||
}
|
||||
void attach_to(gate& g) {
|
||||
_holders.push_back(g.hold());
|
||||
}
|
||||
virtual ~abstract_write_response_handler() {
|
||||
--_stats.writes;
|
||||
@@ -7143,54 +7149,65 @@ void storage_proxy::on_released(const locator::host_id& hid) {
|
||||
}
|
||||
}
|
||||
|
||||
void storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
|
||||
SCYLLA_ASSERT(thread::running_in_thread());
|
||||
future<> storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
|
||||
gate g;
|
||||
auto it = _cancellable_write_handlers_list->begin();
|
||||
while (it != _cancellable_write_handlers_list->end()) {
|
||||
auto guard = it->shared_from_this();
|
||||
if (filter_fun(*it) && _response_handlers.contains(it->id())) {
|
||||
it->timeout_cb();
|
||||
// timeout_cb() may destroy the current handler. Since the list uses
|
||||
// bi::link_mode<bi::auto_unlink>, destruction automatically unlinks the
|
||||
// element. To avoid accessing an invalidated iterator, we cache `next`
|
||||
// before invoking timeout_cb().
|
||||
const auto next = std::next(it);
|
||||
|
||||
if (filter_fun(*it)) {
|
||||
it->attach_to(g);
|
||||
if (_response_handlers.contains(it->id())) {
|
||||
it->timeout_cb();
|
||||
}
|
||||
}
|
||||
++it;
|
||||
it = next;
|
||||
if (need_preempt() && it != _cancellable_write_handlers_list->end()) {
|
||||
// Save the iterator position. If the handler is destroyed during yield(),
|
||||
// the iterator will be updated to point to the next item in the list.
|
||||
//
|
||||
// Handler destruction triggers sending a response to the client.
|
||||
// To avoid delaying that, we don’t hold a strong reference here; instead,
|
||||
// iterator_guard handles safe iterator updates while allowing prompt
|
||||
// handler destruction and client response.
|
||||
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
|
||||
seastar::thread::yield();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_await g.close();
|
||||
}
|
||||
|
||||
void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id id) {
|
||||
// FIXME: make gossiper notifictaions to pass host ids
|
||||
return cancel_write_handlers([id] (const abstract_write_response_handler& handler) {
|
||||
cancel_write_handlers([id] (const abstract_write_response_handler& handler) {
|
||||
const auto& targets = handler.get_targets();
|
||||
return std::ranges::find(targets, id) != targets.end();
|
||||
});
|
||||
}).get();
|
||||
};
|
||||
|
||||
future<> storage_proxy::drain_on_shutdown() {
|
||||
//NOTE: the thread is spawned here because there are delicate lifetime issues to consider
|
||||
// and writing them down with plain futures is error-prone.
|
||||
return async([this] {
|
||||
cancel_all_write_response_handlers().get();
|
||||
_hints_resource_manager.stop().get();
|
||||
});
|
||||
co_await cancel_all_write_response_handlers();
|
||||
co_await _hints_resource_manager.stop();
|
||||
}
|
||||
|
||||
future<> storage_proxy::abort_view_writes() {
|
||||
return async([this] {
|
||||
cancel_write_handlers([] (const abstract_write_response_handler& handler) { return handler.is_view(); });
|
||||
return cancel_write_handlers([] (const abstract_write_response_handler& handler) {
|
||||
return handler.is_view();
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_proxy::abort_batch_writes() {
|
||||
return async([this] {
|
||||
cancel_write_handlers([] (const abstract_write_response_handler& handler) { return handler.is_batch(); });
|
||||
return cancel_write_handlers([] (const abstract_write_response_handler& handler) {
|
||||
return handler.is_batch();
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
storage_proxy::stop() {
|
||||
return make_ready_future<>();
|
||||
co_await utils::get_local_injector().inject("storage_proxy::stop", utils::wait_for_message(5min));
|
||||
}
|
||||
|
||||
locator::token_metadata_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
|
||||
@@ -7202,12 +7219,11 @@ future<utils::chunked_vector<dht::token_range_endpoints>> storage_proxy::describ
|
||||
}
|
||||
|
||||
future<> storage_proxy::cancel_all_write_response_handlers() {
|
||||
auto f = _write_handlers_gate.close();
|
||||
while (!_response_handlers.empty()) {
|
||||
_response_handlers.begin()->second->timeout_cb();
|
||||
|
||||
if (!_response_handlers.empty()) {
|
||||
co_await maybe_yield();
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_await std::move(f);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,6 +318,8 @@ private:
|
||||
class cancellable_write_handlers_list;
|
||||
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
|
||||
|
||||
gate _write_handlers_gate;
|
||||
|
||||
/* This is a pointer to the shard-local part of the sharded cdc_service:
|
||||
* storage_proxy needs access to cdc_service to augment mutations.
|
||||
*
|
||||
@@ -500,7 +502,7 @@ private:
|
||||
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);
|
||||
|
||||
// Retires (times out) write response handlers which were constructed as `cancellable` and pass the given filter.
|
||||
void cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
|
||||
future<> cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
|
||||
|
||||
/**
|
||||
* Returns whether for a range query doing a query against merged is likely
|
||||
|
||||
@@ -5928,7 +5928,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
|
||||
const auto current_version = ss._shared_token_metadata.get()->get_version();
|
||||
rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
|
||||
rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
|
||||
version, current_version);
|
||||
|
||||
// This shouldn't happen under normal operation, it's only plausible
|
||||
@@ -5949,7 +5949,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
co_await ss._shared_token_metadata.stale_versions_in_use();
|
||||
co_await get_topology_session_manager().drain_closing_sessions();
|
||||
|
||||
rtlogger.debug("raft_topology_cmd::barrier_and_drain done");
|
||||
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
|
||||
});
|
||||
|
||||
co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {
|
||||
@@ -6693,11 +6693,14 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
|
||||
// repair can only be requested for the base table, and this will repair the base table's tablets
|
||||
// and all its colocated tablets as well.
|
||||
if (!get_token_metadata().tablets().is_base_table(table)) {
|
||||
auto table_schema = _db.local().find_schema(table);
|
||||
auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
|
||||
|
||||
throw std::invalid_argument(::format(
|
||||
"Cannot set repair request on table {} because it is colocated with the base table {}. "
|
||||
"Cannot set repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
|
||||
"Repair requests can be made only on the base table. "
|
||||
"Repairing the base table will also repair all tables colocated with it.",
|
||||
table, get_token_metadata().tablets().get_base_table(table)));
|
||||
table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
|
||||
}
|
||||
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
@@ -6777,10 +6780,13 @@ future<> storage_service::del_repair_tablet_request(table_id table, locator::tab
|
||||
|
||||
// see add_repair_tablet_request. repair requests can only be added on base tables.
|
||||
if (!get_token_metadata().tablets().is_base_table(table)) {
|
||||
auto table_schema = _db.local().find_schema(table);
|
||||
auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
|
||||
|
||||
throw std::invalid_argument(::format(
|
||||
"Cannot delete repair request on table {} because it is colocated with the base table {}. "
|
||||
"Cannot delete repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
|
||||
"Repair requests can be added and deleted only on the base table.",
|
||||
table, get_token_metadata().tablets().get_base_table(table)));
|
||||
table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
|
||||
}
|
||||
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
@@ -7163,6 +7169,20 @@ future<> storage_service::await_topology_quiesced() {
|
||||
co_await _topology_state_machine.await_not_busy();
|
||||
}
|
||||
|
||||
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
|
||||
auto holder = _async_gate.hold();
|
||||
|
||||
if (this_shard_id() != 0) {
|
||||
// group0 is only set on shard 0.
|
||||
co_return co_await container().invoke_on(0, [&] (auto& ss) {
|
||||
return ss.verify_topology_quiesced(expected_version);
|
||||
});
|
||||
}
|
||||
|
||||
co_await _group0->group0_server().read_barrier(&_group0_as);
|
||||
co_return _topology_state_machine._topology.version == expected_version && !_topology_state_machine._topology.is_busy();
|
||||
}
|
||||
|
||||
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
|
||||
join_node_request_result result;
|
||||
rtlogger.info("received request to join from host_id: {}", params.host_id);
|
||||
@@ -8135,6 +8155,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) {
|
||||
_gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? & _topology_state_machine : nullptr);
|
||||
}
|
||||
|
||||
bool storage_service::raft_topology_change_enabled() const {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
|
||||
}
|
||||
return _topology_change_kind_enabled == topology_change_kind::raft;
|
||||
}
|
||||
|
||||
bool storage_service::legacy_topology_change_enabled() const {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
|
||||
}
|
||||
return _topology_change_kind_enabled == topology_change_kind::legacy;
|
||||
}
|
||||
|
||||
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
|
||||
_protocol_servers.push_back(&server);
|
||||
if (start_instantly) {
|
||||
@@ -8150,7 +8184,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n
|
||||
return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
|
||||
}
|
||||
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
|
||||
}
|
||||
|
||||
|
||||
@@ -358,7 +358,7 @@ public:
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
private:
|
||||
inet_address get_broadcast_address() const noexcept {
|
||||
@@ -870,12 +870,8 @@ private:
|
||||
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
|
||||
|
||||
public:
|
||||
bool raft_topology_change_enabled() const {
|
||||
return _topology_change_kind_enabled == topology_change_kind::raft;
|
||||
}
|
||||
bool legacy_topology_change_enabled() const {
|
||||
return _topology_change_kind_enabled == topology_change_kind::legacy;
|
||||
}
|
||||
bool raft_topology_change_enabled() const;
|
||||
bool legacy_topology_change_enabled() const;
|
||||
|
||||
private:
|
||||
future<> _raft_state_monitor = make_ready_future<>();
|
||||
@@ -1002,7 +998,11 @@ public:
|
||||
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
|
||||
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
|
||||
future<> set_tablet_balancing_enabled(bool);
|
||||
|
||||
future<> await_topology_quiesced();
|
||||
// Verifies topology is not busy, and also that topology version hasn't changed since the one provided
|
||||
// by the caller.
|
||||
future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);
|
||||
|
||||
// In the maintenance mode, other nodes won't be available thus we disabled joining
|
||||
// the token ring and the token metadata won't be populated with the local node's endpoint.
|
||||
|
||||
@@ -760,20 +760,22 @@ public:
|
||||
const locator::topology& topo = _tm->get_topology();
|
||||
migration_plan plan;
|
||||
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces()) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
|
||||
plan.merge(std::move(rack_plan));
|
||||
if (!utils::get_local_injector().enter("tablet_migration_bypass")) {
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces()) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
|
||||
plan.merge(std::move(rack_plan));
|
||||
}
|
||||
} else {
|
||||
auto dc_plan = co_await make_plan(dc);
|
||||
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
|
||||
plan.merge(std::move(dc_plan));
|
||||
}
|
||||
} else {
|
||||
auto dc_plan = co_await make_plan(dc);
|
||||
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
|
||||
plan.merge(std::move(dc_plan));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -145,6 +145,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
std::unordered_map<locator::host_id, locator::load_stats> _load_stats_per_node;
|
||||
serialized_action _tablet_load_stats_refresh;
|
||||
|
||||
static constexpr std::chrono::seconds cdc_streams_gc_refresh_interval = std::chrono::seconds(60);
|
||||
|
||||
std::chrono::milliseconds _ring_delay;
|
||||
|
||||
gate::holder _group0_holder;
|
||||
@@ -765,6 +767,42 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
}
|
||||
|
||||
future<> cdc_streams_gc_fiber() {
|
||||
auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); };
|
||||
while (can_proceed()) {
|
||||
bool sleep = true;
|
||||
try {
|
||||
auto guard = co_await start_operation();
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
|
||||
co_await _cdc_gens.garbage_collect_cdc_streams(updates, guard.write_timestamp());
|
||||
|
||||
if (!updates.empty()) {
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "CDC streams GC");
|
||||
} else {
|
||||
release_guard(std::move(guard));
|
||||
}
|
||||
} catch (raft::request_aborted&) {
|
||||
rtlogger.debug("CDC streams GC fiber aborted");
|
||||
sleep = false;
|
||||
} catch (seastar::abort_requested_exception&) {
|
||||
rtlogger.debug("CDC streams GC fiber aborted");
|
||||
sleep = false;
|
||||
} catch (...) {
|
||||
rtlogger.warn("CDC streams GC fiber got error {}", std::current_exception());
|
||||
}
|
||||
auto refresh_interval = utils::get_local_injector().is_enabled("short_cdc_streams_gc_refresh_interval") ?
|
||||
std::chrono::seconds(1) : cdc_streams_gc_refresh_interval;
|
||||
if (sleep && can_proceed()) {
|
||||
try {
|
||||
co_await seastar::sleep_abortable(refresh_interval, _as);
|
||||
} catch (...) {
|
||||
rtlogger.debug("CDC streams GC: sleep failed: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If a node crashes after initiating gossip and before joining group0, it becomes orphan and
|
||||
// remains in gossip. This background fiber periodically checks for such nodes and purges those
|
||||
// entries from gossiper by committing the node as left in raft.
|
||||
@@ -1471,10 +1509,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
|
||||
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
|
||||
}
|
||||
utils::get_local_injector().inject("stream_tablet_fail",
|
||||
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
|
||||
|
||||
if (action_failed(tablet_state.streaming)) {
|
||||
if (action_failed(tablet_state.streaming) || utils::get_local_injector().enter("stream_tablet_fail")) {
|
||||
const bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
|
||||
bool critical_disk_utilization = false;
|
||||
if (auto stats = _tablet_allocator.get_load_stats()) {
|
||||
@@ -1619,8 +1655,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_transition(last_token)
|
||||
.del_migration_task_info(last_token, _feature_service)
|
||||
.build());
|
||||
if (trinfo.pending_replica) {
|
||||
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *trinfo.pending_replica, last_token);
|
||||
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
|
||||
if (leaving_replica) {
|
||||
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *leaving_replica, last_token);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -1854,6 +1891,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// of token metadata will complete before we update topology.
|
||||
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
|
||||
|
||||
co_await utils::get_local_injector().inject("tablet_resize_finalization_post_barrier", utils::wait_for_message(std::chrono::minutes(2)));
|
||||
|
||||
auto tm = get_token_metadata_ptr();
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());
|
||||
|
||||
@@ -1907,7 +1946,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_transition_state()
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet split finalization"));
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
|
||||
}
|
||||
|
||||
future<> handle_truncate_table(group0_guard guard) {
|
||||
@@ -3708,6 +3747,7 @@ future<> topology_coordinator::run() {
|
||||
|
||||
co_await fence_previous_coordinator();
|
||||
auto cdc_generation_publisher = cdc_generation_publisher_fiber();
|
||||
auto cdc_streams_gc = cdc_streams_gc_fiber();
|
||||
auto tablet_load_stats_refresher = start_tablet_load_stats_refresher();
|
||||
auto gossiper_orphan_remover = gossiper_orphan_remover_fiber();
|
||||
auto group0_voter_refresher = group0_voter_refresher_fiber();
|
||||
@@ -3751,6 +3791,7 @@ future<> topology_coordinator::run() {
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
co_await std::move(cdc_generation_publisher);
|
||||
co_await std::move(cdc_streams_gc);
|
||||
co_await std::move(gossiper_orphan_remover);
|
||||
co_await std::move(group0_voter_refresher);
|
||||
co_await std::move(vb_coordinator_fiber);
|
||||
|
||||
@@ -539,17 +539,13 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
|
||||
}
|
||||
}
|
||||
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
|
||||
void compression_parameters::validate(dicts_feature_enabled dicts_enabled) const {
|
||||
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
|
||||
if (!dicts_enabled) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
|
||||
"all nodes are upgraded to a versions which supports it",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
if (!dicts_allowed) {
|
||||
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
|
||||
algorithm_to_name(_algorithm)));
|
||||
}
|
||||
}
|
||||
if (_chunk_length) {
|
||||
auto chunk_length = _chunk_length.value();
|
||||
|
||||
@@ -106,8 +106,7 @@ public:
|
||||
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
|
||||
|
||||
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
|
||||
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
|
||||
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
|
||||
void validate(dicts_feature_enabled) const;
|
||||
|
||||
std::map<sstring, sstring> get_options() const;
|
||||
|
||||
|
||||
@@ -3280,7 +3280,7 @@ future<uint64_t> sstable::estimated_keys_for_range(const dht::token_range& range
|
||||
std::vector<unsigned>
|
||||
sstable::compute_shards_for_this_sstable(const dht::sharder& sharder_) const {
|
||||
std::unordered_set<unsigned> shards;
|
||||
dht::partition_range_vector token_ranges;
|
||||
utils::chunked_vector<dht::partition_range> token_ranges;
|
||||
const auto* sm = _components->scylla_metadata
|
||||
? _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>()
|
||||
: nullptr;
|
||||
@@ -3298,7 +3298,7 @@ sstable::compute_shards_for_this_sstable(const dht::sharder& sharder_) const {
|
||||
};
|
||||
token_ranges = sm->token_ranges.elements
|
||||
| std::views::transform(disk_token_range_to_ring_position_range)
|
||||
| std::ranges::to<dht::partition_range_vector>();
|
||||
| std::ranges::to<utils::chunked_vector<dht::partition_range>>();
|
||||
}
|
||||
sstlog.trace("{}: token_ranges={}", get_filename(), token_ranges);
|
||||
auto sharder = dht::ring_position_range_vector_sharder(sharder_, std::move(token_ranges));
|
||||
@@ -3642,7 +3642,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
auto cached_partitions_file = caching == use_caching::yes
|
||||
? _cached_partitions_file
|
||||
: seastar::make_shared<cached_file>(
|
||||
_partitions_file,
|
||||
uncached_partitions_file(),
|
||||
_manager.get_cache_tracker().get_index_cached_file_stats(),
|
||||
_manager.get_cache_tracker().get_lru(),
|
||||
_manager.get_cache_tracker().region(),
|
||||
@@ -3652,7 +3652,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
auto cached_rows_file = caching == use_caching::yes
|
||||
? _cached_rows_file
|
||||
: seastar::make_shared<cached_file>(
|
||||
_rows_file,
|
||||
uncached_rows_file(),
|
||||
_manager.get_cache_tracker().get_index_cached_file_stats(),
|
||||
_manager.get_cache_tracker().get_lru(),
|
||||
_manager.get_cache_tracker().region(),
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user