Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
cb0d8a38f1 Fix stack-use-after-return in client_routes delete/set functions
Change lambda captures in set_client_routes and delete_client_routes to move
route_keys/route_entries into the inner lambda instead of capturing by reference.
This prevents use-after-return when the with_retry coroutine suspends.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-21 07:54:34 +00:00
copilot-swe-agent[bot]
12787302bf Initial plan 2025-12-21 07:50:24 +00:00
529 changed files with 3830 additions and 15959 deletions

View File

@@ -84,14 +84,3 @@ ninja build/<mode>/scylla
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context
## Test Philosophy
- Performance matters. Tests should run as quickly as possible. Sleeps in the code are highly discouraged and should be avoided, to reduce run time and flakiness.
- Stability matters. Tests should be stable. New tests should be executed 100 times at least to ensure they pass 100 out of 100 times. (use --repeat 100 --max-failures 1 when running it)
- Unit tests should ideally test one thing and one thing only.
- Tests for bug fixes should run before the fix - and show the failure and after the fix - and show they now pass.
- Tests for bug fixes should have in their comments which bug fixes (GitHub or JIRA issue) they test.
- Tests in debug are always slower, so if needed, reduce number of iterations, rows, data used, cycles, etc. in debug mode.
- Tests should strive to be repeatable, and not use random input that will make their results unpredictable.
- Tests should consume as little resources as possible. Prefer running tests on a single node if it is sufficient, for example.

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Progress
on:
pull_request_target:
types: [opened]
jobs:
call-jira-status-in-progress:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Review
on:
pull_request_target:
types: [ready_for_review, review_requested]
jobs:
call-jira-status-in-review:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status Ready For Merge
on:
pull_request_target:
types: [labeled]
jobs:
call-jira-status-update:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,41 +0,0 @@
name: Sync Jira Based on PR Events
on:
pull_request_target:
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
permissions:
contents: read
pull-requests: write
issues: write
jobs:
jira-sync-pr-opened:
if: github.event.action == 'opened'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-in-review:
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-add-label:
if: github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-remove-label:
if: github.event.action == 'unlabeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-pr-closed:
if: github.event.action == 'closed'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,13 +0,0 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -13,5 +13,5 @@ jobs:
- uses: codespell-project/actions-codespell@master
with:
only_warn: 1
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"

View File

@@ -18,8 +18,6 @@ on:
jobs:
release:
permissions:
contents: write
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -2,9 +2,6 @@ name: "Docs / Build PR"
# For more information,
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows
permissions:
contents: read
env:
FLAG: ${{ github.repository == 'scylladb/scylla-enterprise' && 'enterprise' || 'opensource' }}

View File

@@ -1,8 +1,5 @@
name: Docs / Validate metrics
permissions:
contents: read
on:
pull_request:
branches:

View File

@@ -10,8 +10,6 @@ on:
jobs:
read-toolchain:
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
image: ${{ steps.read.outputs.image }}
steps:

View File

@@ -18,7 +18,6 @@ target_sources(alternator
consumed_capacity.cc
ttl.cc
parsed_expression_cache.cc
http_compression.cc
${cql_grammar_srcs})
target_include_directories(alternator
PUBLIC

View File

@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
controller::controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -40,7 +39,6 @@ controller::controller(
: protocol_server(sg)
, _gossiper(gossiper)
, _proxy(proxy)
, _ss(ss)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
@@ -91,7 +89,7 @@ future<> controller::start_server() {
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
@@ -105,23 +103,11 @@ future<> controller::start_server() {
alternator_port = _config.alternator_port();
_listen_addresses.push_back({addr, *alternator_port});
}
std::optional<uint16_t> alternator_port_proxy_protocol;
if (_config.alternator_port_proxy_protocol()) {
alternator_port_proxy_protocol = _config.alternator_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_port_proxy_protocol});
}
std::optional<uint16_t> alternator_https_port;
std::optional<uint16_t> alternator_https_port_proxy_protocol;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port() || _config.alternator_https_port_proxy_protocol()) {
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
}
if (_config.alternator_https_port_proxy_protocol()) {
alternator_https_port_proxy_protocol = _config.alternator_https_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_https_port_proxy_protocol});
}
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
@@ -147,29 +133,20 @@ future<> controller::start_server() {
}
}
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds,
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).handle_exception([this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}, proxy-protocol port {}, TLS proxy-protocol port {}: {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF",
ep);
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
}).then([addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}, proxy-protocol port {}, TLS proxy-protocol port {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF");
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
@@ -192,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -15,7 +15,6 @@
namespace service {
class storage_proxy;
class storage_service;
class migration_manager;
class memory_limiter;
}
@@ -58,7 +57,6 @@ class server;
class controller : public protocol_server {
sharded<gms::gossiper>& _gossiper;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<cdc::generation_service>& _cdc_gen_svc;
@@ -76,7 +74,6 @@ public:
controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -96,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -67,14 +67,6 @@ using namespace std::chrono_literals;
logging::logger elogger("alternator-executor");
namespace std {
template <> struct hash<std::pair<sstring, sstring>> {
size_t operator () (const std::pair<sstring, sstring>& p) const {
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
}
};
}
namespace alternator {
// Alternator-specific table properties stored as hidden table tags:
@@ -256,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
return *(v.MemberBegin());
}
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
executor &_executor;
struct table_info {
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
};
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
bool active = false;
public:
describe_table_info_manager(executor& executor) : _executor(executor) {
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
active = true;
}
describe_table_info_manager(const describe_table_info_manager &) = delete;
describe_table_info_manager(describe_table_info_manager&&) = delete;
~describe_table_info_manager() {
if (active) {
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
}
}
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
static std::chrono::high_resolution_clock::time_point now() {
return std::chrono::high_resolution_clock::now();
}
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
auto it = info_for_tables.find({ks_name, cf_name});
if (it != info_for_tables.end()) {
return it->second.size_in_bytes.get();
}
return std::nullopt;
}
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
}
future<> stop() {
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
active = false;
co_return;
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
info_for_tables.erase({ks_name, cf_name});
}
};
executor::executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
smp_service_group ssg,
utils::updateable_value<uint32_t> default_timeout_in_ms)
: _gossiper(gossiper),
_ss(ss),
_proxy(proxy),
_mm(mm),
_sdks(sdks),
@@ -328,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
_stats))
{
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
register_metrics(_metrics, _stats);
}
@@ -813,44 +752,12 @@ static future<bool> is_view_built(
}
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
auto expiry = describe_table_info_manager::now() + ttl;
return container().invoke_on_all(
[schema, size_in_bytes, expiry] (executor& exec) {
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
});
}
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
std::uint64_t total_size = 0;
if (cached_size) {
total_size = *cached_size;
} else {
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
// move forward with deletion faster than we calculate the size
if (!deleting) {
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}
rjson::add(table_description, "TableSizeBytes", total_size);
}
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
{
rjson::value table_description = rjson::empty_object();
auto tags_ptr = db::get_tags_of_table(schema);
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
auto creation_timestamp = get_table_creation_time(*schema);
@@ -894,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
if (tbl_status != table_status::deleting) {
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
@@ -931,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
// (for a built view) or CREATING+Backfilling (if view building
// is in progress).
if (!is_lsi) {
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
rjson::add(view_entry, "IndexStatus", "ACTIVE");
} else {
rjson::add(view_entry, "IndexStatus", "CREATING");
@@ -959,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
}
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
}
executor::supplement_table_stream_info(table_description, *schema, _proxy);
executor::supplement_table_stream_info(table_description, *schema, proxy);
// FIXME: still missing some response fields (issue #5026)
co_return table_description;
}
@@ -980,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
rjson::add(response, "Table", std::move(table_description));
elogger.trace("returning {}", response);
@@ -1083,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
@@ -1647,7 +1557,8 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1834,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
if (executor::add_stream_options(*stream_specification, builder, sp)) {
validate_cdc_log_name_length(builder.cf_name());
}
}
@@ -1853,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1869,18 +1780,18 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
}
size_t retries = _mm.get_concurrent_ddl_retries();
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
auto group0_guard = co_await _mm.start_group0_operation();
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
const auto& topo = sp.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
@@ -1890,17 +1801,17 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
if (!view_builders.empty() && ksm->uses_tablets() && !sp.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
}
try {
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
} catch (exceptions::already_exists_exception&) {
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
}
}
if (_proxy.data_dictionary().try_find_table(schema->id())) {
if (sp.data_dictionary().try_find_table(schema->id())) {
// This should never happen, the ID is supposed to be unique
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
}
@@ -1909,9 +1820,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
for (schema_builder& view_builder : view_builders) {
schemas.push_back(view_builder.build());
}
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
if (ksm->uses_tablets()) {
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
}
// If a role is allowed to create a table, we must give it permissions to
@@ -1936,7 +1847,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
try {
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
break;
} catch (const service::group0_concurrent_modification& ex) {
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
@@ -1948,9 +1859,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
}
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, _proxy);
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return rjson::print(std::move(status));
}
@@ -1959,11 +1870,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
});
}
@@ -5986,11 +5896,6 @@ future<executor::request_return_type> executor::list_tables(client_state& client
_stats.api_operations.list_tables++;
elogger.trace("Listing tables {}", request);
co_await utils::get_local_injector().inject("alternator_list_tables", [] (auto& handler) -> future<> {
handler.set("waiting", true);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
});
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
@@ -6182,10 +6087,9 @@ future<> executor::start() {
}
future<> executor::stop() {
co_await _describe_table_info_manager->stop();
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
co_await _parsed_expression_cache->stop();
return _parsed_expression_cache->stop();
}
} // namespace alternator

View File

@@ -17,13 +17,11 @@
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"
#include "db/config.hh"
#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"
#include "tracing/trace_state.hh"
@@ -43,7 +41,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
class storage_service;
}
namespace cdc {
@@ -60,7 +57,6 @@ class schema_builder;
namespace alternator {
enum class table_status;
class rmw_operation;
class put_or_delete_item;
@@ -140,7 +136,6 @@ class expression_cache;
class executor : public peering_sharded_service<executor> {
gms::gossiper& _gossiper;
service::storage_service& _ss;
service::storage_proxy& _proxy;
service::migration_manager& _mm;
db::system_distributed_keyspace& _sdks;
@@ -153,11 +148,6 @@ class executor : public peering_sharded_service<executor> {
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
struct describe_table_info_manager;
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
public:
using client_state = service::client_state;
// request_return_type is the return type of the executor methods, which
@@ -183,7 +173,6 @@ public:
executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
@@ -231,8 +220,6 @@ private:
friend class rmw_operation;
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,

View File

@@ -1,301 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "alternator/http_compression.hh"
#include "alternator/server.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include <zlib.h>
static logging::logger slogger("alternator-http-compression");
namespace alternator {
static constexpr size_t compressed_buffer_size = 1024;
class zlib_compressor {
z_stream _zs;
temporary_buffer<char> _output_buf;
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
public:
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
: _write_func(std::move(write_func)) {
memset(&_zs, 0, sizeof(_zs));
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~zlib_compressor() {
deflateEnd(&_zs);
}
future<> close() {
return compress(nullptr, 0, true);
}
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
_zs.avail_in = (uInt) len;
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
while(_zs.avail_in > 0 || is_last_chunk) {
co_await coroutine::maybe_yield();
if (_output_buf.empty()) {
if (is_last_chunk) {
uint32_t max_buffer_size = 0;
deflatePending(&_zs, &max_buffer_size, nullptr);
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
} else {
_output_buf = temporary_buffer<char>(compressed_buffer_size);
}
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
_zs.avail_out = compressed_buffer_size;
}
int e = deflate(&_zs, mode);
if (e < Z_OK) {
throw api_error::internal("Error during compression of response body");
}
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
co_await _write_func(std::move(_output_buf));
if (e == Z_STREAM_END) {
break;
}
}
}
}
};
// Helper string_view functions for parsing Accept-Encoding header
struct case_insensitive_cmp_sv {
bool operator()(std::string_view s1, std::string_view s2) const {
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
}
};
static inline std::string_view trim_left(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
sv.remove_prefix(1);
return sv;
}
static inline std::string_view trim_right(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
sv.remove_suffix(1);
return sv;
}
static inline std::string_view trim(std::string_view sv) {
return trim_left(trim_right(sv));
}
inline std::vector<std::string_view> split(std::string_view text, char separator) {
std::vector<std::string_view> tokens;
if (text == "") {
return tokens;
}
while (true) {
auto pos = text.find_first_of(separator);
if (pos != std::string_view::npos) {
tokens.emplace_back(text.data(), pos);
text.remove_prefix(pos + 1);
} else {
tokens.emplace_back(text);
break;
}
}
return tokens;
}
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
return static_cast<compression_type>(i);
}
}
return compression_type::unknown;
}
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
compression_type selected_ct = compression_type::none;
std::vector<std::string_view> entries = split(accept_encoding, ',');
for (auto& e : entries) {
std::vector<std::string_view> params = split(e, ';');
if (params.size() == 0) {
continue;
}
compression_type ct = get_compression_type(trim(params[0]));
if (ct == compression_type::unknown) {
continue; // ignore unknown encoding types
}
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
continue; // already processed this encoding
}
if (response_size < _threshold[static_cast<size_t>(ct)]) {
continue; // below threshold treat as unknown
}
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
auto pos = params[i].find("q=");
if (pos == std::string_view::npos) {
continue;
}
std::string_view param = params[i].substr(pos + 2);
param = trim(param);
// parse quality value
float q_value = 1.0f;
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
if (ec != std::errc() || ptr != param.data() + param.size()) {
continue;
}
if (q_value < 0.0) {
q_value = 0.0;
} else if (q_value > 1.0) {
q_value = 1.0;
}
ct_q[static_cast<size_t>(ct)] = q_value;
break; // we parsed quality value
}
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
}
// keep the highest encoding (in the order, unless 'any')
if (selected_ct == compression_type::any) {
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
} else {
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
}
}
if (selected_ct == compression_type::any) {
// select any not mentioned or highest quality
selected_ct = compression_type::none;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (!ct_q[i].has_value()) {
return static_cast<compression_type>(i);
}
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = static_cast<compression_type>(i);
}
}
}
return selected_ct;
}
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
chunked_content compressed;
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
compressed.push_back(std::move(buf));
return make_ready_future<>();
};
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
cfg.alternator_response_gzip_compression_level(), std::move(write));
co_await compressor.compress(str.data(), str.size(), true);
co_return compressed;
}
static sstring flatten(chunked_content&& cc) {
size_t total_size = 0;
for (const auto& chunk : cc) {
total_size += chunk.size();
}
sstring result = sstring{ sstring::initialized_later{}, total_size };
size_t offset = 0;
for (const auto& chunk : cc) {
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
offset += chunk.size();
}
return result;
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->set_content_type(content_type);
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
rep->_content = flatten(std::move(compressed));
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
});
} else {
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(response_body);
rep->set_content_type(content_type);
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
template<typename Compressor>
class compressed_data_sink_impl : public data_sink_impl {
output_stream<char> _out;
Compressor _compressor;
public:
template<typename... Args>
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
return _out.write(std::move(buf));
}) { }
future<> put(std::span<temporary_buffer<char>> data) override {
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
return do_put(std::move(buf));
});
}
private:
future<> do_put(temporary_buffer<char> buf) {
co_return co_await _compressor.compress(buf.get(), buf.size());
}
future<> close() override {
return _compressor.close().then([this] {
return _out.close();
});
}
};
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
output_stream_options opts;
opts.trim_to_size = true;
std::unique_ptr<data_sink_impl> data_sink_impl;
switch (ct) {
case response_compressor::compression_type::gzip:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
break;
case response_compressor::compression_type::deflate:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
break;
case response_compressor::compression_type::none:
case response_compressor::compression_type::any:
case response_compressor::compression_type::unknown:
on_internal_error(slogger,"Compression not selected");
default:
on_internal_error(slogger, "Unsupported compression type for data sink");
}
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
};
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
} else {
rep->write_body(content_type, std::move(body_writer));
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
} // namespace alternator

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "alternator/executor.hh"
#include <seastar/http/httpd.hh>
#include "db/config.hh"
namespace alternator {
class response_compressor {
public:
enum class compression_type {
gzip,
deflate,
compressions_count,
any = compressions_count,
none,
count,
unknown = count
};
static constexpr std::string_view compression_names[] = {
"gzip",
"deflate",
"*",
"identity"
};
static sstring get_encoding_name(compression_type ct) {
return sstring(compression_names[static_cast<size_t>(ct)]);
}
static constexpr compression_type get_compression_type(std::string_view encoding);
sstring get_accepted_encoding(const http::request& req) {
if (get_threshold() == 0) {
return "";
}
return req.get_header("Accept-Encoding");
}
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
response_compressor(const db::config& cfg)
: cfg(cfg)
,_gzip_level_observer(
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
update_threshold();
}))
,_gzip_threshold_observer(
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
update_threshold();
}))
{
update_threshold();
}
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
private:
const db::config& cfg;
utils::observable<int>::observer _gzip_level_observer;
utils::observable<uint32_t>::observer _gzip_threshold_observer;
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
void update_threshold() {
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
: cfg.alternator_response_compression_threshold_in_bytes();
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
}
}
}
public:
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, std::string&& response_body);
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
};
}

View File

@@ -34,7 +34,6 @@
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
#include "alternator/http_compression.hh"
static logging::logger slogger("alternator-server");
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
// type applies to all replies, both success and error.
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
const db::config& config) : _response_compressor(config), _f_handle(
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of
@@ -137,20 +133,22 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
auto res = resf.get();
return std::visit(overloaded_functor {
std::visit(overloaded_functor {
[&] (std::string&& str) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(str));
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(str);
rep->set_content_type(REPLY_CONTENT_TYPE);
},
[&] (executor::body_writer&& body_writer) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(body_writer));
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
}, std::move(res));
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}) { }
@@ -179,7 +177,6 @@ protected:
slogger.trace("api_handler error case: {}", rep._content);
}
response_compressor _response_compressor;
future_handler_function _f_handle;
};
@@ -374,40 +371,13 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
for (const auto& header : signed_headers) {
signed_headers_map.emplace(header, std::string_view());
}
std::vector<std::string> modified_values;
for (auto& header : req._headers) {
std::string header_str;
header_str.resize(header.first.size());
std::transform(header.first.begin(), header.first.end(), header_str.begin(), ::tolower);
auto it = signed_headers_map.find(header_str);
if (it != signed_headers_map.end()) {
// replace multiple spaces in the header value header.second with
// a single space, as required by AWS SigV4 header canonization.
// If we modify the value, we need to save it in modified_values
// to keep it alive.
std::string value;
value.reserve(header.second.size());
bool prev_space = false;
bool modified = false;
for (char ch : header.second) {
if (ch == ' ') {
if (!prev_space) {
value += ch;
prev_space = true;
} else {
modified = true; // skip a space
}
} else {
value += ch;
prev_space = false;
}
}
if (modified) {
modified_values.emplace_back(std::move(value));
it->second = std::string_view(modified_values.back());
} else {
it->second = std::string_view(header.second);
}
it->second = std::string_view(header.second);
}
}
@@ -420,7 +390,6 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
datestamp = std::move(datestamp),
signed_headers_str = std::move(signed_headers_str),
signed_headers_map = std::move(signed_headers_map),
modified_values = std::move(modified_values),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
@@ -591,11 +560,11 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
class safe_gzip_zstream {
z_stream _zs;
public:
// If gzip is true, decode a gzip header (for "Content-Encoding: gzip").
// Otherwise, a zlib header (for "Content-Encoding: deflate").
safe_gzip_zstream(bool gzip = true) {
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
if (inflateInit2(&_zs, gzip ? 16 + MAX_WBITS : MAX_WBITS) != Z_OK) {
// The strange 16 + WMAX_BITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
@@ -614,21 +583,19 @@ public:
}
};
// ungzip() takes a chunked_content of a compressed request body, and returns
// the uncompressed content as a chunked_content. If gzip is true, we expect
// gzip header (for "Content-Encoding: gzip"), if gzip is false, we expect a
// zlib header (for "Content-Encoding: deflate").
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit, bool gzip = true) {
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm(gzip);
bool complete_stream = false; // empty input is not a valid gzip/deflate
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
@@ -731,8 +698,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (content_encoding == "deflate") {
content = co_await ungzip(std::move(content), request_content_length_limit, false);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
@@ -743,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), std::move(user_agent_header),
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -793,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
void server::set_routes(routes& r) {
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
return handle_api_request(std::move(req));
}, _proxy.data_dictionary().get_config());
});
r.put(operation_type::POST, "/", req_handler);
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
@@ -904,9 +865,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
} {
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
@@ -914,28 +873,20 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port && !port_proxy_protocol && !https_port_proxy_protocol) {
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));
}
return seastar::async([this, addr, port, https_port, port_proxy_protocol, https_port_proxy_protocol, creds] {
return seastar::async([this, addr, port, https_port, creds] {
_executor.start().get();
if (port || port_proxy_protocol) {
if (port) {
set_routes(_http_server._routes);
_http_server.set_content_streaming(true);
if (port) {
_http_server.listen(socket_address{addr, *port}).get();
}
if (port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_http_server.listen(socket_address{addr, *port_proxy_protocol}, lo).get();
}
_http_server.listen(socket_address{addr, *port}).get();
_enabled_servers.push_back(std::ref(_http_server));
}
if (https_port || https_port_proxy_protocol) {
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_streaming(true);
@@ -955,15 +906,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
} else {
_credentials = creds->build_server_credentials();
}
if (https_port) {
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
}
if (https_port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_https_server.listen(socket_address{addr, *https_port_proxy_protocol}, lo, _credentials).get();
}
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
_enabled_servers.push_back(std::ref(_https_server));
}
});
@@ -1036,15 +979,16 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
ret.emplace_back(r.make_client_data());
});
co_return ret;
}

View File

@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
client_options_cache_entry_type _user_agent;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -100,9 +99,7 @@ class server : public peering_sharded_service<server> {
public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
@@ -110,7 +107,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -100,8 +100,9 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
const auto route_entries = parse_set_client_array(root);
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_await cr.local().set_client_routes(route_entries);
co_return seastar::json::json_void();
}
@@ -131,7 +132,8 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
const auto route_keys = parse_delete_client_array(root);
co_await cr.local().delete_client_routes(route_keys);
co_return seastar::json::json_void();
}

View File

@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
vp.insert(b.second);
}
}
std::vector<sstring> res;
replica::database& db = vb.local().get_db();
auto uuid = validate_table(db, ks, cf_name);
replica::column_family& cf = db.find_column_family(uuid);
co_return cf.get_index_manager().list_indexes()
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
| std::ranges::to<std::vector>();
res.reserve(cf.get_index_manager().list_indexes().size());
for (auto&& i : cf.get_index_manager().list_indexes()) {
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
res.emplace_back(i.metadata().name());
}
}
co_return res;
});
}

View File

@@ -9,7 +9,6 @@
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/http/exception.hh>
#include "task_manager.hh"
@@ -265,7 +264,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
if (id) {
module->unregister_task(id);
}
co_await coroutine::maybe_yield();
co_await maybe_yield();
}
});
co_return json_void();

View File

@@ -15,7 +15,6 @@
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
@@ -23,11 +22,9 @@ namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp)
, _loading_sem(1)
, _as(as) {
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
@@ -119,8 +116,6 @@ future<> cache::load_all() {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
++_current_version;
logger.info("Loading all roles");
@@ -151,9 +146,6 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);

View File

@@ -8,7 +8,6 @@
#pragma once
#include <seastar/core/abort_source.hh>
#include <unordered_set>
#include <unordered_map>
@@ -16,7 +15,6 @@
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include <absl/container/flat_hash_map.h>
@@ -43,7 +41,7 @@ public:
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
@@ -54,8 +52,6 @@ private:
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
semaphore _loading_sem;
abort_source& _as;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;

View File

@@ -76,14 +76,11 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
///
/// Hash a password combined with an implementation-specific salt string.
/// Deprecated in favor of `hash_with_salt_async`. This function is still used
/// when generating password hashes for storage to ensure that
/// `hash_with_salt` and `hash_with_salt_async` produce identical results,
/// preserving backward compatibility.
/// Deprecated in favor of `hash_with_salt_async`.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
sstring hash_with_salt(const sstring& pass, const sstring& salt);
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
///
/// Async version of `hash_with_salt` that returns a future.

View File

@@ -204,7 +204,7 @@ future<topology_description> topology_description::clone_async() const {
for (const auto& entry : _entries) {
vec.push_back(entry);
co_await coroutine::maybe_yield();
co_await seastar::maybe_yield();
}
co_return topology_description{std::move(vec)};

View File

@@ -15,7 +15,7 @@
#include "mutation/tombstone.hh"
#include "schema/schema.hh"
#include <seastar/core/sstring.hh>
#include "seastar/core/sstring.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"

View File

@@ -10,9 +10,7 @@
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "utils/loading_shared_values.hh"
#include <list>
#include <optional>
enum class client_type {
@@ -29,20 +27,6 @@ enum class client_connection_stage {
ready,
};
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
struct options_cache_value_type {};
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
using client_options_cache_key_type = client_options_cache_type::key_type;
// This struct represents a single OPTION key-value pair from the client's connection options.
// Both key and value are represented by corresponding "references" to their cached values.
// Each "reference" is effectively a lw_shared_ptr value.
struct client_option_key_value_cached_entry {
client_options_cache_entry_type key;
client_options_cache_entry_type value;
};
sstring to_string(client_connection_stage ct);
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
@@ -53,8 +37,8 @@ struct client_data {
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
std::optional<client_options_cache_entry_type> driver_name;
std::optional<client_options_cache_entry_type> driver_version;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
@@ -62,7 +46,6 @@ struct client_data {
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
std::optional<sstring> scheduling_group_name;
std::list<client_option_key_value_cached_entry> client_options;
sstring stage_str() const { return to_string(connection_stage); }
sstring client_type_str() const { return to_string(ct); }

View File

@@ -125,6 +125,10 @@ if(target_arch)
add_compile_options("-march=${target_arch}")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
endif()
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")

View File

@@ -12,7 +12,6 @@
#include <seastar/core/condition-variable.hh>
#include "schema/schema_fwd.hh"
#include "sstables/open_info.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -45,7 +44,7 @@ public:
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;

View File

@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
}
descriptor.creator = [&t] (shard_id) {
// All compaction types going through this path will work on normal input sstables only.
// Off-strategy, for example, waits until the sstables move out of staging state.
return t.make_sstable(sstables::sstable_state::normal);
return t.make_sstable();
};
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
@@ -1849,10 +1847,6 @@ protected:
throw make_compaction_stopped_exception();
}
}, false);
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
throw make_compaction_stopped_exception();
}
co_return co_await do_rewrite_sstable(std::move(sst));
}
};
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
// Throw an error if split cannot be performed due to e.g. out of space prevention.
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
if (is_disabled()) {
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
"reason might be out of space prevention", sst->get_filename()))));
if (!can_proceed(&t)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
std::vector<sstables::shared_sstable> ret;
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
compaction_progress_monitor monitor;
compaction_data info = create_compaction_data();
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t, sst] (shard_id _) {
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
// For example, if base table has views, it's important that sstable produced by repair will be
// in the staging state.
return t.make_sstable(sst->state());
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
};
desc.replacer = [&] (compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));

View File

@@ -376,8 +376,7 @@ public:
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately

View File

@@ -571,10 +571,10 @@ commitlog_total_space_in_mb: -1
# - "none": auditing is disabled (default)
# - "table": save audited events in audit.audit_log column family
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
audit: "table"
# audit: "none"
#
# List of statement categories that should be audited.
audit_categories: "DCL,DDL,AUTH,ADMIN"
# audit_categories: "DCL,DDL,AUTH"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"

View File

@@ -368,87 +368,6 @@ def find_ninja():
sys.exit(1)
def find_compiler(name):
"""
Find a compiler by name, skipping ccache wrapper directories.
This is useful when using sccache to avoid double-caching through ccache.
Args:
name: The compiler name (e.g., 'clang++', 'clang', 'gcc')
Returns:
Path to the compiler, skipping ccache directories, or None if not found.
"""
ccache_dirs = {'/usr/lib/ccache', '/usr/lib64/ccache'}
for path_dir in os.environ.get('PATH', '').split(os.pathsep):
# Skip ccache wrapper directories
if os.path.realpath(path_dir) in ccache_dirs or path_dir in ccache_dirs:
continue
candidate = os.path.join(path_dir, name)
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
return None
def resolve_compilers_for_compiler_cache(args, compiler_cache):
"""
When using a compiler cache, resolve compiler paths to avoid ccache directories.
This prevents double-caching when ccache symlinks are in PATH.
Args:
args: The argument namespace with cc and cxx attributes.
compiler_cache: Path to the compiler cache binary, or None.
"""
if not compiler_cache:
return
if not os.path.isabs(args.cxx):
real_cxx = find_compiler(args.cxx)
if real_cxx:
args.cxx = real_cxx
if not os.path.isabs(args.cc):
real_cc = find_compiler(args.cc)
if real_cc:
args.cc = real_cc
def find_compiler_cache(preference):
"""
Find a compiler cache based on the preference.
Args:
preference: One of 'auto', 'sccache', 'ccache', 'none', or a path to a binary.
Returns:
Path to the compiler cache binary, or None if not found/disabled.
"""
if preference == 'none':
return None
if preference == 'auto':
# Prefer sccache over ccache
for cache in ['sccache', 'ccache']:
path = which(cache)
if path:
return path
return None
if preference in ('sccache', 'ccache'):
path = which(preference)
if path:
return path
print(f"Warning: {preference} not found on PATH, disabling compiler cache")
return None
# Assume it's a path to a binary
if os.path.isfile(preference) and os.access(preference, os.X_OK):
return preference
print(f"Warning: compiler cache '{preference}' not found or not executable, disabling compiler cache")
return None
modes = {
'debug': {
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
@@ -813,8 +732,6 @@ arg_parser.add_argument('--compiler', action='store', dest='cxx', default='clang
help='C++ compiler path')
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clang',
help='C compiler path')
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
help='Use dpdk (from seastar dpdk sources)')
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
@@ -1034,7 +951,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/functions/aggregate_fcts.cc',
'cql3/functions/castas_fcts.cc',
'cql3/functions/error_injection_fcts.cc',
'cql3/functions/vector_similarity_fcts.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/cf_statement.cc',
'cql3/statements/authentication_statement.cc',
@@ -1092,7 +1008,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/statements/list_service_level_attachments_statement.cc',
'cql3/statements/list_effective_service_level_statement.cc',
'cql3/statements/describe_statement.cc',
'cql3/statements/view_prop_defs.cc',
'cql3/update_parameters.cc',
'cql3/util.cc',
'cql3/ut_name.cc',
@@ -1455,7 +1370,6 @@ alternator = [
'alternator/auth.cc',
'alternator/streams.cc',
'alternator/ttl.cc',
'alternator/http_compression.cc'
]
idls = ['idl/gossip_digest.idl.hh',
@@ -1701,7 +1615,6 @@ deps['test/boost/combined_tests'] += [
'test/boost/schema_registry_test.cc',
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/simple_value_with_expiry_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_compression_config_test.cc',
@@ -1785,18 +1698,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
# We need to link these files to all Boost tests to make sure that
# we can execute `--list_json_content` on them. That will produce
# a similar result as calling `--list_content={HRF,DOT}`.
# Unfortunately, to be able to do that, we're forced to link the
# relevant code by hand.
for key in deps.keys():
for prefix in boost_tests_prefixes:
if key.startswith(prefix):
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
wasm_deps = {}
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'
@@ -2101,7 +2002,7 @@ def semicolon_separated(*flags):
def real_relpath(path, start):
return os.path.relpath(os.path.realpath(path), os.path.realpath(start))
def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
def configure_seastar(build_dir, mode, mode_config):
seastar_cxx_ld_flags = mode_config['cxx_ld_flags']
# We want to "undo" coverage for seastar if we have it enabled.
if args.coverage:
@@ -2148,10 +2049,6 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
'-DSeastar_IO_URING=ON',
]
if compiler_cache:
seastar_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
if args.stack_guards is not None:
stack_guards = 'ON' if args.stack_guards else 'OFF'
seastar_cmake_args += ['-DSeastar_STACK_GUARDS={}'.format(stack_guards)]
@@ -2183,7 +2080,7 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
subprocess.check_call(seastar_cmd, shell=False, cwd=cmake_dir)
def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
def configure_abseil(build_dir, mode, mode_config):
abseil_cflags = mode_config['lib_cflags']
cxx_flags = mode_config['cxxflags']
if '-DSANITIZE' in cxx_flags:
@@ -2209,10 +2106,6 @@ def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
'-DABSL_PROPAGATE_CXX_STD=ON',
]
if compiler_cache:
abseil_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
cmake_args = abseil_cmake_args[:]
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + cmake_args
@@ -2358,6 +2251,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
if debuginfo and mode_config['can_have_debug_info']:
cxxflags += ['-g', '-gz']
if 'clang' in cxx:
# Since AssignmentTracking was enabled by default in clang
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
# coroutine frame debugging info (`coro_frame_ty`) is broken.
#
# It seems that we aren't losing much by disabling AssigmentTracking,
# so for now we choose to disable it to get `coro_frame_ty` back.
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
return cxxflags
@@ -2385,15 +2287,10 @@ def write_build_file(f,
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args):
use_precompiled_header = not args.disable_precompiled_header
warnings = get_warning_options(args.cxx)
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
# If compiler cache is available, prefix the compiler with it
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
# For Rust, sccache is used via RUSTC_WRAPPER environment variable
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache else ''
f.write(textwrap.dedent('''\
configure_args = {configure_args}
builddir = {outdir}
@@ -2456,7 +2353,7 @@ def write_build_file(f,
command = clang --target=wasm32 --no-standard-libraries -Wl,--export-all -Wl,--no-entry $in -o $out
description = C2WASM $out
rule rust2wasm
command = {rustc_wrapper}cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
command = cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
&& wasm-opt -Oz $builddir/wasm/{rustc_target}/debug/examples/$example.wasm -o $builddir/wasm/$example.wasm $
&& wasm-strip $builddir/wasm/$example.wasm
description = RUST2WASM $out
@@ -2472,7 +2369,7 @@ def write_build_file(f,
command = llvm-profdata merge $in -output=$out
''').format(configure_args=configure_args,
outdir=outdir,
cxx=cxx_with_cache,
cxx=args.cxx,
user_cflags=user_cflags,
warnings=warnings,
defines=defines,
@@ -2480,7 +2377,6 @@ def write_build_file(f,
user_ldflags=user_ldflags,
libs=libs,
rustc_target=rustc_target,
rustc_wrapper=rustc_wrapper,
link_pool_depth=link_pool_depth,
seastar_path=args.seastar_path,
ninja=ninja,
@@ -2565,10 +2461,10 @@ def write_build_file(f,
description = TEST {mode}
# This rule is unused for PGO stages. They use the rust lib from the parent mode.
rule rust_lib.{mode}
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' {rustc_wrapper}cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
&& touch $out
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
mode=mode,
@@ -2632,7 +2528,7 @@ def write_build_file(f,
# In debug/sanitize modes, we compile with fsanitizers,
# so must use the same options during the link:
if '-DSANITIZE' in modes[mode]['cxxflags']:
f.write(' libs = -fsanitize=address -fsanitize=undefined -lubsan\n')
f.write(' libs = -fsanitize=address -fsanitize=undefined\n')
else:
f.write(' libs =\n')
f.write(f'build $builddir/{mode}/{binary}.stripped: strip $builddir/{mode}/{binary}\n')
@@ -2806,35 +2702,38 @@ def write_build_file(f,
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
f.write(f'build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write('build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar\n')
f.write(f'build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar\n'.format(**locals()))
f.write('build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar_testing\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar_testing\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
for lib in abseil_libs:
f.write(f'build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n')
f.write(f' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/abseil\n')
f.write(f' target = {lib}\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write('build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n'.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(' subdir = $builddir/{mode}/abseil\n'.format(**locals()))
f.write(' target = {lib}\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(f'build $builddir/{mode}/stdafx.hh.pch: cxx_build_precompiled_header.{mode} stdafx.hh | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write(f'build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n')
f.write('build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = iotune\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(textwrap.dedent(f'''\
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = iotune\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(textwrap.dedent('''\
build $builddir/{mode}/iotune: copy $builddir/{mode}/seastar/apps/iotune/iotune
build $builddir/{mode}/iotune.stripped: strip $builddir/{mode}/iotune
build $builddir/{mode}/iotune.debug: phony $builddir/{mode}/iotune.stripped
'''))
''').format(**locals()))
if args.dist_only:
include_scylla_and_iotune = ''
include_scylla_and_iotune_stripped = ''
@@ -2843,16 +2742,16 @@ def write_build_file(f,
include_scylla_and_iotune = f'$builddir/{mode}/scylla $builddir/{mode}/iotune $builddir/{mode}/patchelf'
include_scylla_and_iotune_stripped = f'$builddir/{mode}/scylla.stripped $builddir/{mode}/iotune.stripped $builddir/{mode}/patchelf.stripped'
include_scylla_and_iotune_debug = f'$builddir/{mode}/scylla.debug $builddir/{mode}/iotune.debug'
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write(f'build $builddir/dist/{mode}/redhat: rpmbuild $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
@@ -3025,9 +2924,6 @@ def create_build_system(args):
os.makedirs(outdir, exist_ok=True)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)
for mode, mode_config in build_modes.items():
@@ -3044,8 +2940,8 @@ def create_build_system(args):
# {outdir}/{mode}/seastar/build.ninja, and
# {outdir}/{mode}/seastar/seastar.pc is queried for building flags
for mode, mode_config in build_modes.items():
configure_seastar(outdir, mode, mode_config, compiler_cache)
configure_abseil(outdir, mode, mode_config, compiler_cache)
configure_seastar(outdir, mode, mode_config)
configure_abseil(outdir, mode, mode_config)
user_cflags += ' -isystem abseil'
for mode, mode_config in build_modes.items():
@@ -3068,7 +2964,6 @@ def create_build_system(args):
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args)
generate_compdb('compile_commands.json', ninja, args.buildfile, selected_modes)
@@ -3111,10 +3006,6 @@ def configure_using_cmake(args):
selected_modes = args.selected_modes or default_modes
selected_configs = ';'.join(build_modes[mode].cmake_build_type for mode
in selected_modes)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
settings = {
'CMAKE_CONFIGURATION_TYPES': selected_configs,
'CMAKE_CROSS_CONFIGS': selected_configs,
@@ -3132,14 +3023,6 @@ def configure_using_cmake(args):
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
}
if compiler_cache:
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
# For Rust, sccache is used via RUSTC_WRAPPER
if 'sccache' in compiler_cache:
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
if args.date_stamp:
settings['Scylla_DATE_STAMP'] = args.date_stamp
if args.staticboost:
@@ -3171,7 +3054,7 @@ def configure_using_cmake(args):
if not args.dist_only:
for mode in selected_modes:
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode], compiler_cache)
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode])
cmake_command = ['cmake']
cmake_command += [f'-D{var}={value}' for var, value in settings.items()]

View File

@@ -47,7 +47,6 @@ target_sources(cql3
functions/aggregate_fcts.cc
functions/castas_fcts.cc
functions/error_injection_fcts.cc
functions/vector_similarity_fcts.cc
statements/cf_prop_defs.cc
statements/cf_statement.cc
statements/authentication_statement.cc
@@ -105,7 +104,6 @@ target_sources(cql3
statements/list_service_level_attachments_statement.cc
statements/list_effective_service_level_statement.cc
statements/describe_statement.cc
statements/view_prop_defs.cc
update_parameters.cc
util.cc
ut_name.cc

View File

@@ -431,7 +431,6 @@ unaliasedSelector returns [uexpression tmp]
| K_TTL '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::ttl,
unresolved_identifier{std::move(c)}}; }
| f=functionName args=selectionFunctionArgs { tmp = function_call{std::move(f), std::move(args)}; }
| f=similarityFunctionName args=vectorSimilarityArgs { tmp = function_call{std::move(f), std::move(args)}; }
| K_CAST '(' arg=unaliasedSelector K_AS t=native_type ')' { tmp = cast{.style = cast::cast_style::sql, .arg = std::move(arg), .type = std::move(t)}; }
)
( '.' fi=cident { tmp = field_selection{std::move(tmp), std::move(fi)}; }
@@ -446,18 +445,6 @@ selectionFunctionArgs returns [std::vector<expression> a]
')'
;
vectorSimilarityArgs returns [std::vector<expression> a]
: '(' ')'
| '(' v1=vectorSimilarityArg { a.push_back(std::move(v1)); }
( ',' vn=vectorSimilarityArg { a.push_back(std::move(vn)); } )*
')'
;
vectorSimilarityArg returns [uexpression a]
: s=unaliasedSelector { a = std::move(s); }
| v=value { a = std::move(v); }
;
countArgument
: '*'
| i=INTEGER { if (i->getText() != "1") {
@@ -898,10 +885,6 @@ pkDef[cql3::statements::create_table_statement::raw_statement& expr]
| '(' k1=ident { l.push_back(k1); } ( ',' kn=ident { l.push_back(kn); } )* ')' { $expr.add_key_aliases(l); }
;
cfamProperties[cql3::statements::cf_properties& expr]
: cfamProperty[expr] (K_AND cfamProperty[expr])*
;
cfamProperty[cql3::statements::cf_properties& expr]
: property[*$expr.properties()]
| K_COMPACT K_STORAGE { $expr.set_compact_storage(); }
@@ -939,22 +922,16 @@ typeColumns[create_type_statement& expr]
*/
createIndexStatement returns [std::unique_ptr<create_index_statement> expr]
@init {
auto idx_props = make_shared<index_specific_prop_defs>();
auto props = index_prop_defs();
auto props = make_shared<index_prop_defs>();
bool if_not_exists = false;
auto name = ::make_shared<cql3::index_name>();
std::vector<::shared_ptr<index_target::raw>> targets;
}
: K_CREATE (K_CUSTOM { idx_props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
: K_CREATE (K_CUSTOM { props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
(idxName[*name])? K_ON cf=columnFamilyName '(' (target1=indexIdent { targets.emplace_back(target1); } (',' target2=indexIdent { targets.emplace_back(target2); } )*)? ')'
(K_USING cls=STRING_LITERAL { idx_props->custom_class = sstring{$cls.text}; })?
(K_WITH cfamProperties[props])?
{
props.extract_index_specific_properties_to(*idx_props);
view_prop_defs view_props = std::move(props).into_view_prop_defs();
$expr = std::make_unique<create_index_statement>(cf, name, targets, std::move(idx_props), std::move(view_props), if_not_exists);
}
(K_USING cls=STRING_LITERAL { props->custom_class = sstring{$cls.text}; })?
(K_WITH properties[*props])?
{ $expr = std::make_unique<create_index_statement>(cf, name, targets, props, if_not_exists); }
;
indexIdent returns [::shared_ptr<index_target::raw> id]
@@ -1102,9 +1079,9 @@ alterTypeStatement returns [std::unique_ptr<alter_type_statement> expr]
*/
alterViewStatement returns [std::unique_ptr<alter_view_statement> expr]
@init {
auto props = cql3::statements::view_prop_defs();
auto props = cql3::statements::cf_prop_defs();
}
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[*props.properties()]
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[props]
{
$expr = std::make_unique<alter_view_statement>(std::move(cf), std::move(props));
}
@@ -1706,10 +1683,6 @@ functionName returns [cql3::functions::function_name s]
: (ks=keyspaceName '.')? f=allowedFunctionName { $s.keyspace = std::move(ks); $s.name = std::move(f); }
;
similarityFunctionName returns [cql3::functions::function_name s]
: f=allowedSimilarityFunctionName { $s = cql3::functions::function_name::native_function(std::move(f)); }
;
allowedFunctionName returns [sstring s]
: f=IDENT { $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
| f=QUOTED_NAME { $s = $f.text; }
@@ -1718,11 +1691,6 @@ allowedFunctionName returns [sstring s]
| K_COUNT { $s = "count"; }
;
allowedSimilarityFunctionName returns [sstring s]
: f=(K_SIMILARITY_COSINE | K_SIMILARITY_EUCLIDEAN | K_SIMILARITY_DOT_PRODUCT)
{ $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
;
functionArgs returns [std::vector<expression> a]
: '(' ')'
| '(' t1=term { a.push_back(std::move(t1)); }
@@ -2419,10 +2387,6 @@ K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
K_SIMILARITY_EUCLIDEAN: S I M I L A R I T Y '_' E U C L I D E A N;
K_SIMILARITY_COSINE: S I M I L A R I T Y '_' C O S I N E;
K_SIMILARITY_DOT_PRODUCT: S I M I L A R I T Y '_' D O T '_' P R O D U C T;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

View File

@@ -25,11 +25,6 @@ public:
NOT_ASSIGNABLE,
};
struct vector_test_result {
test_result result;
std::optional<size_t> dimension_opt;
};
static bool is_assignable(test_result tr) {
return tr != test_result::NOT_ASSIGNABLE;
}
@@ -49,8 +44,6 @@ public:
*/
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const = 0;
virtual vector_test_result test_assignment_any_size_float_vector() const = 0;
virtual std::optional<data_type> assignment_testable_type_opt() const = 0;
// for error reporting

View File

@@ -1434,112 +1434,6 @@ test_assignment(const expression& expr, data_dictionary::database db, const sstr
}, expr);
}
template <cql3_type::kind... Kinds>
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
using test_result = assignment_testable::vector_test_result;
const test_result NOT_ASSIGNABLE = {assignment_testable::test_result::NOT_ASSIGNABLE, std::nullopt};
const test_result WEAKLY_ASSIGNABLE = {assignment_testable::test_result::WEAKLY_ASSIGNABLE, std::nullopt};
auto is_float_or_bind = [] (const expression& e) {
return expr::visit(overloaded_functor{
[] (const bind_variable&) {
return true;
},
[] (const untyped_constant& uc) {
return uc.partial_type == untyped_constant::type_class::floating_point
|| uc.partial_type == untyped_constant::type_class::integer;
},
[] (const constant& value) {
auto kind = value.type->as_cql3_type().get_kind();
return cql3_type::kind_enum_set::frozen<Kinds...>().contains(kind);
},
[] (const auto&) {
return false;
},
}, e);
};
auto validate_assignment = [&] (const data_type& dt) -> test_result {
auto vt = dynamic_pointer_cast<const vector_type_impl>(dt->underlying_type());
if (!vt) {
return NOT_ASSIGNABLE;
}
auto elem_kind = vt->get_elements_type()->as_cql3_type().get_kind();
if (cql3_type::kind_enum_set::frozen<Kinds...>().contains(elem_kind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, vt->get_dimension()};
}
return NOT_ASSIGNABLE;
};
return expr::visit(overloaded_functor{
[&] (const constant& value) -> test_result {
return validate_assignment(value.type);
},
[&] (const binary_operator&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const conjunction&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_value& col_val) -> test_result {
return validate_assignment(col_val.col->type);
},
[&] (const subscript&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const unresolved_identifier& ui) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_mutation_attribute& cma) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const function_call& fc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const cast& c) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const field_selection& fs) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const bind_variable& bv) -> test_result {
return WEAKLY_ASSIGNABLE;
},
[&] (const untyped_constant& uc) -> test_result {
return uc.partial_type == untyped_constant::type_class::null
? WEAKLY_ASSIGNABLE
: NOT_ASSIGNABLE;
},
[&] (const tuple_constructor& tc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const collection_constructor& c) -> test_result {
switch (c.style) {
case collection_constructor::style_type::list_or_vector: {
if(std::ranges::all_of(c.elements, is_float_or_bind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, c.elements.size()};
}
return NOT_ASSIGNABLE;
}
case collection_constructor::style_type::set: return NOT_ASSIGNABLE;
case collection_constructor::style_type::map: return NOT_ASSIGNABLE;
case collection_constructor::style_type::vector:
on_internal_error(expr_logger, "vector style type found in test_assignment, should have been introduced post-prepare");
}
on_internal_error(expr_logger, fmt::format("unexpected collection_constructor style {}", static_cast<unsigned>(c.style)));
},
[&] (const usertype_constructor& uc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const temporary& t) -> test_result {
return NOT_ASSIGNABLE;
},
}, expr);
}
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
return test_assignment_any_size_float_vector<cql3_type::kind::FLOAT, cql3_type::kind::DOUBLE>(expr);
}
expression
prepare_expression(const expression& expr, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
auto e_opt = try_prepare_expression(expr, db, keyspace, schema_opt, std::move(receiver));
@@ -1573,9 +1467,6 @@ public:
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const override {
return expr::test_assignment(_e, db, keyspace, schema_opt, receiver);
}
virtual vector_test_result test_assignment_any_size_float_vector() const override {
return expr::test_assignment_any_size_float_vector(_e);
}
virtual sstring assignment_testable_source_context() const override {
return fmt::format("{}", _e);
}

View File

@@ -16,7 +16,6 @@
#include "cql3/functions/user_function.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/uuid_fcts.hh"
#include "cql3/functions/vector_similarity_fcts.hh"
#include "data_dictionary/data_dictionary.hh"
#include "as_json_function.hh"
#include "cql3/prepare_context.hh"
@@ -399,14 +398,6 @@ functions::get(data_dictionary::database db,
}
});
const auto func_name = name.has_keyspace() ? name : name.as_native_function();
if (SIMILARITY_FUNCTIONS.contains(func_name)) {
auto arg_types = retrieve_vector_arg_types(func_name, provided_args);
auto fun = ::make_shared<vector_similarity_fct>(func_name.name, arg_types);
validate_types(db, keyspace, schema.get(), fun, provided_args, receiver_ks, receiver_cf);
return fun;
}
if (name.has_keyspace()
? name == TOKEN_FUNCTION_NAME
: name.name == TOKEN_FUNCTION_NAME.name) {

View File

@@ -1,150 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "vector_similarity_fcts.hh"
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"
namespace cql3 {
namespace functions {
namespace {
// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
// There exist tests checking the compliance of the results.
// Reference:
// https://github.com/datastax/jvector/blob/f967f1c9249035b63b55a566fac7d4dc38380349/jvector-base/src/main/java/io/github/jbellis/jvector/vector/VectorSimilarityFunction.java#L36-L69
// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
double squared_norm_a = 0.0;
double squared_norm_b = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
squared_norm_a += a * a;
squared_norm_b += b * b;
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
}
// The cosine similarity is in the range [-1, 1].
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
}
float compute_euclidean_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double sum = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
double diff = a - b;
sum += diff * diff;
}
// The squared Euclidean (L2) distance is of range [0, inf).
// It is mapped to a similarity score in the range (0, 1] (0 -> 1, inf -> 0)
// for consistency with other similarity functions.
return (1 / (1 + sum));
}
// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform cosine similarity calculation.
float compute_dot_product_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
}
// The dot product is in the range [-1, 1] for L2-normalized vectors.
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return ((1 + dot_product) / 2);
}
} // namespace
thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS = {
{SIMILARITY_COSINE_FUNCTION_NAME, compute_cosine_similarity},
{SIMILARITY_EUCLIDEAN_FUNCTION_NAME, compute_euclidean_similarity},
{SIMILARITY_DOT_PRODUCT_FUNCTION_NAME, compute_dot_product_similarity},
};
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args) {
if (provided_args.size() != 2) {
throw exceptions::invalid_request_exception(fmt::format("Invalid number of arguments for function {}(vector<float, n>, vector<float, n>)", name));
}
auto [first_result, first_dim_opt] = provided_args[0]->test_assignment_any_size_float_vector();
auto [second_result, second_dim_opt] = provided_args[1]->test_assignment_any_size_float_vector();
auto invalid_type_error_message = [&name](const shared_ptr<assignment_testable>& arg) {
auto type = arg->assignment_testable_type_opt();
const auto& source_context = arg->assignment_testable_source_context();
if (type) {
return fmt::format("Function {} requires a float vector argument, but found {} of type {}", name, source_context, type.value()->cql3_type_name());
} else {
return fmt::format("Function {} requires a float vector argument, but found {}", name, source_context);
}
};
if (!is_assignable(first_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[0]));
}
if (!is_assignable(second_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[1]));
}
if (!first_dim_opt && !second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format("Cannot infer type of argument {} for function {}(vector<float, n>, vector<float, n>)",
provided_args[0]->assignment_testable_source_context(), name));
}
if (first_dim_opt && second_dim_opt) {
if (*first_dim_opt != *second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format(
"All arguments must have the same vector dimensions, but found vector<float, {}> and vector<float, {}>", *first_dim_opt, *second_dim_opt));
}
}
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
auto type = vector_type_impl::get_instance(float_type, dimension);
return {type, type};
}
bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters) {
if (std::any_of(parameters.begin(), parameters.end(), [](const auto& param) {
return !param;
})) {
return std::nullopt;
}
const auto& type = arg_types()[0];
data_value v1 = type->deserialize(*parameters[0]);
data_value v2 = type->deserialize(*parameters[1]);
const auto& v1_elements = value_cast<std::vector<data_value>>(v1);
const auto& v2_elements = value_cast<std::vector<data_value>>(v2);
float result = SIMILARITY_FUNCTIONS.at(_name)(v1_elements, v2_elements);
return float_type->decompose(result);
}
} // namespace functions
} // namespace cql3

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "native_scalar_function.hh"
#include "cql3/assignment_testable.hh"
#include "cql3/functions/function_name.hh"
namespace cql3 {
namespace functions {
static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::native_function("similarity_cosine");
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");
using similarity_function_t = float (*)(const std::vector<data_value>&, const std::vector<data_value>&);
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);
class vector_similarity_fct : public native_scalar_function {
public:
vector_similarity_fct(const sstring& name, const std::vector<data_type>& arg_types)
: native_scalar_function(name, float_type, arg_types) {
}
virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
};
} // namespace functions
} // namespace cql3

View File

@@ -14,7 +14,6 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
@@ -994,7 +993,7 @@ query_processor::execute_with_params(
auto opts = make_internal_options(p, values, cl);
auto statement = p->statement;
auto msg = co_await coroutine::try_future(execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params));
auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params);
co_return ::make_shared<untyped_result_set>(msg);
}
@@ -1004,7 +1003,7 @@ query_processor::do_execute_with_params(
shared_ptr<cql_statement> statement,
const query_options& options, std::optional<service::group0_guard> guard) {
statement->validate(*this, service::client_state::for_internal_calls());
co_return co_await coroutine::try_future(statement->execute(*this, query_state, options, std::move(guard)));
co_return co_await statement->execute(*this, query_state, options, std::move(guard));
}

View File

@@ -32,7 +32,7 @@ bool
selectable_processes_selection(const expr::expression& selectable) {
return expr::visit(overloaded_functor{
[&] (const expr::constant&) -> bool {
return true;
on_internal_error(slogger, "no way to express SELECT constant in the grammar yet");
},
[&] (const expr::conjunction& conj) -> bool {
on_internal_error(slogger, "no way to express 'SELECT a AND b' in the grammar yet");

View File

@@ -19,7 +19,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include <seastar/coroutine/exception.hh>
#include "seastar/coroutine/exception.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
@@ -206,9 +206,8 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
locator::replication_strategy_params(ks_md_update->strategy_options(), ks_md_update->initial_tablets(), ks_md_update->consistency_option()),
topo);
// If RF-rack-validity must be enforced for the keyspace according to `enforce_rf_rack_validity_for_keyspace`,
// it's forbidden to perform a schema change that would lead to an RF-rack-invalid keyspace.
// Verify that this change does not.
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to perform a schema change that
// would lead to an RF-rack-valid keyspace. Verify that this change does not.
// For more context, see: scylladb/scylladb#23071.
try {
// There are two things to note here:
@@ -226,13 +225,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// disturb it (see scylladb/scylladb#23345), but we ignore that.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::exception& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) {
if (qp.db().get_config().rf_rack_valid_keyspaces()) {
// There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what());
} else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd
// like to inform the user that the keyspace they're altering will not
// Even when the configuration option `rf_rack_valid_keyspaces` is set to false,
// we'd like to inform the user that the keyspace they're altering will not
// satisfy the restriction after the change--but just as a warning.
// For more context, see issue: scylladb/scylladb#23330.
warnings.push_back(seastar::format(

View File

@@ -11,7 +11,6 @@
#include <seastar/core/coroutine.hh>
#include "cql3/statements/alter_view_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "validation.hh"
@@ -23,7 +22,7 @@ namespace cql3 {
namespace statements {
alter_view_statement::alter_view_statement(cf_name view_name, std::optional<view_prop_defs> properties)
alter_view_statement::alter_view_statement(cf_name view_name, std::optional<cf_prop_defs> properties)
: schema_altering_statement{std::move(view_name)}
, _properties{std::move(properties)}
{
@@ -53,8 +52,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
throw exceptions::invalid_request_exception("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found");
}
auto schema_extensions = _properties->properties()->make_schema_extensions(db.extensions());
_properties->validate_raw(view_prop_defs::op_type::alter, db, keyspace(), schema_extensions);
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
_properties->validate(db, keyspace(), schema_extensions);
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
@@ -71,15 +70,28 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
}();
if (is_colocated) {
auto gc_opts = _properties->properties()->get_tombstone_gc_options(schema_extensions);
auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
auto builder = schema_builder(schema);
_properties->apply_to_builder(view_prop_defs::op_type::alter, builder, std::move(schema_extensions),
db, keyspace(), is_colocated);
_properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
if (builder.get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(
"Cannot alter gc_grace_seconds of a materialized view to 0, since this "
"value is used to TTL undelivered updates. Setting gc_grace_seconds too "
"low might cause undelivered updates to expire before being replayed.");
}
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}
return view_ptr(builder.build());
}

View File

@@ -12,8 +12,8 @@
#include <seastar/core/shared_ptr.hh>
#include "cql3/statements/view_prop_defs.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/statements/cf_prop_defs.hh"
#include "cql3/statements/schema_altering_statement.hh"
namespace cql3 {
@@ -26,10 +26,10 @@ namespace statements {
/** An <code>ALTER MATERIALIZED VIEW</code> parsed from a CQL query statement. */
class alter_view_statement : public schema_altering_statement {
private:
std::optional<view_prop_defs> _properties;
std::optional<cf_prop_defs> _properties;
view_ptr prepare_view(data_dictionary::database db) const;
public:
alter_view_statement(cf_name view_name, std::optional<view_prop_defs> properties);
alter_view_statement(cf_name view_name, std::optional<cf_prop_defs> properties);
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;

View File

@@ -190,7 +190,7 @@ future<utils::chunked_vector<mutation>> batch_statement::get_mutations(query_pro
co_return vresult;
}
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const {
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) {
if (mutations.size() <= 1) {
return; // We only warn for batch spanning multiple mutations
}
@@ -209,9 +209,8 @@ void batch_statement::verify_batch_size(query_processor& qp, const utils::chunke
for (auto&& m : mutations) {
ks_cf_pairs.insert(m.schema()->ks_name() + "." + m.schema()->cf_name());
}
const auto batch_type = _type == type::LOGGED ? "Logged" : "Unlogged";
return seastar::format("{} batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
batch_type, mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
return seastar::format("Batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
};
if (size > fail_threshold) {
_logger.error("{}", error("FAIL", fail_threshold).c_str());

View File

@@ -116,7 +116,7 @@ public:
* Checks batch size to ensure threshold is met. If not, a warning is logged.
* @param cfs ColumnFamilies that will store the batch's mutations.
*/
void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const;
static void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations);
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;

View File

@@ -19,8 +19,7 @@ namespace statements {
/**
* Class for common statement properties.
*/
class cf_properties {
protected:
class cf_properties final {
const ::shared_ptr<cf_prop_defs> _properties = ::make_shared<cf_prop_defs>();
bool _use_compact_storage = false;
std::vector<std::pair<::shared_ptr<column_identifier>, bool>> _defined_ordering; // Insertion ordering is important

View File

@@ -14,7 +14,6 @@
#include "db/view/view.hh"
#include "exceptions/exceptions.hh"
#include "index/vector_index.hh"
#include "locator/token_metadata_fwd.hh"
#include "prepared_statement.hh"
#include "replica/database.hh"
#include "types/types.hh"
@@ -219,24 +218,18 @@ view_ptr create_index_statement::create_view_for_index(const schema_ptr schema,
std::map<sstring, sstring> tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}};
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
}
const schema::extensions_map exts = _view_properties.properties()->make_schema_extensions(db.extensions());
_view_properties.apply_to_builder(view_prop_defs::op_type::create, builder, exts, db, keyspace(), is_colocated);
return view_ptr{builder.build()};
}
create_index_statement::create_index_statement(cf_name name,
::shared_ptr<index_name> index_name,
std::vector<::shared_ptr<index_target::raw>> raw_targets,
::shared_ptr<index_specific_prop_defs> idx_properties,
view_prop_defs view_properties,
::shared_ptr<index_prop_defs> properties,
bool if_not_exists)
: schema_altering_statement(name)
, _index_name(index_name->get_idx())
, _raw_targets(raw_targets)
, _idx_properties(std::move(idx_properties))
, _view_properties(std::move(view_properties))
, _properties(properties)
, _if_not_exists(if_not_exists)
{
}
@@ -259,53 +252,14 @@ static sstring target_type_name(index_target::target_type type) {
void
create_index_statement::validate(query_processor& qp, const service::client_state& state) const
{
if (_raw_targets.empty() && !_idx_properties->is_custom) {
if (_raw_targets.empty() && !_properties->is_custom) {
throw exceptions::invalid_request_exception("Only CUSTOM indexes can be created without specifying a target column");
}
_idx_properties->validate();
// FIXME: This is ugly and can be improved.
const bool is_vector_index = _idx_properties->custom_class && *_idx_properties->custom_class == "vector_index";
const bool uses_view_properties = _view_properties.properties()->count() > 0
|| _view_properties.use_compact_storage()
|| _view_properties.defined_ordering().size() > 0;
if (is_vector_index && uses_view_properties) {
throw exceptions::invalid_request_exception("You cannot use view properties with a vector index");
}
const schema::extensions_map exts = _view_properties.properties()->make_schema_extensions(qp.db().extensions());
_view_properties.validate_raw(view_prop_defs::op_type::create, qp.db(), keyspace(), exts);
// These keywords are still accepted by other schema entities, but they don't have effect on them.
// Since indexes are not bound by any backward compatibility contract in this regard, let's forbid these.
static sstring obsolete_keywords[] = {
"index_interval",
"replicate_on_write",
"populate_io_cache_on_flush",
"read_repair_chance",
"dclocal_read_repair_chance",
};
for (const sstring& keyword : obsolete_keywords) {
if (_view_properties.properties()->has_property(keyword)) {
// We use the same type of exception and the same error message as would be thrown for
// an invalid property via `_view_properties.validate_raw`.
throw exceptions::syntax_exception(seastar::format("Unknown property '{}'", keyword));
}
}
// FIXME: This is a temporary limitation as it might deserve more attention.
if (!_view_properties.defined_ordering().empty()) {
throw exceptions::invalid_request_exception("Indexes do not allow for specifying the clustering order");
}
_properties->validate();
}
std::pair<std::vector<::shared_ptr<index_target>>, cql3::cql_warnings_vec>
create_index_statement::validate_while_executing(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
cql3::cql_warnings_vec warnings;
std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_executing(data_dictionary::database db) const {
auto schema = validation::validate_column_family(db, keyspace(), column_family());
if (schema->is_counter()) {
@@ -327,22 +281,13 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_idx_properties || !_idx_properties->custom_class) {
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace(), tmptr);
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
if (db.find_keyspace(keyspace()).uses_tablets()) {
warnings.emplace_back(
"Creating an index in a keyspace that uses tablets requires "
"the keyspace to remain RF-rack-valid while the index exists. "
"Some operations will be restricted to enforce this: altering the keyspace's replication "
"factor, adding a node in a new rack, and removing or decommissioning a node that would "
"eliminate a rack.");
}
}
validate_for_local_index(*schema);
@@ -352,14 +297,14 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
targets.emplace_back(raw_target->prepare(*schema));
}
if (_idx_properties && _idx_properties->custom_class) {
auto custom_index_factory = secondary_index::secondary_index_manager::get_custom_class_factory(*_idx_properties->custom_class);
if (_properties && _properties->custom_class) {
auto custom_index_factory = secondary_index::secondary_index_manager::get_custom_class_factory(*_properties->custom_class);
if (!custom_index_factory) {
throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *_idx_properties->custom_class));
throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *(_properties->custom_class)));
}
auto custom_index = (*custom_index_factory)();
custom_index->validate(*schema, *_idx_properties, targets, db.features(), db);
_idx_properties->index_version = custom_index->index_version(*schema);
custom_index->validate(*schema, *_properties, targets, db.features(), db);
_properties->index_version = custom_index->index_version(*schema);
}
if (targets.size() > 1) {
@@ -439,7 +384,7 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
}
}
return std::make_pair(std::move(targets), std::move(warnings));
return targets;
}
void create_index_statement::validate_for_local_index(const schema& schema) const {
@@ -578,7 +523,7 @@ void create_index_statement::validate_target_column_is_map_if_index_involves_key
void create_index_statement::validate_targets_for_multi_column_index(std::vector<::shared_ptr<index_target>> targets) const
{
if (!_idx_properties->is_custom) {
if (!_properties->is_custom) {
if (targets.size() > 2 || (targets.size() == 2 && std::holds_alternative<index_target::single_column>(targets.front()->value))) {
throw exceptions::invalid_request_exception("Only CUSTOM indexes support multiple columns");
}
@@ -592,9 +537,8 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
}
}
std::pair<std::optional<create_index_statement::base_schema_with_new_index>, cql3::cql_warnings_vec>
create_index_statement::build_index_schema(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
auto [targets, warnings] = validate_while_executing(db, tmptr);
std::optional<create_index_statement::base_schema_with_new_index> create_index_statement::build_index_schema(data_dictionary::database db) const {
auto targets = validate_while_executing(db);
auto schema = db.find_schema(keyspace(), column_family());
@@ -610,8 +554,8 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
}
index_metadata_kind kind;
index_options_map index_options;
if (_idx_properties->custom_class) {
index_options = _idx_properties->get_options();
if (_properties->custom_class) {
index_options = _properties->get_options();
kind = index_metadata_kind::custom;
} else {
kind = schema->is_compound() ? index_metadata_kind::composites : index_metadata_kind::keys;
@@ -620,17 +564,17 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
auto existing_index = schema->find_index_noname(index);
if (existing_index) {
if (_if_not_exists) {
return std::make_pair(std::nullopt, std::move(warnings));
return {};
} else {
throw exceptions::invalid_request_exception(
format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
}
}
bool existing_vector_index = _idx_properties->custom_class && _idx_properties->custom_class == "vector_index" && secondary_index::vector_index::has_vector_index_on_column(*schema, targets[0]->column_name());
bool custom_index_with_same_name = _idx_properties->custom_class && db.existing_index_names(keyspace()).contains(_index_name);
bool existing_vector_index = _properties->custom_class && _properties->custom_class == "vector_index" && secondary_index::vector_index::has_vector_index_on_column(*schema, targets[0]->column_name());
bool custom_index_with_same_name = _properties->custom_class && db.existing_index_names(keyspace()).contains(_index_name);
if (existing_vector_index || custom_index_with_same_name) {
if (_if_not_exists) {
return std::make_pair(std::nullopt, std::move(warnings));
return {};
} else {
throw exceptions::invalid_request_exception("There exists a duplicate custom index");
}
@@ -646,13 +590,13 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
schema_builder builder{schema};
builder.with_index(index);
return std::make_pair(base_schema_with_new_index{builder.build(), index}, std::move(warnings));
return base_schema_with_new_index{builder.build(), index};
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chunked_vector<mutation>, cql3::cql_warnings_vec>>
create_index_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
using namespace cql_transport;
auto [res, warnings] = build_index_schema(qp.db(), qp.proxy().get_token_metadata_ptr());
auto res = build_index_schema(qp.db());
::shared_ptr<event::schema_change> ret;
utils::chunked_vector<mutation> muts;
@@ -682,7 +626,7 @@ create_index_statement::prepare_schema_mutations(query_processor& qp, const quer
column_family());
}
co_return std::make_tuple(std::move(ret), std::move(muts), std::move(warnings));
co_return std::make_tuple(std::move(ret), std::move(muts), std::vector<sstring>());
}
std::unique_ptr<cql3::statements::prepared_statement>

View File

@@ -10,8 +10,6 @@
#pragma once
#include "cql3/statements/index_prop_defs.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "schema_altering_statement.hh"
#include "index_target.hh"
@@ -29,25 +27,20 @@ class index_name;
namespace statements {
class index_specific_prop_defs;
class index_prop_defs;
/** A <code>CREATE INDEX</code> statement parsed from a CQL query. */
class create_index_statement : public schema_altering_statement {
const sstring _index_name;
const std::vector<::shared_ptr<index_target::raw>> _raw_targets;
// Options specific to this index.
const ::shared_ptr<index_specific_prop_defs> _idx_properties;
// Options corresponding to the underlying materialized view.
const view_prop_defs _view_properties;
const ::shared_ptr<index_prop_defs> _properties;
const bool _if_not_exists;
cql_stats* _cql_stats = nullptr;
public:
create_index_statement(cf_name name, ::shared_ptr<index_name> index_name,
std::vector<::shared_ptr<index_target::raw>> raw_targets,
::shared_ptr<index_specific_prop_defs> idx_properties, view_prop_defs view_properties, bool if_not_exists);
::shared_ptr<index_prop_defs> properties, bool if_not_exists);
future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor&, const service::client_state& state) const override;
@@ -60,7 +53,7 @@ public:
schema_ptr schema;
index_metadata index;
};
std::pair<std::optional<base_schema_with_new_index>, cql3::cql_warnings_vec> build_index_schema(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::optional<base_schema_with_new_index> build_index_schema(data_dictionary::database db) const;
view_ptr create_view_for_index(const schema_ptr, const index_metadata& im, const data_dictionary::database&) const;
private:
void validate_for_local_index(const schema& schema) const;
@@ -76,7 +69,7 @@ private:
const sstring& name,
index_metadata_kind kind,
const index_options_map& options);
std::pair<std::vector<::shared_ptr<index_target>>, cql3::cql_warnings_vec> validate_while_executing(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::vector<::shared_ptr<index_target>> validate_while_executing(data_dictionary::database db) const;
};
}

View File

@@ -116,21 +116,21 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
// If RF-rack-validity must be enforced for the keyspace according to `enforce_rf_rack_validity_for_keyspace`,
// it's forbidden to create an RF-rack-invalid keyspace. Verify that it's RF-rack-valid.
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to create an RF-rack-invalid keyspace.
// Verify that it's RF-rack-valid.
// For more context, see: scylladb/scylladb#23071.
try {
// We hold a group0_guard, so it's correct to check this here.
// The topology or schema cannot change while we're performing this query.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::exception& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(cfg, *ksm)) {
if (cfg.rf_rack_valid_keyspaces()) {
// There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what());
} else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd
// like to inform the user that the keyspace they're creating does not
// Even when the configuration option `rf_rack_valid_keyspaces` is set to false,
// we'd like to inform the user that the keyspace they're creating does not
// satisfy the restriction--but just as a warning.
// For more context, see issue: scylladb/scylladb#23330.
warnings.push_back(seastar::format(

View File

@@ -8,7 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "cql3/statements/view_prop_defs.hh"
#include "exceptions/exceptions.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -106,7 +105,7 @@ static bool validate_primary_key(
return new_non_pk_column;
}
std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(data_dictionary::database db) const {
// We need to make sure that:
// - materialized view name is valid
// - primary key includes all columns in base table's primary key
@@ -120,7 +119,15 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
cql3::cql_warnings_vec warnings;
auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions());
_properties.validate_raw(view_prop_defs::op_type::create, db, keyspace(), schema_extensions);
_properties.validate(db, keyspace(), schema_extensions);
if (_properties.use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
}
if (_properties.properties()->get_cdc_options(schema_extensions)) {
throw exceptions::invalid_request_exception("Cannot enable CDC for a materialized view");
}
// View and base tables must be in the same keyspace, to ensure that RF
// is the same (because we assign a view replica to each base replica).
@@ -146,21 +153,12 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
try {
db::view::validate_view_keyspace(db, keyspace(), tmptr);
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
if (db.find_keyspace(keyspace()).uses_tablets()) {
warnings.emplace_back(
"Creating a materialized view in a keyspaces that uses tablets requires "
"the keyspace to remain RF-rack-valid while the materialized view exists. "
"Some operations will be restricted to enforce this: altering the keyspace's replication "
"factor, adding a node in a new rack, and removing or decommissioning a node that would "
"eliminate a rack.");
}
if (schema->is_counter()) {
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
}
@@ -343,7 +341,16 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
warnings.emplace_back(std::move(warning_text));
}
schema_builder builder{keyspace(), column_family()};
const auto maybe_id = _properties.properties()->get_id();
if (maybe_id && db.try_find_table(*maybe_id)) {
const auto schema_ptr = db.find_schema(*maybe_id);
const auto& ks_name = schema_ptr->ks_name();
const auto& cf_name = schema_ptr->cf_name();
throw exceptions::invalid_request_exception(seastar::format("Table with ID {} already exists: {}.{}", *maybe_id, ks_name, cf_name));
}
schema_builder builder{keyspace(), column_family(), maybe_id};
auto add_columns = [this, &builder] (std::vector<const column_definition*>& defs, column_kind kind) mutable {
for (auto* def : defs) {
auto&& type = _properties.get_reversable_type(*def->column_specification->name, def->type);
@@ -389,8 +396,14 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
}
}
_properties.apply_to_builder(view_prop_defs::op_type::create, builder, std::move(schema_extensions),
db, keyspace(), is_colocated);
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}
auto where_clause_text = util::relations_to_where_clause(_where_clause);
builder.with_view_info(schema, included.empty(), std::move(where_clause_text));
@@ -401,7 +414,7 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chunked_vector<mutation>, cql3::cql_warnings_vec>>
create_view_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
utils::chunked_vector<mutation> m;
auto [definition, warnings] = prepare_view(qp.db(), qp.proxy().get_token_metadata_ptr());
auto [definition, warnings] = prepare_view(qp.db());
try {
m = co_await service::prepare_new_view_announcement(qp.proxy(), std::move(definition), ts);
} catch (const exceptions::already_exists_exception& e) {

View File

@@ -7,9 +7,9 @@
#pragma once
#include "cql3/statements/schema_altering_statement.hh"
#include "cql3/statements/cf_properties.hh"
#include "cql3/cf_name.hh"
#include "cql3/expr/expression.hh"
#include "cql3/statements/view_prop_defs.hh"
#include <seastar/core/shared_ptr.hh>
@@ -35,7 +35,7 @@ private:
expr::expression _where_clause;
std::vector<::shared_ptr<cql3::column_identifier::raw>> _partition_keys;
std::vector<::shared_ptr<cql3::column_identifier::raw>> _clustering_keys;
view_prop_defs _properties;
cf_properties _properties;
bool _if_not_exists;
public:
@@ -48,7 +48,7 @@ public:
std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
bool if_not_exists);
std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db) const;
auto& properties() {
return _properties;

View File

@@ -710,12 +710,11 @@ std::vector<lw_shared_ptr<column_specification>> listing_describe_statement::get
future<std::vector<std::vector<managed_bytes_opt>>> listing_describe_statement::describe(cql3::query_processor& qp, const service::client_state& client_state) const {
auto db = qp.db();
auto raw_ks = client_state.get_raw_keyspace();
std::vector<sstring> keyspaces;
// For most describe statements we should limit the results to the USEd
// keyspace (client_state.get_raw_keyspace()), if any. However for DESC
// KEYSPACES we must list all keyspaces, not just the USEd one.
if (_element != element_type::keyspace && !client_state.get_raw_keyspace().empty()) {
keyspaces.push_back(client_state.get_raw_keyspace());
if (!raw_ks.empty()) {
keyspaces.push_back(raw_ks);
} else {
keyspaces = db.get_all_keyspaces();
std::ranges::sort(keyspaces);

View File

@@ -11,7 +11,6 @@
#include <set>
#include <seastar/core/format.hh>
#include "index_prop_defs.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "index/secondary_index.hh"
#include "exceptions/exceptions.hh"
@@ -22,9 +21,7 @@ static void check_system_option_specified(const index_options_map& options, cons
}
}
namespace cql3::statements {
void index_specific_prop_defs::validate() const {
void cql3::statements::index_prop_defs::validate() const {
static std::set<sstring> keywords({ sstring(KW_OPTIONS) });
property_definitions::validate(keywords);
@@ -43,13 +40,13 @@ void index_specific_prop_defs::validate() const {
}
index_options_map
index_specific_prop_defs::get_raw_options() const {
cql3::statements::index_prop_defs::get_raw_options() const {
auto options = get_map(KW_OPTIONS);
return !options ? std::unordered_map<sstring, sstring>() : std::unordered_map<sstring, sstring>(options->begin(), options->end());
}
index_options_map
index_specific_prop_defs::get_options() const {
cql3::statements::index_prop_defs::get_options() const {
auto options = get_raw_options();
options.emplace(db::index::secondary_index::custom_class_option_name, *custom_class);
if (index_version.has_value()) {
@@ -57,25 +54,3 @@ index_specific_prop_defs::get_options() const {
}
return options;
}
void index_prop_defs::extract_index_specific_properties_to(index_specific_prop_defs& target) {
if (properties()->has_property(index_specific_prop_defs::KW_OPTIONS)) {
auto value = properties()->extract_property(index_specific_prop_defs::KW_OPTIONS);
std::visit([&target] <typename T> (T&& val) {
target.add_property(index_specific_prop_defs::KW_OPTIONS, std::forward<T>(val));
}, std::move(value));
}
}
view_prop_defs index_prop_defs::into_view_prop_defs() && {
if (properties()->has_property(index_specific_prop_defs::KW_OPTIONS)) {
utils::on_internal_error(seastar::format(
"Precondition has been violated. The property '{}' is still present", index_specific_prop_defs::KW_OPTIONS));
}
view_prop_defs result = std::move(static_cast<view_prop_defs&>(*this));
return result;
}
} // namespace cql3::statements

View File

@@ -10,7 +10,6 @@
#pragma once
#include "cql3/statements/view_prop_defs.hh"
#include "property_definitions.hh"
#include <seastar/core/sstring.hh>
#include "schema/schema_fwd.hh"
@@ -24,7 +23,7 @@ namespace cql3 {
namespace statements {
class index_specific_prop_defs : public property_definitions {
class index_prop_defs : public property_definitions {
public:
static constexpr auto KW_OPTIONS = "options";
@@ -38,19 +37,6 @@ public:
index_options_map get_options() const;
};
struct index_prop_defs : public view_prop_defs {
/// Extract all of the index-specific properties to `target`.
///
/// If there's a property at an index-specific key, and if `target` already has
/// a value at that key, that value will be replaced.
void extract_index_specific_properties_to(index_specific_prop_defs& target);
/// Turns this object into an object of type `view_prop_defs`, as if moved.
///
/// Precondition: the object MUST NOT contain any index-specific property.
view_prop_defs into_view_prop_defs() &&;
};
}
}

View File

@@ -8,8 +8,8 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include <seastar/core/format.hh>
#include <seastar/core/sstring.hh>
#include "seastar/core/format.hh"
#include "seastar/core/sstring.hh"
#include "utils/assert.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "cql3/statements/request_validations.hh"

View File

@@ -11,7 +11,6 @@
#include <ranges>
#include <seastar/core/format.hh>
#include <stdexcept>
#include "cql3/statements/property_definitions.hh"
#include "exceptions/exceptions.hh"
#include "utils/overloaded_functor.hh"
@@ -103,18 +102,6 @@ bool property_definitions::has_property(const sstring& name) const {
return _properties.contains(name);
}
property_definitions::value_type property_definitions::extract_property(const sstring& name) {
auto it = _properties.find(name);
if (it == _properties.end()) {
throw std::out_of_range{std::format("No property of name '{}'", std::string_view(name))};
}
value_type result = std::move(it->second);
_properties.erase(it);
return result;
}
std::optional<property_definitions::value_type> property_definitions::get(const sstring& name) const {
if (auto it = _properties.find(name); it != _properties.end()) {
return it->second;

View File

@@ -59,8 +59,6 @@ protected:
public:
bool has_property(const sstring& name) const;
value_type extract_property(const sstring& name);
std::optional<value_type> get(const sstring& name) const;
std::optional<extended_map_type> get_extended_map(const sstring& name) const;

View File

@@ -261,8 +261,7 @@ future<> select_statement::check_access(query_processor& qp, const service::clie
auto& cf_name = s->is_view()
? s->view_info()->base_name()
: (cdc ? cdc->cf_name() : column_family());
const schema_ptr& base_schema = cdc ? cdc : _schema;
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*base_schema);
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*_schema);
co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT, auth::command_desc::type::OTHER, is_vector_indexed);
} catch (const data_dictionary::no_such_column_family& e) {
// Will be validated afterwards.
@@ -1977,7 +1976,9 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
if (it == indexes.end()) {
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
}
if (index_opt || parameters->allow_filtering() || !(restrictions->is_empty()) || check_needs_allow_filtering_anyway(*restrictions)) {
throw exceptions::invalid_request_exception("ANN ordering by vector does not support filtering");
}
index_opt = *it;
if (!index_opt) {
@@ -2273,9 +2274,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
throw exceptions::invalid_request_exception("PER PARTITION LIMIT is not allowed with aggregate queries.");
}
bool is_ann_query = !_parameters->orderings().empty() && std::holds_alternative<select_statement::ann_vector>(_parameters->orderings().front().second);
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering() || is_ann_query,
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering(),
restrictions::check_indexes(!_parameters->is_mutation_fragments()));
if (_parameters->is_distinct()) {

View File

@@ -1,67 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "cql3/statements/view_prop_defs.hh"
namespace cql3::statements {
void view_prop_defs::validate_raw(op_type op, const data_dictionary::database db, sstring ks_name,
const schema::extensions_map& exts) const
{
cf_properties::validate(db, std::move(ks_name), exts);
if (use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
}
if (properties()->get_cdc_options(exts)) {
throw exceptions::invalid_request_exception("Cannot enable CDC for a materialized view");
}
if (op == op_type::create) {
const auto maybe_id = properties()->get_id();
if (maybe_id && db.try_find_table(*maybe_id)) {
const auto schema_ptr = db.find_schema(*maybe_id);
const auto& ks_name = schema_ptr->ks_name();
const auto& cf_name = schema_ptr->cf_name();
throw exceptions::invalid_request_exception(seastar::format("Table with ID {} already exists: {}.{}", *maybe_id, ks_name, cf_name));
}
}
}
void view_prop_defs::apply_to_builder(op_type op, schema_builder& builder, schema::extensions_map exts,
const data_dictionary::database db, sstring ks_name, bool is_colocated) const
{
_properties->apply_to_builder(builder, exts, db, std::move(ks_name), !is_colocated);
if (op == op_type::create) {
const auto maybe_id = properties()->get_id();
if (maybe_id) {
builder.set_uuid(*maybe_id);
}
}
if (op == op_type::alter) {
if (builder.get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(
"Cannot alter gc_grace_seconds of a materialized view to 0, since this "
"value is used to TTL undelivered updates. Setting gc_grace_seconds too "
"low might cause undelivered updates to expire before being replayed.");
}
}
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}
}
} // namespace cql3::statements

View File

@@ -1,62 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "cql3/statements/cf_properties.hh"
namespace cql3::statements {
/// This type represents the possible properties of the following CQL statements:
///
/// * CREATE MATERIALIZED VIEW,
/// * ALTER MATERIALIZED VIEW.
///
/// Since the sets of the valid properties may differ between those statements, this type
/// is supposed to represent a superset of them.
///
/// This type does NOT guarantee that all of the necessary validation logic will be performed
/// by it. It strives to do that, but you should keep this in mind. What does that mean?
/// Some parts of validation may require more context that's not accessible from here.
///
/// As of yet, this type does not cover all of the validation logic that could be here either.
class view_prop_defs : public cf_properties {
public:
/// The type of a schema operation on a materialized view.
/// These values will be used to guide the validation logic.
enum class op_type {
create,
alter
};
public:
template <typename... Args>
view_prop_defs(Args&&... args) : cf_properties(std::forward<Args>(args)...) {}
// Explicitly delete this method. It's declared in the inherited types.
// The user of this interface should use `validate_raw` instead.
void validate(const data_dictionary::database, sstring ks_name, const schema::extensions_map&) const = delete;
/// Validate the properties for the specified schema operation.
///
/// The validation is *raw* because we mostly validate the properties in their string form (checking if
/// a property exists or not for instance) and only focus on the properties on their own, without
/// having access to any other information.
void validate_raw(op_type, const data_dictionary::database, sstring ks_name, const schema::extensions_map&) const;
/// Apply the properties to the provided schema_builder and validate them.
///
/// NOTE: If the validation fails, this function will throw an exception. What's more important,
/// however, is that the provided schema_builder might have already been modified by that
/// point. Because of that, in presence of an exception, the schema builder should NOT be
/// used anymore.
void apply_to_builder(op_type, schema_builder&, schema::extensions_map, const data_dictionary::database,
sstring ks_name, bool is_colocated) const;
};
} // namespace cql3::statements

View File

@@ -16,9 +16,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "batchlog_manager.hh"
#include "batchlog.hh"
@@ -320,8 +318,8 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
// Use a stable `now` across all batches, so skip/replay decisions are the
// same across a while prefix of written_at (across all ids).
// Use a stable `now` accross all batches, so skip/replay decisions are the
// same accross a while prefix of written_at (accross all ids).
const auto now = db_clock::now();
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
@@ -378,7 +376,7 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await coroutine::maybe_yield();
co_await maybe_yield();
}
if (!mutations.empty()) {

View File

@@ -502,9 +502,6 @@ public:
void flush_segments(uint64_t size_to_remove);
void check_no_data_older_than_allowed();
// whitebox testing
std::function<future<>()> _oversized_pre_wait_memory_func;
private:
class shutdown_marker{};
@@ -1600,15 +1597,8 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ
scope_increment_counter allocating(totals.active_allocations);
// #27992 - whitebox testing. signal we are trying to lock out
// all allocators
if (_oversized_pre_wait_memory_func) {
co_await _oversized_pre_wait_memory_func();
}
auto permit = co_await std::move(fut);
// #27992 - task reordering _can_ force the available units to negative. this is ok.
SCYLLA_ASSERT(_request_controller.available_units() <= 0);
SCYLLA_ASSERT(_request_controller.available_units() == 0);
decltype(permit) fake_permit; // can't have allocate+sync release semaphore.
bool failed = false;
@@ -1869,15 +1859,13 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ
}
}
}
auto avail = _request_controller.available_units();
SCYLLA_ASSERT(avail <= 0);
SCYLLA_ASSERT(_request_controller.available_units() == 0);
SCYLLA_ASSERT(permit.count() == max_request_controller_units());
auto nw = _request_controller.waiters();
permit.return_all();
// #20633 cannot guarantee controller avail is now full, since we could have had waiters when doing
// return all -> now will be less avail
SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == (avail + ssize_t(max_request_controller_units())));
SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == ssize_t(max_request_controller_units()));
if (!failed) {
clogger.trace("Oversized allocation succeeded.");
@@ -3961,9 +3949,6 @@ void db::commitlog::update_max_data_lifetime(std::optional<uint64_t> commitlog_d
_segment_manager->cfg.commitlog_data_max_lifetime_in_seconds = commitlog_data_max_lifetime_in_seconds;
}
void db::commitlog::set_oversized_pre_wait_memory_func(std::function<future<>()> f) {
_segment_manager->_oversized_pre_wait_memory_func = std::move(f);
}
future<std::vector<sstring>> db::commitlog::get_segments_to_replay() const {
return _segment_manager->get_segments_to_replay();

View File

@@ -385,9 +385,6 @@ public:
// (Re-)set data mix lifetime.
void update_max_data_lifetime(std::optional<uint64_t> commitlog_data_max_lifetime_in_seconds);
// Whitebox testing. Do not use for production
void set_oversized_pre_wait_memory_func(std::function<future<>()>);
using commit_load_reader_func = std::function<future<>(buffer_and_replay_position)>;
class segment_error : public std::exception {};

View File

@@ -1105,14 +1105,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142,
"Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_transport_port_proxy_protocol(this, "native_transport_port_proxy_protocol", value_status::Used, 0,
"Port on which the CQL native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
, native_transport_port_ssl_proxy_protocol(this, "native_transport_port_ssl_proxy_protocol", value_status::Used, 0,
"Port on which the CQL TLS native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
, native_shard_aware_transport_port_proxy_protocol(this, "native_shard_aware_transport_port_proxy_protocol", value_status::Used, 0,
"Like native_transport_port_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_shard_aware_transport_port_ssl_proxy_protocol(this, "native_shard_aware_transport_port_ssl_proxy_protocol", value_status::Used, 0,
"Like native_transport_port_ssl_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128,
"The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n"
"Default is different (128 versus unlimited).\n"
@@ -1318,7 +1310,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.")
, prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.")
, prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, true, "Enable Prometheus protobuf with native histogram. Set to false to force text exposition format.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, false, "If set allows the experimental Prometheus protobuf with native histogram")
, abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.")
, murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.")
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
@@ -1447,10 +1439,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"SELECT statements with aggregation or GROUP BYs or a secondary index may use this page size for their internal reading data, not the page size specified in the query options.")
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
, alternator_port_proxy_protocol(this, "alternator_port_proxy_protocol", value_status::Used, 0,
"Port on which the Alternator API listens for clients using proxy protocol v2. Disabled (0) by default.")
, alternator_https_port_proxy_protocol(this, "alternator_https_port_proxy_protocol", value_status::Used, 0,
"Port on which the Alternator HTTPS API listens for clients using proxy protocol v2. Disabled (0) by default.")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
@@ -1482,15 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, alternator_describe_table_info_cache_validity_in_seconds(this, "alternator_describe_table_info_cache_validity_in_seconds", liveness::LiveUpdate, value_status::Used, 60 * 60 * 6,
"The validity of DescribeTable information - table size in bytes. This is how long calculated value will be reused before recalculation.")
, alternator_response_gzip_compression_level(this, "alternator_response_gzip_compression_level", liveness::LiveUpdate, value_status::Used, int8_t(6),
"Controls gzip and deflate compression level for Alternator response bodies (if the client requests it via Accept-Encoding header) Default of 6 is a compromise between speed and compression.\n"
"Valid values:\n"
"\t0 : No compression (disables gzip/deflate)\n"
"\t1-9: Compression levels (1 = fastest, 9 = best compression)")
, alternator_response_compression_threshold_in_bytes(this, "alternator_response_compression_threshold_in_bytes", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"When the compression is enabled, this value indicates the minimum size of data to compress. Smaller responses will not be compressed.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
@@ -1570,8 +1549,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"\tdisabled: New keyspaces use vnodes by default, unless enabled by the tablets={'enabled':true} option\n"
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'enabled':false} option\n"
"\tenforced: New keyspaces must use tablets. Tablets cannot be disabled using the CREATE KEYSPACE option")
, auto_repair_enabled_default(this, "auto_repair_enabled_default", liveness::LiveUpdate, value_status::Used, false, "Set true to enable auto repair for tablet tables by default. The value will be overridden by the per keyspace or per table configuration which is not implemented yet.")
, auto_repair_threshold_default_in_seconds(this, "auto_repair_threshold_default_in_seconds", liveness::LiveUpdate, value_status::Used, 24 * 3600 , "Set the default time in seconds for the auto repair threshold for tablet tables. If the time since last repair is bigger than the configured time, the tablet is eligible for auto repair. The value will be overridden by the per keyspace or per table configuration which is not implemented yet.")
, view_flow_control_delay_limit_in_ms(this, "view_flow_control_delay_limit_in_ms", liveness::LiveUpdate, value_status::Used, 1000,
"The maximal amount of time that materialized-view update flow control may delay responses "
"to try to slow down the client and prevent buildup of unfinished view updates. "
@@ -1589,12 +1566,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
"Tablet load stats refresh rate in seconds.")
, force_capacity_based_balancing(this, "force_capacity_based_balancing", liveness::LiveUpdate, value_status::Used, false,
"Forces the load balancer to perform capacity based balancing, instead of size based balancing.")
, size_based_balance_threshold_percentage(this, "size_based_balance_threshold_percentage", liveness::LiveUpdate, value_status::Used, 1.0,
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")

View File

@@ -324,10 +324,6 @@ public:
named_value<uint16_t> native_transport_port_ssl;
named_value<uint16_t> native_shard_aware_transport_port;
named_value<uint16_t> native_shard_aware_transport_port_ssl;
named_value<uint16_t> native_transport_port_proxy_protocol;
named_value<uint16_t> native_transport_port_ssl_proxy_protocol;
named_value<uint16_t> native_shard_aware_transport_port_proxy_protocol;
named_value<uint16_t> native_shard_aware_transport_port_ssl_proxy_protocol;
named_value<uint32_t> native_transport_max_threads;
named_value<uint32_t> native_transport_max_frame_size_in_mb;
named_value<sstring> broadcast_rpc_address;
@@ -464,8 +460,6 @@ public:
named_value<uint16_t> alternator_port;
named_value<uint16_t> alternator_https_port;
named_value<uint16_t> alternator_port_proxy_protocol;
named_value<uint16_t> alternator_https_port_proxy_protocol;
named_value<sstring> alternator_address;
named_value<bool> alternator_enforce_authorization;
named_value<bool> alternator_warn_authorization;
@@ -479,9 +473,6 @@ public:
named_value<bool> alternator_allow_system_table_write;
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<uint32_t> alternator_describe_table_info_cache_validity_in_seconds;
named_value<int> alternator_response_gzip_compression_level;
named_value<uint32_t> alternator_response_compression_threshold_in_bytes;
named_value<bool> abort_on_ebadf;
@@ -571,8 +562,6 @@ public:
named_value<double> topology_barrier_stall_detector_threshold_seconds;
named_value<bool> enable_tablets;
named_value<enum_option<tablets_mode_t>> tablets_mode_for_new_keyspaces;
named_value<bool> auto_repair_enabled_default;
named_value<int32_t> auto_repair_threshold_default_in_seconds;
bool enable_tablets_by_default() const noexcept {
switch (tablets_mode_for_new_keyspaces()) {
@@ -601,9 +590,6 @@ public:
named_value<bool> rf_rack_valid_keyspaces;
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
named_value<bool> force_capacity_based_balancing;
named_value<float> size_based_balance_threshold_percentage;
named_value<uint64_t> minimal_tablet_size_for_balancing;
static const sstring default_tls_priority;
private:

View File

@@ -26,7 +26,6 @@
#include <seastar/core/smp.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>
// Boost features.
@@ -644,12 +643,6 @@ future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept
co_return;
}
if (!replay_allowed()) {
auto reason = seastar::format("Precondition violdated while trying to drain {} / {}: "
"hint replay is not allowed", host_id, ip);
on_internal_error(manager_logger, std::move(reason));
}
manager_logger.info("Draining starts for {}", host_id);
const auto holder = seastar::gate::holder{_draining_eps_gate};
@@ -906,7 +899,7 @@ future<> manager::migrate_ip_directories() {
co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
try {
manager_logger.warn("Removing hint directory {}", directory.native());
co_await seastar::recursive_remove_directory(directory);
co_await lister::rmdir(directory);
} catch (...) {
on_internal_error(manager_logger,
seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));

View File

@@ -318,10 +318,6 @@ public:
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
/// corresponding hint_endpoint_manager objects.
///
/// Preconditions:
/// * Hint replay must be allowed (i.e. `replay_allowed()` must be true) throughout
/// the execution of this function.
///
/// \param host_id host ID of the node that left the cluster
/// \param ip the IP of the node that left the cluster
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
@@ -346,15 +342,15 @@ public:
return _state.contains(state::started);
}
bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}
private:
void set_started() noexcept {
_state.set(state::started);
}
bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}
void set_draining_all() noexcept {
_state.set(state::draining_all);
}

View File

@@ -14,20 +14,15 @@
#include <boost/lexical_cast.hpp>
#include "utils/s3/creds.hh"
#include "utils/http.hh"
#include "object_storage_endpoint_param.hh"
using namespace std::string_literals;
static auto format_url(std::string_view host, unsigned port, bool use_https) {
return fmt::format("{}://{}:{}", use_https ? "https" : "http", host, port);
}
db::object_storage_endpoint_param::object_storage_endpoint_param(s3_storage s)
: _data(std::move(s))
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config)
: object_storage_endpoint_param(s3_storage{format_url(endpoint, config.port, config.use_https), std::move(config.region), std::move(config.role_arn), true /* legacy_format */})
: object_storage_endpoint_param(s3_storage{std::move(endpoint), std::move(config)})
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(gs_storage s)
: _data(std::move(s))
@@ -37,29 +32,13 @@ db::object_storage_endpoint_param::object_storage_endpoint_param() = default;
db::object_storage_endpoint_param::object_storage_endpoint_param(const object_storage_endpoint_param&) = default;
std::string db::object_storage_endpoint_param::s3_storage::to_json_string() const {
if (!legacy_format) {
return fmt::format("{{ \"type\": \"s3\", \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
region, iam_role_arn
);
}
auto url = utils::http::parse_simple_url(endpoint);
return fmt::format("{{ \"port\": {}, \"use_https\": {}, \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
url.port, url.is_https(), region, iam_role_arn
config.port, config.use_https, config.region, config.role_arn
);
}
std::string db::object_storage_endpoint_param::s3_storage::key() const {
// The `endpoint` is full URL all the time, so only return it as a key
// if it wasn't configured "the old way". In the latter case, split the
// URL and return its host part to mimic the old behavior.
if (!legacy_format) {
return endpoint;
}
auto url = utils::http::parse_simple_url(endpoint);
return url.host;
return endpoint;
}
std::string db::object_storage_endpoint_param::gs_storage::to_json_string() const {
@@ -120,6 +99,8 @@ const std::string& db::object_storage_endpoint_param::type() const {
db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(const YAML::Node& node) {
auto name = node["name"];
auto aws_region = node["aws_region"];
auto iam_role_arn = node["iam_role_arn"];
auto type = node["type"];
auto get_opt = [](auto& node, const std::string& key, auto def) {
@@ -127,20 +108,13 @@ db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(cons
return tmp ? tmp.template as<std::decay_t<decltype(def)>>() : def;
};
// aws s3 endpoint.
if (!type || type.as<std::string>() == s3_type) {
if (!type || type.as<std::string>() == s3_type || aws_region || iam_role_arn) {
s3_storage ep;
auto endpoint = name.as<std::string>();
ep.legacy_format = (!endpoint.starts_with("http://") && !endpoint.starts_with("https://"));
if (!ep.legacy_format) {
ep.endpoint = std::move(endpoint);
} else {
ep.endpoint = format_url(endpoint, node["port"].as<unsigned>(), node["https"].as<bool>(false));
}
auto aws_region = node["aws_region"];
ep.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
ep.iam_role_arn = get_opt(node, "iam_role_arn", ""s);
ep.endpoint = name.as<std::string>();
ep.config.port = node["port"].as<unsigned>();
ep.config.use_https = node["https"].as<bool>(false);
ep.config.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
ep.config.role_arn = iam_role_arn ? iam_role_arn.as<std::string>() : "";
return object_storage_endpoint_param{std::move(ep)};
}

View File

@@ -25,9 +25,7 @@ class object_storage_endpoint_param {
public:
struct s3_storage {
std::string endpoint;
std::string region;
std::string iam_role_arn;
bool legacy_format; // FIXME convert it to bool_class after seastar#3198
s3::endpoint_config config;
std::strong_ordering operator<=>(const s3_storage&) const = default;
std::string to_json_string() const;

View File

@@ -961,15 +961,15 @@ public:
auto include_pending_changes = [&table_schemas](schema_diff_per_shard d) -> future<> {
for (auto& schema : d.dropped) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.erase(schema->id());
}
for (auto& change : d.altered) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.insert_or_assign(change.new_schema->id(), change.new_schema);
}
for (auto& schema : d.created) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.insert_or_assign(schema->id(), schema);
}
};

View File

@@ -152,8 +152,7 @@ future<> backup_task_impl::do_backup() {
}
future<> backup_task_impl::process_snapshot_dir() {
auto directory = co_await io_check(open_directory, _snapshot_dir.native());
auto snapshot_dir_lister = directory_lister(directory, _snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
auto snapshot_dir_lister = directory_lister(_snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
size_t num_sstable_comps = 0;
try {
@@ -162,7 +161,7 @@ future<> backup_task_impl::process_snapshot_dir() {
while (auto component_ent = co_await snapshot_dir_lister.get()) {
const auto& name = component_ent->name;
auto file_path = _snapshot_dir / name;
auto st = co_await file_stat(directory, name);
auto st = co_await file_stat(file_path.native());
total += st.size;
try {
auto desc = sstables::parse_path(file_path, "", "");

View File

@@ -55,7 +55,6 @@
#include "message/shared_dict.hh"
#include "replica/database.hh"
#include "db/compaction_history_entry.hh"
#include "mutation/async_utils.hh"
#include <unordered_map>
@@ -140,7 +139,6 @@ namespace {
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -491,24 +489,6 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}
schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token for of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2375,7 +2355,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2619,32 +2598,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts(muts.begin(), muts.end());
co_return cmuts;
}
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -3046,9 +2999,7 @@ future<mutation> system_keyspace::get_group0_history(sharded<replica::database>&
SCYLLA_ASSERT(rs);
auto& ps = rs->partitions();
for (auto& p: ps) {
// Note: we could decorate the frozen_mutation's key to check if it's the expected one
// but since this is a single partition table, we can just check after unfreezing the whole mutation.
auto mut = co_await unfreeze_gently(p.mut(), s);
auto mut = p.mut().unfreeze(s);
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
if (partition_key == GROUP0_HISTORY_KEY) {
co_return mut;
@@ -3295,7 +3246,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
}
if (row.has("topology_request") && nstate != service::node_state::left) {
if (row.has("topology_request")) {
auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
ret.requests.emplace(host_id, req);
switch(req) {
@@ -3845,35 +3796,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}
} // namespace db
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

View File

@@ -57,8 +57,6 @@ namespace paxos {
struct topology_request_state;
class group0_guard;
class raft_group0_client;
}
namespace netw {
@@ -187,7 +185,6 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -203,7 +200,6 @@ public:
static constexpr auto DICTS = "dicts";
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
static constexpr auto CLIENT_ROUTES = "client_routes";
static constexpr auto VERSIONS = "versions";
// auth
static constexpr auto ROLES = "roles";
@@ -267,7 +263,6 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -407,22 +402,6 @@ public:
int64_t range_end;
};
enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);
struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -444,10 +423,6 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -757,8 +732,3 @@ public:
}; // class system_keyspace
} // namespace db
template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -57,7 +57,7 @@
#include "locator/network_topology_strategy.hh"
#include "mutation/mutation.hh"
#include "mutation/mutation_partition.hh"
#include <seastar/core/on_internal_error.hh>
#include "seastar/core/on_internal_error.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/storage_proxy.hh"
@@ -2244,7 +2244,7 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
// Guard the whole startup routine with a semaphore,
// so that it's not intercepted by `on_drop_view`, `on_create_view`
// or `on_update_view` events.
auto units = co_await get_units(_sem, view_builder_semaphore_units);
auto units = co_await get_units(_sem, 1);
// Wait for schema agreement even if we're a seed node.
co_await mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as);
@@ -2659,7 +2659,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) {
co_await utils::get_local_injector().inject("add_new_view_pause_last_shard", utils::wait_for_message(5min));
}
co_await _sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), step.current_token());
co_await _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
}
@@ -2667,74 +2667,40 @@ static bool should_ignore_tablet_keyspace(const replica::database& db, const sst
return db.features().view_building_coordinator && db.has_keyspace(ks_name) && db.find_keyspace(ks_name).uses_tablets();
}
future<> view_builder::dispatch_create_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return make_ready_future<>();
}
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
// This runs on shard 0 only; seed the global rows before broadcasting.
return handle_seed_view_build_progress(ks_name, view_name).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return container().invoke_on_all([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable {
return vb.handle_create_view_local(std::move(ks_name), std::move(view_name));
});
});
});
}
future<> view_builder::handle_seed_view_build_progress(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
}
future<> view_builder::handle_create_view_local(sstring ks_name, sstring view_name){
if (this_shard_id() == 0) {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}
future<> view_builder::handle_create_view_local_impl(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
return flush_base(step.base, _as);
}).then([this, view, &step] () {
// This resets the build step to the current token. It may result in views currently
// being built to receive duplicate updates, but it simplifies things as we don't have
// to keep around a list of new views to build the next time the reader crosses a token
// threshold.
return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
return add_new_view(view, step);
}).then_wrapped([this, view] (future<>&& f) {
try {
f.get();
} catch (abort_requested_exception&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (raft::request_aborted&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (...) {
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}
// Waited on indirectly in stop().
static_cast<void>(_build_step.trigger());
});
});
}
void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
if (this_shard_id() != 0) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return;
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(dispatch_create_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
}));
// Do it in the background, serialized.
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
return flush_base(step.base, _as);
}).then([this, view, &step] () mutable {
// This resets the build step to the current token. It may result in views currently
// being built to receive duplicate updates, but it simplifies things as we don't have
// to keep around a list of new views to build the next time the reader crosses a token
// threshold.
return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) {
try {
f.get();
} catch (abort_requested_exception&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (raft::request_aborted&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (...) {
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}
// Waited on indirectly in stop().
(void)_build_step.trigger();
});
});
});
}).handle_exception_type([] (replica::no_such_column_family&) { });
}
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
@@ -2743,7 +2709,7 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
}
// Do it in the background, serialized.
(void)with_semaphore(_sem, view_builder_semaphore_units, [ks_name, view_name, this] {
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
@@ -2758,75 +2724,45 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
}).handle_exception_type([] (replica::no_such_column_family&) { });
}
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return make_ready_future<>();
}
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
// This runs on shard 0 only; broadcast local cleanup before global cleanup.
return container().invoke_on_all([ks_name, view_name] (view_builder& vb) mutable {
return vb.handle_drop_view_local(std::move(ks_name), std::move(view_name));
}).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_drop_view_global_cleanup(std::move(ks_name), std::move(view_name));
});
});
}
future<> view_builder::handle_drop_view_local(sstring ks_name, sstring view_name) {
if (this_shard_id() == 0) {
return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}
future<> view_builder::handle_drop_view_local_impl(sstring ks_name, sstring view_name) {
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
// The view is absent from the database at this point, so find it by brute force.
([&, this] {
for (auto& [_, step] : _base_to_build_step) {
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
continue;
}
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
if (it->view->cf_name() == view_name) {
_built_views.erase(it->view->id());
step.build_status.erase(it);
return;
}
}
}
})();
return make_ready_future<>();
}
future<> view_builder::handle_drop_view_global_cleanup(sstring ks_name, sstring view_name) {
if (this_shard_id() != 0) {
return make_ready_future<>();
}
vlogger.info0("Starting view global cleanup {}.{}", ks_name, view_name);
return when_all_succeed(
_sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name),
_sys_ks.remove_built_view(ks_name, view_name),
remove_view_build_status(ks_name, view_name))
.discard_result()
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
});
}
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
if (this_shard_id() != 0) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return;
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(dispatch_drop_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
}));
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
// Do it in the background, serialized.
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
// The view is absent from the database at this point, so find it by brute force.
([&, this] {
for (auto& [_, step] : _base_to_build_step) {
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
continue;
}
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
if (it->view->cf_name() == view_name) {
_built_views.erase(it->view->id());
step.build_status.erase(it);
return;
}
}
}
})();
if (this_shard_id() != 0) {
// Shard 0 can't remove the entry in the build progress system table on behalf of the
// current shard, since shard 0 may have already processed the notification, and this
// shard may since have updated the system table if the drop happened concurrently
// with the build.
return _sys_ks.remove_view_build_progress(ks_name, view_name);
}
return when_all_succeed(
_sys_ks.remove_view_build_progress(ks_name, view_name),
_sys_ks.remove_built_view(ks_name, view_name),
remove_view_build_status(ks_name, view_name))
.discard_result()
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
});
}).handle_exception_type([] (replica::no_such_keyspace&) {});
}
future<> view_builder::do_build_step() {
@@ -2837,7 +2773,7 @@ future<> view_builder::do_build_step() {
return seastar::async(std::move(attr), [this] {
exponential_backoff_retry r(1s, 1min);
while (!_base_to_build_step.empty() && !_as.abort_requested()) {
auto units = get_units(_sem, view_builder_semaphore_units).get();
auto units = get_units(_sem, 1).get();
++_stats.steps_performed;
try {
execute(_current_step->second, exponential_backoff_retry(1s, 1min));
@@ -3697,20 +3633,20 @@ sstring build_status_to_sstring(build_status status) {
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
}
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name, locator::token_metadata_ptr tmptr) {
const auto& rs = db.find_keyspace(keyspace_name).get_replication_strategy();
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
const bool tablet_views_enabled = db.features().views_with_tablets;
// Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
// sure that all tablet-based keyspaces are RF-rack-valid. We check that
// at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
if (rs.uses_tablets() && !db.features().views_with_tablets) {
const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
if (!required_config && uses_tablets) {
throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
"To be able to use them, make sure all nodes in the cluster are upgraded.");
}
try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs);
} catch (const std::exception& e) {
throw std::logic_error(fmt::format(
"Materialized views and secondary indexes are not supported on the keyspace '{}': {}",
keyspace_name, e.what()));
"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
"that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
}
}

View File

@@ -9,7 +9,6 @@
#pragma once
#include "gc_clock.hh"
#include "locator/token_metadata_fwd.hh"
#include "query/query-request.hh"
#include "schema/schema_fwd.hh"
#include "readers/mutation_reader.hh"
@@ -319,7 +318,7 @@ endpoints_to_update get_view_natural_endpoint(
///
/// Preconditions:
/// * The provided `keyspace_name` must correspond to an existing keyspace.
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name, locator::token_metadata_ptr tmptr);
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
}

View File

@@ -169,11 +169,10 @@ class view_builder final : public service::migration_listener::only_view_notific
base_to_build_step_type _base_to_build_step;
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
static constexpr size_t view_builder_semaphore_units = 1;
// Ensures bookkeeping operations are serialized, meaning that while we execute
// a build step we don't consider newly added or removed views. This simplifies
// the algorithms. Also synchronizes an operation wrt. a call to stop().
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::named_semaphore _sem{1, named_semaphore_exception_factory{"view builder"}};
seastar::abort_source _as;
future<> _started = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view.
@@ -267,14 +266,6 @@ private:
future<> maybe_mark_view_as_built(view_ptr, dht::token);
future<> mark_as_built(view_ptr);
void setup_metrics();
future<> dispatch_create_view(sstring ks_name, sstring view_name);
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
future<> handle_seed_view_build_progress(sstring ks_name, sstring view_name);
future<> handle_create_view_local(sstring ks_name, sstring view_name);
future<> handle_drop_view_local(sstring ks_name, sstring view_name);
future<> handle_create_view_local_impl(sstring ks_name, sstring view_name);
future<> handle_drop_view_local_impl(sstring ks_name, sstring view_name);
future<> handle_drop_view_global_cleanup(sstring ks_name, sstring view_name);
template <typename Func1, typename Func2>
future<> write_view_build_status(Func1&& fn_group0, Func2&& fn_sys_dist) {

View File

@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
future<> view_building_worker::run_staging_sstables_registrator() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
co_await create_staging_sstable_tasks();
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
} catch (raft::request_aborted&) {
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
} catch (...) {
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
sleep = true;
}
if (sleep) {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
}
}
}
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
auto it = vbw._state._batch->tasks.begin();
while (it != vbw._state._batch->tasks.end()) {
auto id = it->first;
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
++it; // Advance the iterator before potentially removing the entry from the map.
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
for (auto& [id, t]: tasks_map) {
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
if (!task_opt || task_opt->get().aborted) {
co_await vbw._state._batch->abort_task(id);
}
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
}) | std::ranges::to<std::unordered_set>();;
}
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
// clear the state, save and flush new base table
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
if (processing_base_table != building_state.currently_processed_base_table) {
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
break;
}
}
_vbw.local()._vb_state_machine.event.broadcast();
}
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
tasks.insert({id, *task_opt});
}
#ifdef SEASTAR_DEBUG
{
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
#endif
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
co_return collect_completed_tasks();
}
}
}

View File

@@ -17,7 +17,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "raft/raft.hh"
#include <seastar/core/gate.hh>
#include "seastar/core/gate.hh"
#include "db/view/view_building_state.hh"
#include "sstables/shared_sstable.hh"
#include "utils/UUID.hh"

View File

@@ -102,13 +102,13 @@ view_update_generator::view_update_generator(replica::database& db, sharded<serv
, _early_abort_subscription(as.subscribe([this] () noexcept { do_abort(); }))
{
setup_metrics();
discover_staging_sstables();
_db.plug_view_update_generator(*this);
}
view_update_generator::~view_update_generator() {}
future<> view_update_generator::start() {
discover_staging_sstables();
_started = seastar::async([this]() mutable {
auto drop_sstable_references = defer([&] () noexcept {
// Clear sstable references so sstables_manager::stop() doesn't hang.

View File

@@ -605,8 +605,8 @@ public:
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("version", utf8_type)
.with_column("build_mode", utf8_type)
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
.with_column("ssl_protocol", utf8_type)
.with_column("username", utf8_type)
.with_column("scheduling_group", utf8_type)
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_hash_version()
.build();
}
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Collect
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
using client_data_vec = utils::chunked_vector<client_data>;
using shard_client_data = std::vector<client_data_vec>;
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
cd_vec.resize(smp::count);
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
for (unsigned i = 0; i < smp::count; i++) {
for (auto&& ps_cdc : *cd_vec[i]) {
for (auto&& cd : ps_cdc) {
if (cd_map.contains(cd->ip)) {
cd_map[cd->ip].emplace_back(std::move(cd));
if (cd_map.contains(cd.ip)) {
cd_map[cd.ip].emplace_back(std::move(cd));
} else {
dht::decorated_key key = make_partition_key(cd->ip);
dht::decorated_key key = make_partition_key(cd.ip);
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
ips.insert(decorated_ip{std::move(key), cd->ip});
cd_map[cd->ip].emplace_back(std::move(cd));
ips.insert(decorated_ip{std::move(key), cd.ip});
cd_map[cd.ip].emplace_back(std::move(cd));
}
}
co_await coroutine::maybe_yield();
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
co_await result.emit_partition_start(dip.key);
auto& clients = cd_map[dip.ip];
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
return a->port < b->port || a->client_type_str() < b->client_type_str();
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
return a.port < b.port || a.client_type_str() < b.client_type_str();
});
for (const auto& cd : clients) {
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
set_cell(cr.cells(), "shard_id", cd->shard_id);
set_cell(cr.cells(), "connection_stage", cd->stage_str());
if (cd->driver_name) {
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
set_cell(cr.cells(), "shard_id", cd.shard_id);
set_cell(cr.cells(), "connection_stage", cd.stage_str());
if (cd.driver_name) {
set_cell(cr.cells(), "driver_name", *cd.driver_name);
}
if (cd->driver_version) {
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
if (cd.driver_version) {
set_cell(cr.cells(), "driver_version", *cd.driver_version);
}
if (cd->hostname) {
set_cell(cr.cells(), "hostname", *cd->hostname);
if (cd.hostname) {
set_cell(cr.cells(), "hostname", *cd.hostname);
}
if (cd->protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
if (cd.protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
}
if (cd->ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
if (cd.ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
}
if (cd->ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
if (cd.ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
}
if (cd->ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
if (cd.ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
}
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
if (cd->scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
if (cd.scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
}
auto map_type = map_type_impl::get_instance(
utf8_type,
utf8_type,
false
);
auto prepare_client_options = [] (const auto& client_options) {
map_type_impl::native_type tmp;
for (auto& co: client_options) {
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
tmp.push_back(std::move(map_element));
}
return tmp;
};
set_cell(cr.cells(), "client_options",
make_map_value(map_type, prepare_client_options(cd->client_options)));
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();
@@ -1120,10 +1100,9 @@ public:
}
auto tm = _db.local().get_token_metadata_ptr();
auto target_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
locator::load_sketch load(tm, stats, default_tablet_size);
locator::load_sketch load(tm);
co_await load.populate();
tm->get_topology().for_each_node([&] (const auto& node) {
@@ -1137,23 +1116,18 @@ public:
if (auto ip = _gossiper.local().get_address_map().find(host)) {
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
}
set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
set_cell(r.cells(), "tablets_allocated", load.get_load(host));
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_shard_load(host))));
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_load(host) * target_tablet_size)));
if (stats && stats->capacity.contains(host)) {
auto capacity = stats->capacity.at(host);
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
if (auto utilization = load.get_allocated_utilization(host)) {
auto utilization = load.get_allocated_utilization(host, *stats, target_tablet_size);
if (utilization) {
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
}
if (load.has_complete_data(host)) {
if (auto utilization = load.get_storage_utilization(host)) {
set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
}
set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
}
}
mutation_sink(m);
});
@@ -1173,8 +1147,6 @@ private:
.with_column("storage_capacity", long_type)
.with_column("storage_allocated_load", long_type)
.with_column("storage_allocated_utilization", double_type)
.with_column("storage_load", long_type)
.with_column("storage_utilization", double_type)
.with_sharder(1, 0) // shard0-only
.with_hash_version()
.build();

View File

@@ -7,6 +7,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import os
import sys
import errno
import logging

View File

@@ -8,7 +8,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import argparse
from scylla_blocktune import tune_yaml, tune_fs, tune_dev
from scylla_blocktune import *
if __name__ == "__main__":
ap = argparse.ArgumentParser('Tune filesystems for ScyllaDB')

View File

@@ -14,9 +14,7 @@ import subprocess
import time
import tempfile
import shutil
import re
import distro
from scylla_util import out, is_debian_variant, is_suse_variant, pkg_install, is_redhat_variant, systemd_unit, is_gentoo
from scylla_util import *
from subprocess import run

View File

@@ -10,8 +10,9 @@
import os
import sys
import argparse
import shutil
from scylla_util import is_debian_variant, pkg_install, systemd_unit, sysconfig_parser, is_gentoo, is_arch, is_amzn2, is_suse_variant, is_redhat_variant
import shlex
import distro
from scylla_util import *
UNIT_DATA= '''
[Unit]

View File

@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import is_container, sysconfig_parser
from scylla_util import *
if __name__ == '__main__':
if not is_container() and os.getuid() > 0:

View File

@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import is_nonroot, is_container, etcdir, sysconfig_parser
from scylla_util import *
if __name__ == '__main__':
if not is_nonroot() and not is_container() and os.getuid() > 0:

View File

@@ -8,6 +8,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import os
import sys
import yaml
import argparse
import subprocess

View File

@@ -8,8 +8,9 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import os
import sys
from scylla_util import systemd_unit
import subprocess
import shutil
from scylla_util import *
if __name__ == '__main__':
if os.getuid() > 0:

View File

@@ -9,7 +9,7 @@
import os
import re
from scylla_util import etcdir, datadir, bindir, scriptsdir, is_nonroot, is_container, is_developer_mode
from scylla_util import *
import resource
import subprocess
import argparse

View File

@@ -10,7 +10,7 @@
import os
import sys
import shutil
from scylla_util import pkg_install
from scylla_util import *
from subprocess import run, DEVNULL
if __name__ == '__main__':

View File

@@ -7,8 +7,9 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
from pathlib import Path
from datetime import datetime
from scylla_util import scylladir_p
from scylla_util import *
if __name__ == '__main__':
log = scylladir_p() / 'scylla-server.log'

View File

@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import etcdir_p
from scylla_util import *
if __name__ == '__main__':
if os.getuid() > 0:

View File

@@ -11,9 +11,10 @@ import os
import sys
import argparse
import re
import distro
import shutil
from scylla_util import systemd_unit, is_redhat_variant, pkg_install
from scylla_util import *
from subprocess import run
from pathlib import Path

View File

@@ -12,10 +12,8 @@ import sys
import glob
import platform
import distro
import re
import traceback
from scylla_util import sysconfig_parser, out, get_set_nic_and_disks_config_value, check_sysfs_numa_topology_is_valid, sysconfdir_p, scylla_excepthook, perftune_base_command
from scylla_util import *
from subprocess import run
def get_cur_cpuset():

View File

@@ -9,6 +9,7 @@
import os
import argparse
import distutils.util
import pwd
import grp
import sys
@@ -17,22 +18,12 @@ import logging
import pyudev
import psutil
import platform
import shutil
from pathlib import Path
from scylla_util import is_unused_disk, out, pkg_install, systemd_unit, SystemdException, is_debian_variant, is_redhat_variant, is_offline
from subprocess import run, SubprocessError, Popen
from scylla_util import *
from subprocess import run, SubprocessError
LOGGER = logging.getLogger(__name__)
def strtobool(val):
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return 1
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
return 0
else:
raise ValueError(f"invalid truth value {val!r}")
class UdevInfo:
def __init__(self, device_file):
self.context = pyudev.Context()
@@ -154,7 +145,7 @@ if __name__ == '__main__':
args = parser.parse_args()
# Allow args.online_discard to be used as a boolean value
args.online_discard = strtobool(args.online_discard)
args.online_discard = distutils.util.strtobool(args.online_discard)
root = args.root.rstrip('/')
if args.volume_role == 'all':
@@ -236,7 +227,7 @@ if __name__ == '__main__':
with open(discard_path) as f:
discard = f.read().strip()
if discard != '0':
proc = Popen(['blkdiscard', disk])
proc = subprocess.Popen(['blkdiscard', disk])
procs.append(proc)
for proc in procs:
proc.wait()

View File

@@ -8,9 +8,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import os
import sys
import argparse
from scylla_util import systemd_unit
from scylla_util import *
def update_rsysconf(rsyslog_server):
if ':' not in rsyslog_server:

Some files were not shown because too many files have changed in this diff Show More