Compare commits
2 Commits
copilot/im
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ebf5a02d1 | ||
|
|
03daab52ec |
12
.github/workflows/call_jira_status_in_progress.yml
vendored
Normal file
12
.github/workflows/call_jira_status_in_progress.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status In Progress
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened]
|
||||
|
||||
jobs:
|
||||
call-jira-status-in-progress:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
12
.github/workflows/call_jira_status_in_review.yml
vendored
Normal file
12
.github/workflows/call_jira_status_in_review.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status In Review
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [ready_for_review, review_requested]
|
||||
|
||||
jobs:
|
||||
call-jira-status-in-review:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
12
.github/workflows/call_jira_status_ready_for_merge.yml
vendored
Normal file
12
.github/workflows/call_jira_status_ready_for_merge.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status Ready For Merge
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [labeled]
|
||||
|
||||
jobs:
|
||||
call-jira-status-update:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
41
.github/workflows/call_jira_sync.yml
vendored
41
.github/workflows/call_jira_sync.yml
vendored
@@ -1,41 +0,0 @@
|
||||
name: Sync Jira Based on PR Events
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: write
|
||||
issues: write
|
||||
|
||||
jobs:
|
||||
jira-sync-pr-opened:
|
||||
if: github.event.action == 'opened'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-in-review:
|
||||
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-add-label:
|
||||
if: github.event.action == 'labeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-remove-label:
|
||||
if: github.event.action == 'unlabeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-pr-closed:
|
||||
if: github.event.action == 'closed'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,14 +0,0 @@
|
||||
name: Call Jira release creation for new milestone
|
||||
|
||||
on:
|
||||
milestone:
|
||||
types: [created]
|
||||
|
||||
jobs:
|
||||
sync-milestone-to-jira:
|
||||
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
|
||||
with:
|
||||
# Comma-separated list of Jira project keys
|
||||
jira_project_keys: "SCYLLADB,CUSTOMER"
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,13 +0,0 @@
|
||||
name: validate_pr_author_email
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
|
||||
jobs:
|
||||
validate_pr_author_email:
|
||||
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main
|
||||
|
||||
2
.github/workflows/codespell.yaml
vendored
2
.github/workflows/codespell.yaml
vendored
@@ -13,5 +13,5 @@ jobs:
|
||||
- uses: codespell-project/actions-codespell@master
|
||||
with:
|
||||
only_warn: 1
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
|
||||
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"
|
||||
|
||||
10
.github/workflows/docs-validate-metrics.yml
vendored
10
.github/workflows/docs-validate-metrics.yml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
- enterprise
|
||||
paths:
|
||||
- '**/*.cc'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/get_description.py'
|
||||
- 'docs/_ext/scylladb_metrics.py'
|
||||
|
||||
@@ -15,20 +15,20 @@ jobs:
|
||||
validate-metrics:
|
||||
runs-on: ubuntu-latest
|
||||
name: Check metrics documentation coverage
|
||||
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install PyYAML
|
||||
|
||||
|
||||
- name: Validate metrics
|
||||
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml
|
||||
|
||||
@@ -18,7 +18,6 @@ target_sources(alternator
|
||||
consumed_capacity.cc
|
||||
ttl.cc
|
||||
parsed_expression_cache.cc
|
||||
http_compression.cc
|
||||
${cql_grammar_srcs})
|
||||
target_include_directories(alternator
|
||||
PUBLIC
|
||||
|
||||
@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
|
||||
controller::controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -40,7 +39,6 @@ controller::controller(
|
||||
: protocol_server(sg)
|
||||
, _gossiper(gossiper)
|
||||
, _proxy(proxy)
|
||||
, _ss(ss)
|
||||
, _mm(mm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _cdc_gen_svc(cdc_gen_svc)
|
||||
@@ -91,7 +89,7 @@ future<> controller::start_server() {
|
||||
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
|
||||
return cfg.alternator_timeout_in_ms;
|
||||
};
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
|
||||
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
|
||||
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
|
||||
@@ -171,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class storage_service;
|
||||
class migration_manager;
|
||||
class memory_limiter;
|
||||
}
|
||||
@@ -58,7 +57,6 @@ class server;
|
||||
class controller : public protocol_server {
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
sharded<service::storage_proxy>& _proxy;
|
||||
sharded<service::storage_service>& _ss;
|
||||
sharded<service::migration_manager>& _mm;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
sharded<cdc::generation_service>& _cdc_gen_svc;
|
||||
@@ -76,7 +74,6 @@ public:
|
||||
controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -96,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -67,14 +67,6 @@ using namespace std::chrono_literals;
|
||||
|
||||
logging::logger elogger("alternator-executor");
|
||||
|
||||
namespace std {
|
||||
template <> struct hash<std::pair<sstring, sstring>> {
|
||||
size_t operator () (const std::pair<sstring, sstring>& p) const {
|
||||
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// Alternator-specific table properties stored as hidden table tags:
|
||||
@@ -256,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
|
||||
return *(v.MemberBegin());
|
||||
}
|
||||
|
||||
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
|
||||
executor &_executor;
|
||||
|
||||
struct table_info {
|
||||
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
|
||||
};
|
||||
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
|
||||
bool active = false;
|
||||
|
||||
public:
|
||||
describe_table_info_manager(executor& executor) : _executor(executor) {
|
||||
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
|
||||
active = true;
|
||||
}
|
||||
describe_table_info_manager(const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager(describe_table_info_manager&&) = delete;
|
||||
~describe_table_info_manager() {
|
||||
if (active) {
|
||||
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
|
||||
}
|
||||
}
|
||||
|
||||
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
|
||||
|
||||
static std::chrono::high_resolution_clock::time_point now() {
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
}
|
||||
|
||||
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
|
||||
auto it = info_for_tables.find({ks_name, cf_name});
|
||||
if (it != info_for_tables.end()) {
|
||||
return it->second.size_in_bytes.get();
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
|
||||
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
|
||||
}
|
||||
future<> stop() {
|
||||
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
|
||||
active = false;
|
||||
co_return;
|
||||
}
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
|
||||
info_for_tables.erase({ks_name, cf_name});
|
||||
}
|
||||
};
|
||||
|
||||
executor::executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
smp_service_group ssg,
|
||||
utils::updateable_value<uint32_t> default_timeout_in_ms)
|
||||
: _gossiper(gossiper),
|
||||
_ss(ss),
|
||||
_proxy(proxy),
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
@@ -328,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_stats))
|
||||
{
|
||||
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
|
||||
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
|
||||
register_metrics(_metrics, _stats);
|
||||
}
|
||||
|
||||
@@ -813,44 +752,12 @@ static future<bool> is_view_built(
|
||||
|
||||
}
|
||||
|
||||
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
|
||||
auto expiry = describe_table_info_manager::now() + ttl;
|
||||
return container().invoke_on_all(
|
||||
[schema, size_in_bytes, expiry] (executor& exec) {
|
||||
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
|
||||
});
|
||||
}
|
||||
|
||||
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
|
||||
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
|
||||
std::uint64_t total_size = 0;
|
||||
if (cached_size) {
|
||||
total_size = *cached_size;
|
||||
} else {
|
||||
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
|
||||
// move forward with deletion faster than we calculate the size
|
||||
if (!deleting) {
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
|
||||
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
rjson::add(table_description, "TableSizeBytes", total_size);
|
||||
}
|
||||
|
||||
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
auto tags_ptr = db::get_tags_of_table(schema);
|
||||
|
||||
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
|
||||
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
|
||||
|
||||
auto creation_timestamp = get_table_creation_time(*schema);
|
||||
|
||||
@@ -894,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
|
||||
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
|
||||
|
||||
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
|
||||
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
if (tbl_status != table_status::deleting) {
|
||||
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
|
||||
@@ -931,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
// (for a built view) or CREATING+Backfilling (if view building
|
||||
// is in progress).
|
||||
if (!is_lsi) {
|
||||
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
|
||||
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
|
||||
rjson::add(view_entry, "IndexStatus", "ACTIVE");
|
||||
} else {
|
||||
rjson::add(view_entry, "IndexStatus", "CREATING");
|
||||
@@ -959,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
}
|
||||
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
|
||||
}
|
||||
executor::supplement_table_stream_info(table_description, *schema, _proxy);
|
||||
executor::supplement_table_stream_info(table_description, *schema, proxy);
|
||||
|
||||
// FIXME: still missing some response fields (issue #5026)
|
||||
co_return table_description;
|
||||
}
|
||||
|
||||
@@ -980,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Table", std::move(table_description));
|
||||
elogger.trace("returning {}", response);
|
||||
@@ -1083,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
auto& p = _proxy.container();
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -1647,7 +1557,8 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
|
||||
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
@@ -1834,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
|
||||
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, sp)) {
|
||||
validate_cdc_log_name_length(builder.cf_name());
|
||||
}
|
||||
}
|
||||
@@ -1853,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1869,18 +1780,18 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
|
||||
}
|
||||
|
||||
size_t retries = _mm.get_concurrent_ddl_retries();
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
utils::chunked_vector<mutation> schema_mutations;
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
|
||||
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
|
||||
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
|
||||
const auto& topo = sp.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
if (rs->uses_tablets()) {
|
||||
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
|
||||
@@ -1888,19 +1799,14 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
}
|
||||
}
|
||||
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
|
||||
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
|
||||
if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
}
|
||||
try {
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
|
||||
} catch (exceptions::already_exists_exception&) {
|
||||
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
|
||||
}
|
||||
}
|
||||
if (_proxy.data_dictionary().try_find_table(schema->id())) {
|
||||
if (sp.data_dictionary().try_find_table(schema->id())) {
|
||||
// This should never happen, the ID is supposed to be unique
|
||||
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
|
||||
}
|
||||
@@ -1909,9 +1815,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
for (schema_builder& view_builder : view_builders) {
|
||||
schemas.push_back(view_builder.build());
|
||||
}
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
|
||||
if (ksm->uses_tablets()) {
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
|
||||
}
|
||||
|
||||
// If a role is allowed to create a table, we must give it permissions to
|
||||
@@ -1936,7 +1842,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
|
||||
try {
|
||||
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
break;
|
||||
} catch (const service::group0_concurrent_modification& ex) {
|
||||
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
|
||||
@@ -1948,9 +1854,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
}
|
||||
|
||||
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
rjson::value status = rjson::empty_object();
|
||||
executor::supplement_table_info(request, *schema, _proxy);
|
||||
executor::supplement_table_info(request, *schema, sp);
|
||||
rjson::add(status, "TableDescription", std::move(request));
|
||||
co_return rjson::print(std::move(status));
|
||||
}
|
||||
@@ -1959,11 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
|
||||
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
|
||||
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2114,10 +2019,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation(fmt::format(
|
||||
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
|
||||
}
|
||||
if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
|
||||
!p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
}
|
||||
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
@@ -6177,10 +6078,9 @@ future<> executor::start() {
|
||||
}
|
||||
|
||||
future<> executor::stop() {
|
||||
co_await _describe_table_info_manager->stop();
|
||||
// disconnect from the value source, but keep the value unchanged.
|
||||
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
|
||||
co_await _parsed_expression_cache->stop();
|
||||
return _parsed_expression_cache->stop();
|
||||
}
|
||||
|
||||
} // namespace alternator
|
||||
|
||||
@@ -17,13 +17,11 @@
|
||||
#include "service/client_state.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include "alternator/error.hh"
|
||||
#include "stats.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/simple_value_with_expiry.hh"
|
||||
|
||||
#include "tracing/trace_state.hh"
|
||||
|
||||
@@ -43,7 +41,6 @@ namespace cql3::selection {
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class cas_shard;
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -60,7 +57,6 @@ class schema_builder;
|
||||
|
||||
namespace alternator {
|
||||
|
||||
enum class table_status;
|
||||
class rmw_operation;
|
||||
class put_or_delete_item;
|
||||
|
||||
@@ -140,7 +136,6 @@ class expression_cache;
|
||||
|
||||
class executor : public peering_sharded_service<executor> {
|
||||
gms::gossiper& _gossiper;
|
||||
service::storage_service& _ss;
|
||||
service::storage_proxy& _proxy;
|
||||
service::migration_manager& _mm;
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
@@ -153,11 +148,6 @@ class executor : public peering_sharded_service<executor> {
|
||||
|
||||
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
|
||||
|
||||
struct describe_table_info_manager;
|
||||
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
|
||||
|
||||
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
|
||||
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
|
||||
public:
|
||||
using client_state = service::client_state;
|
||||
// request_return_type is the return type of the executor methods, which
|
||||
@@ -183,7 +173,6 @@ public:
|
||||
|
||||
executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
@@ -231,8 +220,6 @@ private:
|
||||
friend class rmw_operation;
|
||||
|
||||
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
|
||||
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
|
||||
|
||||
future<> do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
|
||||
@@ -1,301 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "alternator/http_compression.hh"
|
||||
#include "alternator/server.hh"
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <zlib.h>
|
||||
|
||||
static logging::logger slogger("alternator-http-compression");
|
||||
|
||||
namespace alternator {
|
||||
|
||||
|
||||
static constexpr size_t compressed_buffer_size = 1024;
|
||||
class zlib_compressor {
|
||||
z_stream _zs;
|
||||
temporary_buffer<char> _output_buf;
|
||||
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
|
||||
public:
|
||||
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
|
||||
: _write_func(std::move(write_func)) {
|
||||
memset(&_zs, 0, sizeof(_zs));
|
||||
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
|
||||
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
||||
// Should only happen if memory allocation fails
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
}
|
||||
~zlib_compressor() {
|
||||
deflateEnd(&_zs);
|
||||
}
|
||||
future<> close() {
|
||||
return compress(nullptr, 0, true);
|
||||
}
|
||||
|
||||
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
|
||||
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
|
||||
_zs.avail_in = (uInt) len;
|
||||
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
|
||||
while(_zs.avail_in > 0 || is_last_chunk) {
|
||||
co_await coroutine::maybe_yield();
|
||||
if (_output_buf.empty()) {
|
||||
if (is_last_chunk) {
|
||||
uint32_t max_buffer_size = 0;
|
||||
deflatePending(&_zs, &max_buffer_size, nullptr);
|
||||
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
|
||||
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
|
||||
} else {
|
||||
_output_buf = temporary_buffer<char>(compressed_buffer_size);
|
||||
}
|
||||
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
|
||||
_zs.avail_out = compressed_buffer_size;
|
||||
}
|
||||
int e = deflate(&_zs, mode);
|
||||
if (e < Z_OK) {
|
||||
throw api_error::internal("Error during compression of response body");
|
||||
}
|
||||
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
|
||||
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
|
||||
co_await _write_func(std::move(_output_buf));
|
||||
if (e == Z_STREAM_END) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Helper string_view functions for parsing Accept-Encoding header
|
||||
struct case_insensitive_cmp_sv {
|
||||
bool operator()(std::string_view s1, std::string_view s2) const {
|
||||
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
|
||||
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
|
||||
}
|
||||
};
|
||||
static inline std::string_view trim_left(std::string_view sv) {
|
||||
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
|
||||
sv.remove_prefix(1);
|
||||
return sv;
|
||||
}
|
||||
static inline std::string_view trim_right(std::string_view sv) {
|
||||
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
|
||||
sv.remove_suffix(1);
|
||||
return sv;
|
||||
}
|
||||
static inline std::string_view trim(std::string_view sv) {
|
||||
return trim_left(trim_right(sv));
|
||||
}
|
||||
|
||||
inline std::vector<std::string_view> split(std::string_view text, char separator) {
|
||||
std::vector<std::string_view> tokens;
|
||||
if (text == "") {
|
||||
return tokens;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto pos = text.find_first_of(separator);
|
||||
if (pos != std::string_view::npos) {
|
||||
tokens.emplace_back(text.data(), pos);
|
||||
text.remove_prefix(pos + 1);
|
||||
} else {
|
||||
tokens.emplace_back(text);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
|
||||
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
|
||||
return static_cast<compression_type>(i);
|
||||
}
|
||||
}
|
||||
return compression_type::unknown;
|
||||
}
|
||||
|
||||
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
|
||||
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
|
||||
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
|
||||
compression_type selected_ct = compression_type::none;
|
||||
|
||||
std::vector<std::string_view> entries = split(accept_encoding, ',');
|
||||
for (auto& e : entries) {
|
||||
std::vector<std::string_view> params = split(e, ';');
|
||||
if (params.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
compression_type ct = get_compression_type(trim(params[0]));
|
||||
if (ct == compression_type::unknown) {
|
||||
continue; // ignore unknown encoding types
|
||||
}
|
||||
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
|
||||
continue; // already processed this encoding
|
||||
}
|
||||
if (response_size < _threshold[static_cast<size_t>(ct)]) {
|
||||
continue; // below threshold treat as unknown
|
||||
}
|
||||
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
|
||||
auto pos = params[i].find("q=");
|
||||
if (pos == std::string_view::npos) {
|
||||
continue;
|
||||
}
|
||||
std::string_view param = params[i].substr(pos + 2);
|
||||
param = trim(param);
|
||||
// parse quality value
|
||||
float q_value = 1.0f;
|
||||
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
|
||||
if (ec != std::errc() || ptr != param.data() + param.size()) {
|
||||
continue;
|
||||
}
|
||||
if (q_value < 0.0) {
|
||||
q_value = 0.0;
|
||||
} else if (q_value > 1.0) {
|
||||
q_value = 1.0;
|
||||
}
|
||||
ct_q[static_cast<size_t>(ct)] = q_value;
|
||||
break; // we parsed quality value
|
||||
}
|
||||
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
|
||||
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
|
||||
}
|
||||
// keep the highest encoding (in the order, unless 'any')
|
||||
if (selected_ct == compression_type::any) {
|
||||
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = ct;
|
||||
}
|
||||
} else {
|
||||
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = ct;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (selected_ct == compression_type::any) {
|
||||
// select any not mentioned or highest quality
|
||||
selected_ct = compression_type::none;
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
|
||||
if (!ct_q[i].has_value()) {
|
||||
return static_cast<compression_type>(i);
|
||||
}
|
||||
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = static_cast<compression_type>(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return selected_ct;
|
||||
}
|
||||
|
||||
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
|
||||
chunked_content compressed;
|
||||
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
|
||||
compressed.push_back(std::move(buf));
|
||||
return make_ready_future<>();
|
||||
};
|
||||
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
|
||||
cfg.alternator_response_gzip_compression_level(), std::move(write));
|
||||
co_await compressor.compress(str.data(), str.size(), true);
|
||||
co_return compressed;
|
||||
}
|
||||
|
||||
static sstring flatten(chunked_content&& cc) {
|
||||
size_t total_size = 0;
|
||||
for (const auto& chunk : cc) {
|
||||
total_size += chunk.size();
|
||||
}
|
||||
sstring result = sstring{ sstring::initialized_later{}, total_size };
|
||||
size_t offset = 0;
|
||||
for (const auto& chunk : cc) {
|
||||
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
|
||||
offset += chunk.size();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
|
||||
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
|
||||
if (ct != response_compressor::compression_type::none) {
|
||||
rep->add_header("Content-Encoding", get_encoding_name(ct));
|
||||
rep->set_content_type(content_type);
|
||||
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
|
||||
rep->_content = flatten(std::move(compressed));
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
});
|
||||
} else {
|
||||
// Note that despite the move, there is a copy here -
|
||||
// as str is std::string and rep->_content is sstring.
|
||||
rep->_content = std::move(response_body);
|
||||
rep->set_content_type(content_type);
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
}
|
||||
|
||||
template<typename Compressor>
|
||||
class compressed_data_sink_impl : public data_sink_impl {
|
||||
output_stream<char> _out;
|
||||
Compressor _compressor;
|
||||
public:
|
||||
template<typename... Args>
|
||||
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
|
||||
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
|
||||
return _out.write(std::move(buf));
|
||||
}) { }
|
||||
|
||||
future<> put(std::span<temporary_buffer<char>> data) override {
|
||||
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
|
||||
return do_put(std::move(buf));
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
future<> do_put(temporary_buffer<char> buf) {
|
||||
co_return co_await _compressor.compress(buf.get(), buf.size());
|
||||
|
||||
}
|
||||
future<> close() override {
|
||||
return _compressor.close().then([this] {
|
||||
return _out.close();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
|
||||
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
|
||||
output_stream_options opts;
|
||||
opts.trim_to_size = true;
|
||||
std::unique_ptr<data_sink_impl> data_sink_impl;
|
||||
switch (ct) {
|
||||
case response_compressor::compression_type::gzip:
|
||||
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
|
||||
break;
|
||||
case response_compressor::compression_type::deflate:
|
||||
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
|
||||
break;
|
||||
case response_compressor::compression_type::none:
|
||||
case response_compressor::compression_type::any:
|
||||
case response_compressor::compression_type::unknown:
|
||||
on_internal_error(slogger,"Compression not selected");
|
||||
default:
|
||||
on_internal_error(slogger, "Unsupported compression type for data sink");
|
||||
}
|
||||
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
|
||||
};
|
||||
}
|
||||
|
||||
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
|
||||
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
|
||||
if (ct != response_compressor::compression_type::none) {
|
||||
rep->add_header("Content-Encoding", get_encoding_name(ct));
|
||||
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
|
||||
} else {
|
||||
rep->write_body(content_type, std::move(body_writer));
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
}
|
||||
|
||||
} // namespace alternator
|
||||
@@ -1,91 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "alternator/executor.hh"
|
||||
#include <seastar/http/httpd.hh>
|
||||
#include "db/config.hh"
|
||||
|
||||
namespace alternator {
|
||||
|
||||
class response_compressor {
|
||||
public:
|
||||
enum class compression_type {
|
||||
gzip,
|
||||
deflate,
|
||||
compressions_count,
|
||||
any = compressions_count,
|
||||
none,
|
||||
count,
|
||||
unknown = count
|
||||
};
|
||||
static constexpr std::string_view compression_names[] = {
|
||||
"gzip",
|
||||
"deflate",
|
||||
"*",
|
||||
"identity"
|
||||
};
|
||||
|
||||
static sstring get_encoding_name(compression_type ct) {
|
||||
return sstring(compression_names[static_cast<size_t>(ct)]);
|
||||
}
|
||||
static constexpr compression_type get_compression_type(std::string_view encoding);
|
||||
|
||||
sstring get_accepted_encoding(const http::request& req) {
|
||||
if (get_threshold() == 0) {
|
||||
return "";
|
||||
}
|
||||
return req.get_header("Accept-Encoding");
|
||||
}
|
||||
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
|
||||
|
||||
response_compressor(const db::config& cfg)
|
||||
: cfg(cfg)
|
||||
,_gzip_level_observer(
|
||||
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
|
||||
update_threshold();
|
||||
}))
|
||||
,_gzip_threshold_observer(
|
||||
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
|
||||
update_threshold();
|
||||
}))
|
||||
{
|
||||
update_threshold();
|
||||
}
|
||||
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
|
||||
|
||||
private:
|
||||
const db::config& cfg;
|
||||
utils::observable<int>::observer _gzip_level_observer;
|
||||
utils::observable<uint32_t>::observer _gzip_threshold_observer;
|
||||
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
|
||||
|
||||
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
|
||||
void update_threshold() {
|
||||
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
|
||||
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
|
||||
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
|
||||
: cfg.alternator_response_compression_threshold_in_bytes();
|
||||
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
|
||||
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
|
||||
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
|
||||
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
|
||||
sstring accept_encoding, const char* content_type, std::string&& response_body);
|
||||
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
|
||||
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
|
||||
};
|
||||
|
||||
}
|
||||
@@ -34,7 +34,6 @@
|
||||
#include "client_data.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <zlib.h>
|
||||
#include "alternator/http_compression.hh"
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
|
||||
// type applies to all replies, both success and error.
|
||||
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
|
||||
public:
|
||||
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
|
||||
const db::config& config) : _response_compressor(config), _f_handle(
|
||||
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
|
||||
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
|
||||
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
|
||||
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
|
||||
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
|
||||
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
|
||||
if (resf.failed()) {
|
||||
// Exceptions of type api_error are wrapped as JSON and
|
||||
// returned to the client as expected. Other types of
|
||||
@@ -137,20 +133,22 @@ public:
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
}
|
||||
auto res = resf.get();
|
||||
return std::visit(overloaded_functor {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (std::string&& str) {
|
||||
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
|
||||
REPLY_CONTENT_TYPE, std::move(str));
|
||||
// Note that despite the move, there is a copy here -
|
||||
// as str is std::string and rep->_content is sstring.
|
||||
rep->_content = std::move(str);
|
||||
rep->set_content_type(REPLY_CONTENT_TYPE);
|
||||
},
|
||||
[&] (executor::body_writer&& body_writer) {
|
||||
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
|
||||
REPLY_CONTENT_TYPE, std::move(body_writer));
|
||||
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
|
||||
},
|
||||
[&] (const api_error& err) {
|
||||
generate_error_reply(*rep, err);
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
}
|
||||
}, std::move(res));
|
||||
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}) { }
|
||||
|
||||
@@ -179,7 +177,6 @@ protected:
|
||||
slogger.trace("api_handler error case: {}", rep._content);
|
||||
}
|
||||
|
||||
response_compressor _response_compressor;
|
||||
future_handler_function _f_handle;
|
||||
};
|
||||
|
||||
@@ -711,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -761,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
void server::set_routes(routes& r) {
|
||||
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
|
||||
return handle_api_request(std::move(req));
|
||||
}, _proxy.data_dictionary().get_config());
|
||||
});
|
||||
|
||||
r.put(operation_type::POST, "/", req_handler);
|
||||
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
|
||||
@@ -992,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
ret.emplace_back(r.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -108,7 +107,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -31,7 +31,6 @@ set(swagger_files
|
||||
api-doc/column_family.json
|
||||
api-doc/commitlog.json
|
||||
api-doc/compaction_manager.json
|
||||
api-doc/client_routes.json
|
||||
api-doc/config.json
|
||||
api-doc/cql_server_test.json
|
||||
api-doc/endpoint_snitch_info.json
|
||||
@@ -69,7 +68,6 @@ target_sources(api
|
||||
PRIVATE
|
||||
api.cc
|
||||
cache_service.cc
|
||||
client_routes.cc
|
||||
collectd.cc
|
||||
column_family.cc
|
||||
commitlog.cc
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
, "client_routes_entry": {
|
||||
"id": "client_routes_entry",
|
||||
"summary": "An entry storing client routes",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"},
|
||||
"address": {"type": "string"},
|
||||
"port": {"type": "integer"},
|
||||
"tls_port": {"type": "integer"},
|
||||
"alternator_port": {"type": "integer"},
|
||||
"alternator_https_port": {"type": "integer"}
|
||||
},
|
||||
"required": ["connection_id", "host_id", "address"]
|
||||
}
|
||||
, "client_routes_key": {
|
||||
"id": "client_routes_key",
|
||||
"summary": "A key of client_routes_entry",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
, "/v2/client-routes":{
|
||||
"get": {
|
||||
"description":"List all client route entries",
|
||||
"operationId":"get_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[],
|
||||
"responses":{
|
||||
"200":{
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{"$ref":"#/definitions/ErrorModel"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"post": {
|
||||
"description":"Upsert one or more client route entries",
|
||||
"operationId":"set_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{ "description": "OK" },
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{ "$ref":"#/definitions/ErrorModel" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"delete": {
|
||||
"description":"Delete one or more client route entries",
|
||||
"operationId":"delete_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_key" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{
|
||||
"description": "OK"
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{
|
||||
"$ref":"#/definitions/ErrorModel"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
13
api/api.cc
13
api/api.cc
@@ -37,7 +37,6 @@
|
||||
#include "raft.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "service_levels.hh"
|
||||
#include "client_routes.hh"
|
||||
|
||||
logging::logger apilog("api");
|
||||
|
||||
@@ -68,11 +67,9 @@ future<> set_server_init(http_context& ctx) {
|
||||
rb02->set_api_doc(r);
|
||||
rb02->register_api_file(r, "swagger20_header");
|
||||
rb02->register_api_file(r, "metrics");
|
||||
rb02->register_api_file(r, "client_routes");
|
||||
rb->register_function(r, "system",
|
||||
"The system related API");
|
||||
rb02->add_definitions_file(r, "metrics");
|
||||
rb02->add_definitions_file(r, "client_routes");
|
||||
set_system(ctx, r);
|
||||
rb->register_function(r, "error_injection",
|
||||
"The error injection API");
|
||||
@@ -132,16 +129,6 @@ future<> unset_server_storage_service(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
|
||||
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
|
||||
set_client_routes(ctx, r, cr);
|
||||
});
|
||||
}
|
||||
|
||||
future<> unset_server_client_routes(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
|
||||
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ class storage_proxy;
|
||||
class storage_service;
|
||||
class raft_group0_client;
|
||||
class raft_group_registry;
|
||||
class client_routes_service;
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -100,8 +99,6 @@ future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snit
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
|
||||
future<> unset_server_sstables_loader(http_context& ctx);
|
||||
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);
|
||||
|
||||
@@ -1,176 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/http/short_streams.hh>
|
||||
|
||||
#include "client_routes.hh"
|
||||
#include "api/api.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/client_routes.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
|
||||
#include "api/api-doc/client_routes.json.hh"
|
||||
|
||||
using namespace seastar::httpd;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace json;
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
namespace api {
|
||||
|
||||
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
|
||||
if (!cr.local().get_feature_service().client_routes) {
|
||||
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
|
||||
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
|
||||
}
|
||||
}
|
||||
|
||||
static sstring parse_string(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
throw bad_param_exception(fmt::format("Missing '{}'", name));
|
||||
}
|
||||
if (!it->value.IsString()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be a string", name));
|
||||
}
|
||||
return {it->value.GetString(), it->value.GetStringLength()};
|
||||
}
|
||||
|
||||
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!it->value.IsInt()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
|
||||
}
|
||||
auto port = it->value.GetInt();
|
||||
if (port < 1 || port > 65535) {
|
||||
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_entry> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
|
||||
|
||||
const auto port = parse_port("port", element);
|
||||
const auto tls_port = parse_port("tls_port", element);
|
||||
const auto alternator_port = parse_port("alternator_port", element);
|
||||
const auto alternator_https_port = parse_port("alternator_https_port", element);
|
||||
|
||||
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
|
||||
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
|
||||
}
|
||||
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)},
|
||||
parse_string("address", element),
|
||||
port,
|
||||
tls_port,
|
||||
alternator_port,
|
||||
alternator_https_port
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "rest_set_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_key> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)}
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "delete_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "get_client_routes");
|
||||
|
||||
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
|
||||
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
|
||||
seastar::httpd::client_routes_json::client_routes_entry obj;
|
||||
obj.connection_id = entry.connection_id;
|
||||
obj.host_id = fmt::to_string(entry.host_id);
|
||||
obj.address = entry.address;
|
||||
if (entry.port.has_value()) { obj.port = entry.port.value(); }
|
||||
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
|
||||
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
|
||||
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
|
||||
return obj;
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_set_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_delete_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_get_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
}
|
||||
|
||||
void unset_client_routes(http_context& ctx, routes& r) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::get_client_routes.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/json/json_elements.hh>
|
||||
#include "api/api_init.hh"
|
||||
|
||||
namespace api {
|
||||
|
||||
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
|
||||
void unset_client_routes(http_context& ctx, httpd::routes& r);
|
||||
|
||||
}
|
||||
@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
|
||||
vp.insert(b.second);
|
||||
}
|
||||
}
|
||||
std::vector<sstring> res;
|
||||
replica::database& db = vb.local().get_db();
|
||||
auto uuid = validate_table(db, ks, cf_name);
|
||||
replica::column_family& cf = db.find_column_family(uuid);
|
||||
co_return cf.get_index_manager().list_indexes()
|
||||
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
|
||||
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
|
||||
| std::ranges::to<std::vector>();
|
||||
res.reserve(cf.get_index_manager().list_indexes().size());
|
||||
for (auto&& i : cf.get_index_manager().list_indexes()) {
|
||||
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
|
||||
res.emplace_back(i.metadata().name());
|
||||
}
|
||||
}
|
||||
co_return res;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
@@ -22,6 +23,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
|
||||
|
||||
class allow_all_authenticator final : public authenticator {
|
||||
public:
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include <iterator>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/core/format.hh>
|
||||
|
||||
@@ -23,11 +22,9 @@ namespace auth {
|
||||
|
||||
logging::logger logger("auth-cache");
|
||||
|
||||
cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
|
||||
cache::cache(cql3::query_processor& qp) noexcept
|
||||
: _current_version(0)
|
||||
, _qp(qp)
|
||||
, _loading_sem(1)
|
||||
, _as(as) {
|
||||
, _qp(qp) {
|
||||
}
|
||||
|
||||
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
|
||||
@@ -119,8 +116,6 @@ future<> cache::load_all() {
|
||||
co_return;
|
||||
}
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto units = co_await get_units(_loading_sem, 1, _as);
|
||||
|
||||
++_current_version;
|
||||
|
||||
logger.info("Loading all roles");
|
||||
@@ -151,9 +146,6 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
|
||||
if (legacy_mode(_qp)) {
|
||||
co_return;
|
||||
}
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto units = co_await get_units(_loading_sem, 1, _as);
|
||||
|
||||
for (const auto& name : roles) {
|
||||
logger.info("Loading role {}", name);
|
||||
auto role = co_await fetch_role(name);
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -16,7 +15,6 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
@@ -43,7 +41,7 @@ public:
|
||||
version_tag_t version; // used for seamless cache reloads
|
||||
};
|
||||
|
||||
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
|
||||
explicit cache(cql3::query_processor& qp) noexcept;
|
||||
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
|
||||
future<> load_all();
|
||||
future<> load_roles(std::unordered_set<role_name_t> roles);
|
||||
@@ -54,8 +52,6 @@ private:
|
||||
roles_map _roles;
|
||||
version_tag_t _current_version;
|
||||
cql3::query_processor& _qp;
|
||||
semaphore _loading_sem;
|
||||
abort_source& _as;
|
||||
|
||||
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
|
||||
future<> prune_all() noexcept;
|
||||
|
||||
@@ -35,13 +35,14 @@ static const class_registrator<auth::authenticator
|
||||
, cql3::query_processor&
|
||||
, ::service::raft_group0_client&
|
||||
, ::service::migration_manager&
|
||||
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
, auth::cache&
|
||||
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
|
||||
enum class auth::certificate_authenticator::query_source {
|
||||
subject, altname
|
||||
};
|
||||
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
|
||||
: _queries([&] {
|
||||
auto& conf = qp.db().get_config();
|
||||
auto queries = conf.auth_certificate_role_queries();
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
|
||||
|
||||
namespace cql3 {
|
||||
@@ -33,7 +34,7 @@ class certificate_authenticator : public authenticator {
|
||||
enum class query_source;
|
||||
std::vector<std::pair<query_source, boost::regex>> _queries;
|
||||
public:
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
~certificate_authenticator();
|
||||
|
||||
future<> start() override;
|
||||
|
||||
@@ -49,7 +49,8 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
|
||||
@@ -63,13 +64,14 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
|
||||
password_authenticator::~password_authenticator() {
|
||||
}
|
||||
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(default_superuser(qp.db().get_config()))
|
||||
, _hashing_worker(hashing_worker)
|
||||
{}
|
||||
|
||||
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
|
||||
@@ -328,7 +330,9 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
}
|
||||
salted_hash = role->salted_hash;
|
||||
}
|
||||
const bool password_match = co_await passwords::check(password, *salted_hash);
|
||||
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
|
||||
return passwords::check(password, *salted_hash);
|
||||
});
|
||||
if (!password_match) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "auth/passwords.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
@@ -48,12 +49,13 @@ class password_authenticator : public authenticator {
|
||||
shared_promise<> _superuser_created_promise;
|
||||
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
|
||||
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
|
||||
utils::alien_worker& _hashing_worker;
|
||||
|
||||
public:
|
||||
static db::consistency_level consistency_for_user(std::string_view role_name);
|
||||
static std::string default_superuser(const db::config&);
|
||||
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
|
||||
~password_authenticator();
|
||||
|
||||
|
||||
@@ -7,8 +7,6 @@
|
||||
*/
|
||||
|
||||
#include "auth/passwords.hh"
|
||||
#include "utils/crypt_sha512.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include <cerrno>
|
||||
|
||||
@@ -23,46 +21,25 @@ static thread_local crypt_data tlcrypt = {};
|
||||
|
||||
namespace detail {
|
||||
|
||||
void verify_hashing_output(const char * res) {
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
void verify_scheme(scheme scheme) {
|
||||
const sstring random_part_of_salt = "aaaabbbbccccdddd";
|
||||
|
||||
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
try {
|
||||
verify_hashing_output(e);
|
||||
} catch (const std::system_error& ex) {
|
||||
throw no_supported_schemes();
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
return;
|
||||
}
|
||||
|
||||
throw no_supported_schemes();
|
||||
}
|
||||
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
|
||||
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
verify_hashing_output(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
|
||||
sstring res;
|
||||
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
|
||||
// the __crypt_sha512 method. For other computations, we fall back to the
|
||||
// crypt_r implementation from `<crypt.h>`, which can stall.
|
||||
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
|
||||
char buf[128];
|
||||
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
} else {
|
||||
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
co_return res;
|
||||
return res;
|
||||
}
|
||||
|
||||
std::string_view prefix_for_scheme(scheme c) noexcept {
|
||||
@@ -81,9 +58,8 @@ no_supported_schemes::no_supported_schemes()
|
||||
: std::runtime_error("No allowed hashing schemes are supported on this system") {
|
||||
}
|
||||
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
|
||||
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
|
||||
co_return pwd_hash == salted_hash;
|
||||
bool check(const sstring& pass, const sstring& salted_hash) {
|
||||
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
|
||||
}
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
#include <random>
|
||||
#include <stdexcept>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "seastarx.hh"
|
||||
@@ -76,19 +75,10 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
|
||||
|
||||
///
|
||||
/// Hash a password combined with an implementation-specific salt string.
|
||||
/// Deprecated in favor of `hash_with_salt_async`.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
|
||||
///
|
||||
/// Async version of `hash_with_salt` that returns a future.
|
||||
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
|
||||
} // namespace detail
|
||||
|
||||
@@ -117,6 +107,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
|
||||
bool check(const sstring& pass, const sstring& salted_hash);
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -35,9 +35,10 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
|
||||
: _socket_path(qp.db().get_config().saslauthd_socket_path())
|
||||
{}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -29,7 +30,7 @@ namespace auth {
|
||||
class saslauthd_authenticator : public authenticator {
|
||||
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
|
||||
public:
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
|
||||
|
||||
future<> start() override;
|
||||
|
||||
|
||||
@@ -191,7 +191,8 @@ service::service(
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc,
|
||||
maintenance_socket_enabled used_by_maintenance_socket,
|
||||
cache& cache)
|
||||
cache& cache,
|
||||
utils::alien_worker& hashing_worker)
|
||||
: service(
|
||||
std::move(c),
|
||||
cache,
|
||||
@@ -199,7 +200,7 @@ service::service(
|
||||
g0,
|
||||
mn,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
|
||||
used_by_maintenance_socket) {
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "cql3/description.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/observable.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "service/maintenance_mode.hh"
|
||||
@@ -130,7 +131,8 @@ public:
|
||||
::service::migration_manager&,
|
||||
const service_config&,
|
||||
maintenance_socket_enabled,
|
||||
cache&);
|
||||
cache&,
|
||||
utils::alien_worker&);
|
||||
|
||||
future<> start(::service::migration_manager&, db::system_keyspace&);
|
||||
|
||||
|
||||
@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
|
||||
}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
@@ -241,7 +241,8 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
auth::cache&,
|
||||
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
|
||||
@@ -10,9 +10,7 @@
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
|
||||
enum class client_type {
|
||||
@@ -29,20 +27,6 @@ enum class client_connection_stage {
|
||||
ready,
|
||||
};
|
||||
|
||||
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
|
||||
struct options_cache_value_type {};
|
||||
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
|
||||
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
|
||||
using client_options_cache_key_type = client_options_cache_type::key_type;
|
||||
|
||||
// This struct represents a single OPTION key-value pair from the client's connection options.
|
||||
// Both key and value are represented by corresponding "references" to their cached values.
|
||||
// Each "reference" is effectively a lw_shared_ptr value.
|
||||
struct client_option_key_value_cached_entry {
|
||||
client_options_cache_entry_type key;
|
||||
client_options_cache_entry_type value;
|
||||
};
|
||||
|
||||
sstring to_string(client_connection_stage ct);
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
@@ -53,8 +37,8 @@ struct client_data {
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
std::optional<client_options_cache_entry_type> driver_name;
|
||||
std::optional<client_options_cache_entry_type> driver_version;
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
std::optional<int32_t> protocol_version;
|
||||
std::optional<sstring> ssl_cipher_suite;
|
||||
@@ -62,7 +46,6 @@ struct client_data {
|
||||
std::optional<sstring> ssl_protocol;
|
||||
std::optional<sstring> username;
|
||||
std::optional<sstring> scheduling_group_name;
|
||||
std::list<client_option_key_value_cached_entry> client_options;
|
||||
|
||||
sstring stage_str() const { return to_string(connection_stage); }
|
||||
sstring client_type_str() const { return to_string(ct); }
|
||||
|
||||
@@ -125,6 +125,10 @@ if(target_arch)
|
||||
add_compile_options("-march=${target_arch}")
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
|
||||
endif()
|
||||
|
||||
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
|
||||
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
|
||||
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "sstables/open_info.hh"
|
||||
#include "compaction_descriptor.hh"
|
||||
|
||||
class reader_permit;
|
||||
@@ -45,7 +44,7 @@ public:
|
||||
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
|
||||
virtual reader_permit make_compaction_reader_permit() const = 0;
|
||||
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
|
||||
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
|
||||
virtual sstables::shared_sstable make_sstable() const = 0;
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
|
||||
virtual api::timestamp_type min_memtable_timestamp() const = 0;
|
||||
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;
|
||||
|
||||
@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
|
||||
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
|
||||
}
|
||||
descriptor.creator = [&t] (shard_id) {
|
||||
// All compaction types going through this path will work on normal input sstables only.
|
||||
// Off-strategy, for example, waits until the sstables move out of staging state.
|
||||
return t.make_sstable(sstables::sstable_state::normal);
|
||||
return t.make_sstable();
|
||||
};
|
||||
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
|
||||
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
|
||||
@@ -1849,10 +1847,6 @@ protected:
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
}, false);
|
||||
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
|
||||
co_return co_await do_rewrite_sstable(std::move(sst));
|
||||
}
|
||||
};
|
||||
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
// Throw an error if split cannot be performed due to e.g. out of space prevention.
|
||||
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
|
||||
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
|
||||
if (is_disabled()) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
|
||||
"reason might be out of space prevention", sst->get_filename()))));
|
||||
if (!can_proceed(&t)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
|
||||
compaction_progress_monitor monitor;
|
||||
compaction_data info = create_compaction_data();
|
||||
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
|
||||
desc.creator = [&t, sst] (shard_id _) {
|
||||
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
|
||||
// For example, if base table has views, it's important that sstable produced by repair will be
|
||||
// in the staging state.
|
||||
return t.make_sstable(sst->state());
|
||||
desc.creator = [&t] (shard_id _) {
|
||||
return t.make_sstable();
|
||||
};
|
||||
desc.replacer = [&] (compaction_completion_desc d) {
|
||||
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));
|
||||
|
||||
@@ -376,8 +376,7 @@ public:
|
||||
// Splits a single SSTable by segregating all its data according to the classifier.
|
||||
// If SSTable doesn't need split, the same input SSTable is returned as output.
|
||||
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
|
||||
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
|
||||
// Run a custom job for a given table, defined by a function
|
||||
// it completes when future returned by job is ready or returns immediately
|
||||
|
||||
@@ -571,10 +571,10 @@ commitlog_total_space_in_mb: -1
|
||||
# - "none": auditing is disabled (default)
|
||||
# - "table": save audited events in audit.audit_log column family
|
||||
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
|
||||
audit: "table"
|
||||
# audit: "none"
|
||||
#
|
||||
# List of statement categories that should be audited.
|
||||
audit_categories: "DCL,DDL,AUTH,ADMIN"
|
||||
# audit_categories: "DCL,DDL,AUTH"
|
||||
#
|
||||
# List of tables that should be audited.
|
||||
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
||||
|
||||
162
configure.py
162
configure.py
@@ -368,87 +368,6 @@ def find_ninja():
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def find_compiler(name):
|
||||
"""
|
||||
Find a compiler by name, skipping ccache wrapper directories.
|
||||
|
||||
This is useful when using sccache to avoid double-caching through ccache.
|
||||
|
||||
Args:
|
||||
name: The compiler name (e.g., 'clang++', 'clang', 'gcc')
|
||||
|
||||
Returns:
|
||||
Path to the compiler, skipping ccache directories, or None if not found.
|
||||
"""
|
||||
ccache_dirs = {'/usr/lib/ccache', '/usr/lib64/ccache'}
|
||||
for path_dir in os.environ.get('PATH', '').split(os.pathsep):
|
||||
# Skip ccache wrapper directories
|
||||
if os.path.realpath(path_dir) in ccache_dirs or path_dir in ccache_dirs:
|
||||
continue
|
||||
candidate = os.path.join(path_dir, name)
|
||||
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
|
||||
return candidate
|
||||
return None
|
||||
|
||||
|
||||
def resolve_compilers_for_compiler_cache(args, compiler_cache):
|
||||
"""
|
||||
When using a compiler cache, resolve compiler paths to avoid ccache directories.
|
||||
|
||||
This prevents double-caching when ccache symlinks are in PATH.
|
||||
|
||||
Args:
|
||||
args: The argument namespace with cc and cxx attributes.
|
||||
compiler_cache: Path to the compiler cache binary, or None.
|
||||
"""
|
||||
if not compiler_cache:
|
||||
return
|
||||
if not os.path.isabs(args.cxx):
|
||||
real_cxx = find_compiler(args.cxx)
|
||||
if real_cxx:
|
||||
args.cxx = real_cxx
|
||||
if not os.path.isabs(args.cc):
|
||||
real_cc = find_compiler(args.cc)
|
||||
if real_cc:
|
||||
args.cc = real_cc
|
||||
|
||||
|
||||
def find_compiler_cache(preference):
|
||||
"""
|
||||
Find a compiler cache based on the preference.
|
||||
|
||||
Args:
|
||||
preference: One of 'auto', 'sccache', 'ccache', 'none', or a path to a binary.
|
||||
|
||||
Returns:
|
||||
Path to the compiler cache binary, or None if not found/disabled.
|
||||
"""
|
||||
if preference == 'none':
|
||||
return None
|
||||
|
||||
if preference == 'auto':
|
||||
# Prefer sccache over ccache
|
||||
for cache in ['sccache', 'ccache']:
|
||||
path = which(cache)
|
||||
if path:
|
||||
return path
|
||||
return None
|
||||
|
||||
if preference in ('sccache', 'ccache'):
|
||||
path = which(preference)
|
||||
if path:
|
||||
return path
|
||||
print(f"Warning: {preference} not found on PATH, disabling compiler cache")
|
||||
return None
|
||||
|
||||
# Assume it's a path to a binary
|
||||
if os.path.isfile(preference) and os.access(preference, os.X_OK):
|
||||
return preference
|
||||
|
||||
print(f"Warning: compiler cache '{preference}' not found or not executable, disabling compiler cache")
|
||||
return None
|
||||
|
||||
|
||||
modes = {
|
||||
'debug': {
|
||||
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
@@ -813,8 +732,6 @@ arg_parser.add_argument('--compiler', action='store', dest='cxx', default='clang
|
||||
help='C++ compiler path')
|
||||
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clang',
|
||||
help='C compiler path')
|
||||
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
|
||||
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
|
||||
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
|
||||
help='Use dpdk (from seastar dpdk sources)')
|
||||
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
|
||||
@@ -942,7 +859,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/alien_worker.cc',
|
||||
'utils/array-search.cc',
|
||||
'utils/base64.cc',
|
||||
'utils/crypt_sha512.cc',
|
||||
'utils/logalloc.cc',
|
||||
'utils/large_bitset.cc',
|
||||
'utils/buffer_input_stream.cc',
|
||||
@@ -1034,7 +950,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'cql3/functions/aggregate_fcts.cc',
|
||||
'cql3/functions/castas_fcts.cc',
|
||||
'cql3/functions/error_injection_fcts.cc',
|
||||
'cql3/functions/vector_similarity_fcts.cc',
|
||||
'cql3/statements/cf_prop_defs.cc',
|
||||
'cql3/statements/cf_statement.cc',
|
||||
'cql3/statements/authentication_statement.cc',
|
||||
@@ -1242,7 +1157,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'locator/topology.cc',
|
||||
'locator/util.cc',
|
||||
'service/client_state.cc',
|
||||
'service/client_routes.cc',
|
||||
'service/storage_service.cc',
|
||||
'service/session.cc',
|
||||
'service/task_manager_module.cc',
|
||||
@@ -1403,8 +1317,6 @@ api = ['api/api.cc',
|
||||
'api/storage_proxy.cc',
|
||||
Json2Code('api/api-doc/cache_service.json'),
|
||||
'api/cache_service.cc',
|
||||
Json2Code('api/api-doc/client_routes.json'),
|
||||
'api/client_routes.cc',
|
||||
Json2Code('api/api-doc/collectd.json'),
|
||||
'api/collectd.cc',
|
||||
Json2Code('api/api-doc/endpoint_snitch_info.json'),
|
||||
@@ -1454,7 +1366,6 @@ alternator = [
|
||||
'alternator/auth.cc',
|
||||
'alternator/streams.cc',
|
||||
'alternator/ttl.cc',
|
||||
'alternator/http_compression.cc'
|
||||
]
|
||||
|
||||
idls = ['idl/gossip_digest.idl.hh',
|
||||
@@ -1568,6 +1479,7 @@ deps = {
|
||||
|
||||
pure_boost_tests = set([
|
||||
'test/boost/anchorless_list_test',
|
||||
'test/boost/auth_passwords_test',
|
||||
'test/boost/auth_resource_test',
|
||||
'test/boost/big_decimal_test',
|
||||
'test/boost/caching_options_test',
|
||||
@@ -1700,7 +1612,6 @@ deps['test/boost/combined_tests'] += [
|
||||
'test/boost/schema_registry_test.cc',
|
||||
'test/boost/secondary_index_test.cc',
|
||||
'test/boost/sessions_test.cc',
|
||||
'test/boost/simple_value_with_expiry_test.cc',
|
||||
'test/boost/sstable_compaction_test.cc',
|
||||
'test/boost/sstable_compressor_factory_test.cc',
|
||||
'test/boost/sstable_compression_config_test.cc',
|
||||
@@ -1784,18 +1695,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
|
||||
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
|
||||
|
||||
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
|
||||
|
||||
# We need to link these files to all Boost tests to make sure that
|
||||
# we can execute `--list_json_content` on them. That will produce
|
||||
# a similar result as calling `--list_content={HRF,DOT}`.
|
||||
# Unfortunately, to be able to do that, we're forced to link the
|
||||
# relevant code by hand.
|
||||
for key in deps.keys():
|
||||
for prefix in boost_tests_prefixes:
|
||||
if key.startswith(prefix):
|
||||
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
|
||||
|
||||
wasm_deps = {}
|
||||
|
||||
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'
|
||||
@@ -2100,7 +1999,7 @@ def semicolon_separated(*flags):
|
||||
def real_relpath(path, start):
|
||||
return os.path.relpath(os.path.realpath(path), os.path.realpath(start))
|
||||
|
||||
def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
def configure_seastar(build_dir, mode, mode_config):
|
||||
seastar_cxx_ld_flags = mode_config['cxx_ld_flags']
|
||||
# We want to "undo" coverage for seastar if we have it enabled.
|
||||
if args.coverage:
|
||||
@@ -2147,10 +2046,6 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
'-DSeastar_IO_URING=ON',
|
||||
]
|
||||
|
||||
if compiler_cache:
|
||||
seastar_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
|
||||
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
|
||||
|
||||
if args.stack_guards is not None:
|
||||
stack_guards = 'ON' if args.stack_guards else 'OFF'
|
||||
seastar_cmake_args += ['-DSeastar_STACK_GUARDS={}'.format(stack_guards)]
|
||||
@@ -2182,7 +2077,7 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
subprocess.check_call(seastar_cmd, shell=False, cwd=cmake_dir)
|
||||
|
||||
|
||||
def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
|
||||
def configure_abseil(build_dir, mode, mode_config):
|
||||
abseil_cflags = mode_config['lib_cflags']
|
||||
cxx_flags = mode_config['cxxflags']
|
||||
if '-DSANITIZE' in cxx_flags:
|
||||
@@ -2208,10 +2103,6 @@ def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
|
||||
'-DABSL_PROPAGATE_CXX_STD=ON',
|
||||
]
|
||||
|
||||
if compiler_cache:
|
||||
abseil_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
|
||||
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
|
||||
|
||||
cmake_args = abseil_cmake_args[:]
|
||||
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
|
||||
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + cmake_args
|
||||
@@ -2357,6 +2248,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
|
||||
if debuginfo and mode_config['can_have_debug_info']:
|
||||
cxxflags += ['-g', '-gz']
|
||||
|
||||
if 'clang' in cxx:
|
||||
# Since AssignmentTracking was enabled by default in clang
|
||||
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
|
||||
# coroutine frame debugging info (`coro_frame_ty`) is broken.
|
||||
#
|
||||
# It seems that we aren't losing much by disabling AssigmentTracking,
|
||||
# so for now we choose to disable it to get `coro_frame_ty` back.
|
||||
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
|
||||
|
||||
return cxxflags
|
||||
|
||||
|
||||
@@ -2384,15 +2284,10 @@ def write_build_file(f,
|
||||
scylla_product,
|
||||
scylla_version,
|
||||
scylla_release,
|
||||
compiler_cache,
|
||||
args):
|
||||
use_precompiled_header = not args.disable_precompiled_header
|
||||
warnings = get_warning_options(args.cxx)
|
||||
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
|
||||
# If compiler cache is available, prefix the compiler with it
|
||||
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
|
||||
# For Rust, sccache is used via RUSTC_WRAPPER environment variable
|
||||
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache else ''
|
||||
f.write(textwrap.dedent('''\
|
||||
configure_args = {configure_args}
|
||||
builddir = {outdir}
|
||||
@@ -2455,7 +2350,7 @@ def write_build_file(f,
|
||||
command = clang --target=wasm32 --no-standard-libraries -Wl,--export-all -Wl,--no-entry $in -o $out
|
||||
description = C2WASM $out
|
||||
rule rust2wasm
|
||||
command = {rustc_wrapper}cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
|
||||
command = cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
|
||||
&& wasm-opt -Oz $builddir/wasm/{rustc_target}/debug/examples/$example.wasm -o $builddir/wasm/$example.wasm $
|
||||
&& wasm-strip $builddir/wasm/$example.wasm
|
||||
description = RUST2WASM $out
|
||||
@@ -2471,7 +2366,7 @@ def write_build_file(f,
|
||||
command = llvm-profdata merge $in -output=$out
|
||||
''').format(configure_args=configure_args,
|
||||
outdir=outdir,
|
||||
cxx=cxx_with_cache,
|
||||
cxx=args.cxx,
|
||||
user_cflags=user_cflags,
|
||||
warnings=warnings,
|
||||
defines=defines,
|
||||
@@ -2479,7 +2374,6 @@ def write_build_file(f,
|
||||
user_ldflags=user_ldflags,
|
||||
libs=libs,
|
||||
rustc_target=rustc_target,
|
||||
rustc_wrapper=rustc_wrapper,
|
||||
link_pool_depth=link_pool_depth,
|
||||
seastar_path=args.seastar_path,
|
||||
ninja=ninja,
|
||||
@@ -2564,10 +2458,10 @@ def write_build_file(f,
|
||||
description = TEST {mode}
|
||||
# This rule is unused for PGO stages. They use the rust lib from the parent mode.
|
||||
rule rust_lib.{mode}
|
||||
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' {rustc_wrapper}cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
|
||||
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
|
||||
&& touch $out
|
||||
description = RUST_LIB $out
|
||||
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
|
||||
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
|
||||
f.write(
|
||||
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
|
||||
mode=mode,
|
||||
@@ -2631,7 +2525,7 @@ def write_build_file(f,
|
||||
# In debug/sanitize modes, we compile with fsanitizers,
|
||||
# so must use the same options during the link:
|
||||
if '-DSANITIZE' in modes[mode]['cxxflags']:
|
||||
f.write(' libs = -fsanitize=address -fsanitize=undefined -lubsan\n')
|
||||
f.write(' libs = -fsanitize=address -fsanitize=undefined\n')
|
||||
else:
|
||||
f.write(' libs =\n')
|
||||
f.write(f'build $builddir/{mode}/{binary}.stripped: strip $builddir/{mode}/{binary}\n')
|
||||
@@ -3027,9 +2921,6 @@ def create_build_system(args):
|
||||
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
|
||||
compiler_cache = find_compiler_cache(args.compiler_cache)
|
||||
resolve_compilers_for_compiler_cache(args, compiler_cache)
|
||||
|
||||
scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)
|
||||
|
||||
for mode, mode_config in build_modes.items():
|
||||
@@ -3046,8 +2937,8 @@ def create_build_system(args):
|
||||
# {outdir}/{mode}/seastar/build.ninja, and
|
||||
# {outdir}/{mode}/seastar/seastar.pc is queried for building flags
|
||||
for mode, mode_config in build_modes.items():
|
||||
configure_seastar(outdir, mode, mode_config, compiler_cache)
|
||||
configure_abseil(outdir, mode, mode_config, compiler_cache)
|
||||
configure_seastar(outdir, mode, mode_config)
|
||||
configure_abseil(outdir, mode, mode_config)
|
||||
user_cflags += ' -isystem abseil'
|
||||
|
||||
for mode, mode_config in build_modes.items():
|
||||
@@ -3070,7 +2961,6 @@ def create_build_system(args):
|
||||
scylla_product,
|
||||
scylla_version,
|
||||
scylla_release,
|
||||
compiler_cache,
|
||||
args)
|
||||
generate_compdb('compile_commands.json', ninja, args.buildfile, selected_modes)
|
||||
|
||||
@@ -3113,10 +3003,6 @@ def configure_using_cmake(args):
|
||||
selected_modes = args.selected_modes or default_modes
|
||||
selected_configs = ';'.join(build_modes[mode].cmake_build_type for mode
|
||||
in selected_modes)
|
||||
|
||||
compiler_cache = find_compiler_cache(args.compiler_cache)
|
||||
resolve_compilers_for_compiler_cache(args, compiler_cache)
|
||||
|
||||
settings = {
|
||||
'CMAKE_CONFIGURATION_TYPES': selected_configs,
|
||||
'CMAKE_CROSS_CONFIGS': selected_configs,
|
||||
@@ -3134,14 +3020,6 @@ def configure_using_cmake(args):
|
||||
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
|
||||
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
|
||||
}
|
||||
|
||||
if compiler_cache:
|
||||
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
|
||||
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
|
||||
# For Rust, sccache is used via RUSTC_WRAPPER
|
||||
if 'sccache' in compiler_cache:
|
||||
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
|
||||
|
||||
if args.date_stamp:
|
||||
settings['Scylla_DATE_STAMP'] = args.date_stamp
|
||||
if args.staticboost:
|
||||
@@ -3173,7 +3051,7 @@ def configure_using_cmake(args):
|
||||
|
||||
if not args.dist_only:
|
||||
for mode in selected_modes:
|
||||
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode], compiler_cache)
|
||||
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode])
|
||||
|
||||
cmake_command = ['cmake']
|
||||
cmake_command += [f'-D{var}={value}' for var, value in settings.items()]
|
||||
|
||||
@@ -47,7 +47,6 @@ target_sources(cql3
|
||||
functions/aggregate_fcts.cc
|
||||
functions/castas_fcts.cc
|
||||
functions/error_injection_fcts.cc
|
||||
functions/vector_similarity_fcts.cc
|
||||
statements/cf_prop_defs.cc
|
||||
statements/cf_statement.cc
|
||||
statements/authentication_statement.cc
|
||||
|
||||
26
cql3/Cql.g
26
cql3/Cql.g
@@ -431,7 +431,6 @@ unaliasedSelector returns [uexpression tmp]
|
||||
| K_TTL '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::ttl,
|
||||
unresolved_identifier{std::move(c)}}; }
|
||||
| f=functionName args=selectionFunctionArgs { tmp = function_call{std::move(f), std::move(args)}; }
|
||||
| f=similarityFunctionName args=vectorSimilarityArgs { tmp = function_call{std::move(f), std::move(args)}; }
|
||||
| K_CAST '(' arg=unaliasedSelector K_AS t=native_type ')' { tmp = cast{.style = cast::cast_style::sql, .arg = std::move(arg), .type = std::move(t)}; }
|
||||
)
|
||||
( '.' fi=cident { tmp = field_selection{std::move(tmp), std::move(fi)}; }
|
||||
@@ -446,18 +445,6 @@ selectionFunctionArgs returns [std::vector<expression> a]
|
||||
')'
|
||||
;
|
||||
|
||||
vectorSimilarityArgs returns [std::vector<expression> a]
|
||||
: '(' ')'
|
||||
| '(' v1=vectorSimilarityArg { a.push_back(std::move(v1)); }
|
||||
( ',' vn=vectorSimilarityArg { a.push_back(std::move(vn)); } )*
|
||||
')'
|
||||
;
|
||||
|
||||
vectorSimilarityArg returns [uexpression a]
|
||||
: s=unaliasedSelector { a = std::move(s); }
|
||||
| v=value { a = std::move(v); }
|
||||
;
|
||||
|
||||
countArgument
|
||||
: '*'
|
||||
| i=INTEGER { if (i->getText() != "1") {
|
||||
@@ -1696,10 +1683,6 @@ functionName returns [cql3::functions::function_name s]
|
||||
: (ks=keyspaceName '.')? f=allowedFunctionName { $s.keyspace = std::move(ks); $s.name = std::move(f); }
|
||||
;
|
||||
|
||||
similarityFunctionName returns [cql3::functions::function_name s]
|
||||
: f=allowedSimilarityFunctionName { $s = cql3::functions::function_name::native_function(std::move(f)); }
|
||||
;
|
||||
|
||||
allowedFunctionName returns [sstring s]
|
||||
: f=IDENT { $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
|
||||
| f=QUOTED_NAME { $s = $f.text; }
|
||||
@@ -1708,11 +1691,6 @@ allowedFunctionName returns [sstring s]
|
||||
| K_COUNT { $s = "count"; }
|
||||
;
|
||||
|
||||
allowedSimilarityFunctionName returns [sstring s]
|
||||
: f=(K_SIMILARITY_COSINE | K_SIMILARITY_EUCLIDEAN | K_SIMILARITY_DOT_PRODUCT)
|
||||
{ $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
|
||||
;
|
||||
|
||||
functionArgs returns [std::vector<expression> a]
|
||||
: '(' ')'
|
||||
| '(' t1=term { a.push_back(std::move(t1)); }
|
||||
@@ -2409,10 +2387,6 @@ K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
|
||||
|
||||
K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
|
||||
|
||||
K_SIMILARITY_EUCLIDEAN: S I M I L A R I T Y '_' E U C L I D E A N;
|
||||
K_SIMILARITY_COSINE: S I M I L A R I T Y '_' C O S I N E;
|
||||
K_SIMILARITY_DOT_PRODUCT: S I M I L A R I T Y '_' D O T '_' P R O D U C T;
|
||||
|
||||
// Case-insensitive alpha characters
|
||||
fragment A: ('a'|'A');
|
||||
fragment B: ('b'|'B');
|
||||
|
||||
@@ -25,11 +25,6 @@ public:
|
||||
NOT_ASSIGNABLE,
|
||||
};
|
||||
|
||||
struct vector_test_result {
|
||||
test_result result;
|
||||
std::optional<size_t> dimension_opt;
|
||||
};
|
||||
|
||||
static bool is_assignable(test_result tr) {
|
||||
return tr != test_result::NOT_ASSIGNABLE;
|
||||
}
|
||||
@@ -49,8 +44,6 @@ public:
|
||||
*/
|
||||
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const = 0;
|
||||
|
||||
virtual vector_test_result test_assignment_any_size_float_vector() const = 0;
|
||||
|
||||
virtual std::optional<data_type> assignment_testable_type_opt() const = 0;
|
||||
|
||||
// for error reporting
|
||||
|
||||
@@ -1434,112 +1434,6 @@ test_assignment(const expression& expr, data_dictionary::database db, const sstr
|
||||
}, expr);
|
||||
}
|
||||
|
||||
template <cql3_type::kind... Kinds>
|
||||
assignment_testable::vector_test_result
|
||||
test_assignment_any_size_float_vector(const expression& expr) {
|
||||
using test_result = assignment_testable::vector_test_result;
|
||||
const test_result NOT_ASSIGNABLE = {assignment_testable::test_result::NOT_ASSIGNABLE, std::nullopt};
|
||||
const test_result WEAKLY_ASSIGNABLE = {assignment_testable::test_result::WEAKLY_ASSIGNABLE, std::nullopt};
|
||||
auto is_float_or_bind = [] (const expression& e) {
|
||||
return expr::visit(overloaded_functor{
|
||||
[] (const bind_variable&) {
|
||||
return true;
|
||||
},
|
||||
[] (const untyped_constant& uc) {
|
||||
return uc.partial_type == untyped_constant::type_class::floating_point
|
||||
|| uc.partial_type == untyped_constant::type_class::integer;
|
||||
},
|
||||
[] (const constant& value) {
|
||||
auto kind = value.type->as_cql3_type().get_kind();
|
||||
return cql3_type::kind_enum_set::frozen<Kinds...>().contains(kind);
|
||||
},
|
||||
[] (const auto&) {
|
||||
return false;
|
||||
},
|
||||
}, e);
|
||||
};
|
||||
auto validate_assignment = [&] (const data_type& dt) -> test_result {
|
||||
auto vt = dynamic_pointer_cast<const vector_type_impl>(dt->underlying_type());
|
||||
if (!vt) {
|
||||
return NOT_ASSIGNABLE;
|
||||
}
|
||||
auto elem_kind = vt->get_elements_type()->as_cql3_type().get_kind();
|
||||
if (cql3_type::kind_enum_set::frozen<Kinds...>().contains(elem_kind)) {
|
||||
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, vt->get_dimension()};
|
||||
}
|
||||
return NOT_ASSIGNABLE;
|
||||
};
|
||||
return expr::visit(overloaded_functor{
|
||||
[&] (const constant& value) -> test_result {
|
||||
return validate_assignment(value.type);
|
||||
},
|
||||
[&] (const binary_operator&) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const conjunction&) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const column_value& col_val) -> test_result {
|
||||
return validate_assignment(col_val.col->type);
|
||||
},
|
||||
[&] (const subscript&) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const unresolved_identifier& ui) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const column_mutation_attribute& cma) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const function_call& fc) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const cast& c) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const field_selection& fs) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const bind_variable& bv) -> test_result {
|
||||
return WEAKLY_ASSIGNABLE;
|
||||
},
|
||||
[&] (const untyped_constant& uc) -> test_result {
|
||||
return uc.partial_type == untyped_constant::type_class::null
|
||||
? WEAKLY_ASSIGNABLE
|
||||
: NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const tuple_constructor& tc) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const collection_constructor& c) -> test_result {
|
||||
switch (c.style) {
|
||||
case collection_constructor::style_type::list_or_vector: {
|
||||
if(std::ranges::all_of(c.elements, is_float_or_bind)) {
|
||||
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, c.elements.size()};
|
||||
}
|
||||
return NOT_ASSIGNABLE;
|
||||
}
|
||||
case collection_constructor::style_type::set: return NOT_ASSIGNABLE;
|
||||
case collection_constructor::style_type::map: return NOT_ASSIGNABLE;
|
||||
case collection_constructor::style_type::vector:
|
||||
on_internal_error(expr_logger, "vector style type found in test_assignment, should have been introduced post-prepare");
|
||||
}
|
||||
on_internal_error(expr_logger, fmt::format("unexpected collection_constructor style {}", static_cast<unsigned>(c.style)));
|
||||
},
|
||||
[&] (const usertype_constructor& uc) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
[&] (const temporary& t) -> test_result {
|
||||
return NOT_ASSIGNABLE;
|
||||
},
|
||||
}, expr);
|
||||
}
|
||||
|
||||
assignment_testable::vector_test_result
|
||||
test_assignment_any_size_float_vector(const expression& expr) {
|
||||
return test_assignment_any_size_float_vector<cql3_type::kind::FLOAT, cql3_type::kind::DOUBLE>(expr);
|
||||
}
|
||||
|
||||
expression
|
||||
prepare_expression(const expression& expr, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
|
||||
auto e_opt = try_prepare_expression(expr, db, keyspace, schema_opt, std::move(receiver));
|
||||
@@ -1573,9 +1467,6 @@ public:
|
||||
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const override {
|
||||
return expr::test_assignment(_e, db, keyspace, schema_opt, receiver);
|
||||
}
|
||||
virtual vector_test_result test_assignment_any_size_float_vector() const override {
|
||||
return expr::test_assignment_any_size_float_vector(_e);
|
||||
}
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
return fmt::format("{}", _e);
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
#include "cql3/functions/user_function.hh"
|
||||
#include "cql3/functions/user_aggregate.hh"
|
||||
#include "cql3/functions/uuid_fcts.hh"
|
||||
#include "cql3/functions/vector_similarity_fcts.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "as_json_function.hh"
|
||||
#include "cql3/prepare_context.hh"
|
||||
@@ -399,14 +398,6 @@ functions::get(data_dictionary::database db,
|
||||
}
|
||||
});
|
||||
|
||||
const auto func_name = name.has_keyspace() ? name : name.as_native_function();
|
||||
if (SIMILARITY_FUNCTIONS.contains(func_name)) {
|
||||
auto arg_types = retrieve_vector_arg_types(func_name, provided_args);
|
||||
auto fun = ::make_shared<vector_similarity_fct>(func_name.name, arg_types);
|
||||
validate_types(db, keyspace, schema.get(), fun, provided_args, receiver_ks, receiver_cf);
|
||||
return fun;
|
||||
}
|
||||
|
||||
if (name.has_keyspace()
|
||||
? name == TOKEN_FUNCTION_NAME
|
||||
: name.name == TOKEN_FUNCTION_NAME.name) {
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "vector_similarity_fcts.hh"
|
||||
#include "types/types.hh"
|
||||
#include "types/vector.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace cql3 {
|
||||
namespace functions {
|
||||
namespace {
|
||||
|
||||
// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
|
||||
// There exist tests checking the compliance of the results.
|
||||
// Reference:
|
||||
// https://github.com/datastax/jvector/blob/f967f1c9249035b63b55a566fac7d4dc38380349/jvector-base/src/main/java/io/github/jbellis/jvector/vector/VectorSimilarityFunction.java#L36-L69
|
||||
|
||||
// You should only use this function if you need to preserve the original vectors and cannot normalize
|
||||
// them in advance.
|
||||
float compute_cosine_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
|
||||
double dot_product = 0.0;
|
||||
double squared_norm_a = 0.0;
|
||||
double squared_norm_b = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = value_cast<float>(v1[i]);
|
||||
double b = value_cast<float>(v2[i]);
|
||||
|
||||
dot_product += a * b;
|
||||
squared_norm_a += a * a;
|
||||
squared_norm_b += b * b;
|
||||
}
|
||||
|
||||
if (squared_norm_a == 0 || squared_norm_b == 0) {
|
||||
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
|
||||
}
|
||||
|
||||
// The cosine similarity is in the range [-1, 1].
|
||||
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
|
||||
// for consistency with other similarity functions.
|
||||
return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
|
||||
}
|
||||
|
||||
float compute_euclidean_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
|
||||
double sum = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = value_cast<float>(v1[i]);
|
||||
double b = value_cast<float>(v2[i]);
|
||||
|
||||
double diff = a - b;
|
||||
sum += diff * diff;
|
||||
}
|
||||
|
||||
// The squared Euclidean (L2) distance is of range [0, inf).
|
||||
// It is mapped to a similarity score in the range (0, 1] (0 -> 1, inf -> 0)
|
||||
// for consistency with other similarity functions.
|
||||
return (1 / (1 + sum));
|
||||
}
|
||||
|
||||
// Assumes that both vectors are L2-normalized.
|
||||
// This similarity is intended as an optimized way to perform cosine similarity calculation.
|
||||
float compute_dot_product_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
|
||||
double dot_product = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = value_cast<float>(v1[i]);
|
||||
double b = value_cast<float>(v2[i]);
|
||||
dot_product += a * b;
|
||||
}
|
||||
|
||||
// The dot product is in the range [-1, 1] for L2-normalized vectors.
|
||||
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
|
||||
// for consistency with other similarity functions.
|
||||
return ((1 + dot_product) / 2);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS = {
|
||||
{SIMILARITY_COSINE_FUNCTION_NAME, compute_cosine_similarity},
|
||||
{SIMILARITY_EUCLIDEAN_FUNCTION_NAME, compute_euclidean_similarity},
|
||||
{SIMILARITY_DOT_PRODUCT_FUNCTION_NAME, compute_dot_product_similarity},
|
||||
};
|
||||
|
||||
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args) {
|
||||
if (provided_args.size() != 2) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Invalid number of arguments for function {}(vector<float, n>, vector<float, n>)", name));
|
||||
}
|
||||
|
||||
auto [first_result, first_dim_opt] = provided_args[0]->test_assignment_any_size_float_vector();
|
||||
auto [second_result, second_dim_opt] = provided_args[1]->test_assignment_any_size_float_vector();
|
||||
|
||||
auto invalid_type_error_message = [&name](const shared_ptr<assignment_testable>& arg) {
|
||||
auto type = arg->assignment_testable_type_opt();
|
||||
const auto& source_context = arg->assignment_testable_source_context();
|
||||
if (type) {
|
||||
return fmt::format("Function {} requires a float vector argument, but found {} of type {}", name, source_context, type.value()->cql3_type_name());
|
||||
} else {
|
||||
return fmt::format("Function {} requires a float vector argument, but found {}", name, source_context);
|
||||
}
|
||||
};
|
||||
|
||||
if (!is_assignable(first_result)) {
|
||||
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[0]));
|
||||
}
|
||||
if (!is_assignable(second_result)) {
|
||||
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[1]));
|
||||
}
|
||||
|
||||
if (!first_dim_opt && !second_dim_opt) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot infer type of argument {} for function {}(vector<float, n>, vector<float, n>)",
|
||||
provided_args[0]->assignment_testable_source_context(), name));
|
||||
}
|
||||
if (first_dim_opt && second_dim_opt) {
|
||||
if (*first_dim_opt != *second_dim_opt) {
|
||||
throw exceptions::invalid_request_exception(fmt::format(
|
||||
"All arguments must have the same vector dimensions, but found vector<float, {}> and vector<float, {}>", *first_dim_opt, *second_dim_opt));
|
||||
}
|
||||
}
|
||||
|
||||
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
|
||||
auto type = vector_type_impl::get_instance(float_type, dimension);
|
||||
return {type, type};
|
||||
}
|
||||
|
||||
bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters) {
|
||||
if (std::any_of(parameters.begin(), parameters.end(), [](const auto& param) {
|
||||
return !param;
|
||||
})) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const auto& type = arg_types()[0];
|
||||
data_value v1 = type->deserialize(*parameters[0]);
|
||||
data_value v2 = type->deserialize(*parameters[1]);
|
||||
const auto& v1_elements = value_cast<std::vector<data_value>>(v1);
|
||||
const auto& v2_elements = value_cast<std::vector<data_value>>(v2);
|
||||
|
||||
float result = SIMILARITY_FUNCTIONS.at(_name)(v1_elements, v2_elements);
|
||||
return float_type->decompose(result);
|
||||
}
|
||||
|
||||
} // namespace functions
|
||||
} // namespace cql3
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "native_scalar_function.hh"
|
||||
#include "cql3/assignment_testable.hh"
|
||||
#include "cql3/functions/function_name.hh"
|
||||
|
||||
namespace cql3 {
|
||||
namespace functions {
|
||||
|
||||
static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::native_function("similarity_cosine");
|
||||
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
|
||||
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");
|
||||
|
||||
using similarity_function_t = float (*)(const std::vector<data_value>&, const std::vector<data_value>&);
|
||||
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;
|
||||
|
||||
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);
|
||||
|
||||
class vector_similarity_fct : public native_scalar_function {
|
||||
public:
|
||||
vector_similarity_fct(const sstring& name, const std::vector<data_type>& arg_types)
|
||||
: native_scalar_function(name, float_type, arg_types) {
|
||||
}
|
||||
|
||||
virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
|
||||
};
|
||||
|
||||
} // namespace functions
|
||||
} // namespace cql3
|
||||
@@ -64,10 +64,6 @@ bool query_processor::topology_global_queue_empty() {
|
||||
return remote().first.get().ss.topology_global_queue_empty();
|
||||
}
|
||||
|
||||
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
|
||||
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
|
||||
}
|
||||
|
||||
static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
|
||||
@@ -474,7 +474,6 @@ public:
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
|
||||
|
||||
query_options make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
|
||||
@@ -1322,10 +1322,6 @@ const std::vector<expr::expression>& statement_restrictions::index_restrictions(
|
||||
return _index_restrictions;
|
||||
}
|
||||
|
||||
bool statement_restrictions::is_empty() const {
|
||||
return !_where.has_value();
|
||||
}
|
||||
|
||||
// Current score table:
|
||||
// local and restrictions include full partition key: 2
|
||||
// global: 1
|
||||
|
||||
@@ -408,8 +408,6 @@ public:
|
||||
|
||||
/// Checks that the primary key restrictions don't contain null values, throws invalid_request_exception otherwise.
|
||||
void validate_primary_key(const query_options& options) const;
|
||||
|
||||
bool is_empty() const;
|
||||
};
|
||||
|
||||
statement_restrictions analyze_statement_restrictions(
|
||||
|
||||
@@ -32,7 +32,7 @@ bool
|
||||
selectable_processes_selection(const expr::expression& selectable) {
|
||||
return expr::visit(overloaded_functor{
|
||||
[&] (const expr::constant&) -> bool {
|
||||
return true;
|
||||
on_internal_error(slogger, "no way to express SELECT constant in the grammar yet");
|
||||
},
|
||||
[&] (const expr::conjunction& conj) -> bool {
|
||||
on_internal_error(slogger, "no way to express 'SELECT a AND b' in the grammar yet");
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "seastar/coroutine/exception.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
@@ -139,7 +138,6 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
bool unknown_keyspace = false;
|
||||
try {
|
||||
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
|
||||
auto ks = qp.db().find_keyspace(_name);
|
||||
@@ -160,12 +158,8 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
@@ -248,15 +242,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
target_type,
|
||||
keyspace());
|
||||
mc.add_mutations(std::move(muts), "CQL alter keyspace");
|
||||
co_return std::make_tuple(std::move(ret), warnings);
|
||||
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
|
||||
} catch (data_dictionary::no_such_keyspace& e) {
|
||||
unknown_keyspace = true;
|
||||
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
if (unknown_keyspace) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
std::unreachable();
|
||||
}
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
|
||||
@@ -190,7 +190,7 @@ future<utils::chunked_vector<mutation>> batch_statement::get_mutations(query_pro
|
||||
co_return vresult;
|
||||
}
|
||||
|
||||
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const {
|
||||
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) {
|
||||
if (mutations.size() <= 1) {
|
||||
return; // We only warn for batch spanning multiple mutations
|
||||
}
|
||||
@@ -209,9 +209,8 @@ void batch_statement::verify_batch_size(query_processor& qp, const utils::chunke
|
||||
for (auto&& m : mutations) {
|
||||
ks_cf_pairs.insert(m.schema()->ks_name() + "." + m.schema()->cf_name());
|
||||
}
|
||||
const auto batch_type = _type == type::LOGGED ? "Logged" : "Unlogged";
|
||||
return seastar::format("{} batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
|
||||
batch_type, mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
|
||||
return seastar::format("Batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
|
||||
mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
|
||||
};
|
||||
if (size > fail_threshold) {
|
||||
_logger.error("{}", error("FAIL", fail_threshold).c_str());
|
||||
|
||||
@@ -116,7 +116,7 @@ public:
|
||||
* Checks batch size to ensure threshold is met. If not, a warning is logged.
|
||||
* @param cfs ColumnFamilies that will store the batch's mutations.
|
||||
*/
|
||||
void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const;
|
||||
static void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations);
|
||||
|
||||
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
@@ -710,12 +710,11 @@ std::vector<lw_shared_ptr<column_specification>> listing_describe_statement::get
|
||||
|
||||
future<std::vector<std::vector<managed_bytes_opt>>> listing_describe_statement::describe(cql3::query_processor& qp, const service::client_state& client_state) const {
|
||||
auto db = qp.db();
|
||||
auto raw_ks = client_state.get_raw_keyspace();
|
||||
|
||||
std::vector<sstring> keyspaces;
|
||||
// For most describe statements we should limit the results to the USEd
|
||||
// keyspace (client_state.get_raw_keyspace()), if any. However for DESC
|
||||
// KEYSPACES we must list all keyspaces, not just the USEd one.
|
||||
if (_element != element_type::keyspace && !client_state.get_raw_keyspace().empty()) {
|
||||
keyspaces.push_back(client_state.get_raw_keyspace());
|
||||
if (!raw_ks.empty()) {
|
||||
keyspaces.push_back(raw_ks);
|
||||
} else {
|
||||
keyspaces = db.get_all_keyspaces();
|
||||
std::ranges::sort(keyspaces);
|
||||
|
||||
@@ -61,7 +61,7 @@ expand_to_racks(const locator::token_metadata& tm,
|
||||
|
||||
// Handle ALTER:
|
||||
// ([]|0) -> numeric is allowed, there are no existing replicas
|
||||
// numeric -> numeric' is not supported unless numeric == numeric'. User should convert RF to rack list of equal count first.
|
||||
// numeric -> numeric' is not supported. User should convert RF to rack list of equal count first.
|
||||
// rack_list -> len(rack_list) is allowed (no-op)
|
||||
// rack_list -> numeric is not allowed
|
||||
if (old_options.contains(dc)) {
|
||||
@@ -75,8 +75,6 @@ expand_to_racks(const locator::token_metadata& tm,
|
||||
"Cannot change replication factor for '{}' from {} to numeric {}, use rack list instead",
|
||||
dc, old_rf_val, data.count()));
|
||||
}
|
||||
} else if (old_rf.count() == data.count()) {
|
||||
return rf;
|
||||
} else if (old_rf.count() > 0) {
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Cannot change replication factor for '{}' from {} to {}, only rack list is allowed",
|
||||
@@ -155,8 +153,6 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
}
|
||||
|
||||
// Validate options.
|
||||
bool numeric_to_rack_list_transition = false;
|
||||
bool rf_change = false;
|
||||
for (auto&& [dc, opt] : options) {
|
||||
locator::replication_factor_data rf(opt);
|
||||
|
||||
@@ -166,7 +162,6 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
old_rf = locator::replication_factor_data(i->second);
|
||||
}
|
||||
|
||||
rf_change = rf_change || (old_rf && old_rf->count() != rf.count()) || (!old_rf && rf.count() != 0);
|
||||
if (!rf.is_rack_based()) {
|
||||
if (old_rf && old_rf->is_rack_based() && rf.count() != 0) {
|
||||
if (old_rf->count() != rf.count()) {
|
||||
@@ -192,11 +187,12 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Rack list for '{}' contains duplicate entries", dc));
|
||||
}
|
||||
numeric_to_rack_list_transition = numeric_to_rack_list_transition || (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0);
|
||||
}
|
||||
|
||||
if (numeric_to_rack_list_transition && rf_change) {
|
||||
throw exceptions::configuration_exception("Cannot change replication factor from numeric to rack list and rf value at the same time");
|
||||
if (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0) {
|
||||
// FIXME: Allow this if replicas already conform to the given rack list.
|
||||
// FIXME: Implement automatic colocation to allow transition to rack list.
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Cannot change replication factor from numeric to rack list for '{}'", dc));
|
||||
}
|
||||
}
|
||||
|
||||
if (!rf && options.empty() && old_options.empty()) {
|
||||
@@ -416,7 +412,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
|
||||
? std::optional<unsigned>(0) : std::nullopt;
|
||||
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
|
||||
bool uses_tablets = initial_tablets.has_value();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
bool rack_list_enabled = feat.rack_list_rf;
|
||||
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
@@ -432,7 +428,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
throw exceptions::invalid_request_exception("Cannot alter replication strategy vnode/tablets flavor");
|
||||
}
|
||||
auto sc = get_replication_strategy_class();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
bool rack_list_enabled = feat.rack_list_rf;
|
||||
if (sc) {
|
||||
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
|
||||
} else {
|
||||
|
||||
@@ -1976,7 +1976,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|
||||
if (it == indexes.end()) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
|
||||
}
|
||||
if (index_opt || parameters->allow_filtering() || !(restrictions->is_empty()) || check_needs_allow_filtering_anyway(*restrictions)) {
|
||||
if (index_opt || parameters->allow_filtering() || restrictions->need_filtering() || check_needs_allow_filtering_anyway(*restrictions)) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering by vector does not support filtering");
|
||||
}
|
||||
index_opt = *it;
|
||||
|
||||
@@ -42,11 +42,6 @@ table::get_index_manager() const {
|
||||
return _ops->get_index_manager(*this);
|
||||
}
|
||||
|
||||
db_clock::time_point
|
||||
table::get_truncation_time() const {
|
||||
return _ops->get_truncation_time(*this);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
keyspace::metadata() const {
|
||||
return _ops->get_keyspace_metadata(*this);
|
||||
|
||||
@@ -77,7 +77,6 @@ public:
|
||||
schema_ptr schema() const;
|
||||
const std::vector<view_ptr>& views() const;
|
||||
const secondary_index::secondary_index_manager& get_index_manager() const;
|
||||
db_clock::time_point get_truncation_time() const;
|
||||
};
|
||||
|
||||
class keyspace {
|
||||
|
||||
@@ -27,7 +27,6 @@ public:
|
||||
virtual std::optional<table> try_find_table(database db, table_id id) const = 0;
|
||||
virtual const secondary_index::secondary_index_manager& get_index_manager(table t) const = 0;
|
||||
virtual schema_ptr get_table_schema(table t) const = 0;
|
||||
virtual db_clock::time_point get_truncation_time(table t) const = 0;
|
||||
virtual lw_shared_ptr<keyspace_metadata> get_keyspace_metadata(keyspace ks) const = 0;
|
||||
virtual bool is_internal(keyspace ks) const = 0;
|
||||
virtual const locator::abstract_replication_strategy& get_replication_strategy(keyspace ks) const = 0;
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mutation/mutation.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id);
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id);
|
||||
|
||||
}
|
||||
@@ -10,24 +10,20 @@
|
||||
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
#include <ranges>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
#include "batchlog_manager.hh"
|
||||
#include "batchlog.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "utils/rate_limiter.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/murmur_hash.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "idl/frozen_schema.dist.hh"
|
||||
@@ -37,94 +33,17 @@
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
static logging::logger blogger("batchlog_manager");
|
||||
|
||||
namespace db {
|
||||
|
||||
// Yields 256 batchlog shards. Even on the largest nodes we currently run on,
|
||||
// this should be enough to give every core a batchlog partition.
|
||||
static constexpr unsigned batchlog_shard_bits = 8;
|
||||
|
||||
int32_t batchlog_shard_of(db_clock::time_point written_at) {
|
||||
const int64_t count = written_at.time_since_epoch().count();
|
||||
std::array<uint64_t, 2> result;
|
||||
utils::murmur_hash::hash3_x64_128(bytes_view(reinterpret_cast<const signed char*>(&count), sizeof(count)), 0, result);
|
||||
uint64_t hash = result[0] ^ result[1];
|
||||
return hash & ((1ULL << batchlog_shard_bits) - 1);
|
||||
}
|
||||
|
||||
std::pair<partition_key, clustering_key>
|
||||
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
||||
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
|
||||
|
||||
std::vector<bytes> ckey_components;
|
||||
ckey_components.reserve(2);
|
||||
ckey_components.push_back(serialized(written_at));
|
||||
if (id) {
|
||||
ckey_components.push_back(serialized(*id));
|
||||
}
|
||||
auto ckey = clustering_key::from_exploded(schema, ckey_components);
|
||||
|
||||
return {std::move(pkey), std::move(ckey)};
|
||||
}
|
||||
|
||||
std::pair<partition_key, clustering_key>
|
||||
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
||||
return get_batchlog_key(schema, version, stage, batchlog_shard_of(written_at), written_at, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
|
||||
|
||||
auto timestamp = api::new_timestamp();
|
||||
|
||||
mutation m(schema, key);
|
||||
// Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
|
||||
auto cdef_data = schema->get_column_definition(to_bytes("data"));
|
||||
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
|
||||
|
||||
return m;
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto data = [&mutations] {
|
||||
utils::chunked_vector<canonical_mutation> fm(mutations.begin(), mutations.end());
|
||||
bytes_ostream out;
|
||||
for (auto& m : fm) {
|
||||
ser::serialize(out, m);
|
||||
}
|
||||
return std::move(out).to_managed_bytes();
|
||||
}();
|
||||
|
||||
return get_batchlog_mutation_for(std::move(schema), std::move(data), version, stage, now, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id) {
|
||||
return get_batchlog_mutation_for(std::move(schema), mutations, version, batchlog_stage::initial, now, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
|
||||
mutation m(schema, key);
|
||||
auto timestamp = api::new_timestamp();
|
||||
m.partition().apply_delete(*schema, ckey, tombstone(timestamp, gc_clock::now()));
|
||||
return m;
|
||||
}
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id) {
|
||||
return get_batchlog_delete_mutation(std::move(schema), version, batchlog_stage::initial, now, id);
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
const std::chrono::seconds db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
|
||||
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
|
||||
: _qp(qp)
|
||||
, _sys_ks(sys_ks)
|
||||
, _replay_timeout(config.replay_timeout)
|
||||
, _write_request_timeout(std::chrono::duration_cast<db_clock::duration>(config.write_request_timeout))
|
||||
, _replay_rate(config.replay_rate)
|
||||
, _delay(config.delay)
|
||||
, _replay_cleanup_after_replays(config.replay_cleanup_after_replays)
|
||||
@@ -233,75 +152,18 @@ future<> db::batchlog_manager::stop() {
|
||||
}
|
||||
|
||||
future<size_t> db::batchlog_manager::count_all_batches() const {
|
||||
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> rs) {
|
||||
return size_t(rs->one().get_as<int64_t>("count"));
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
|
||||
if (_migration_done) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return with_gate(_gate, [this] () mutable -> future<> {
|
||||
blogger.info("Migrating batchlog entries from v1 -> v2");
|
||||
|
||||
auto schema_v1 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
auto schema_v2 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
auto batch = [this, schema_v1, schema_v2] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
// check version of serialization format
|
||||
if (!row.has("version")) {
|
||||
blogger.warn("Not migrating logged batch because of unknown version");
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto version = row.get_as<int32_t>("version");
|
||||
if (version != netw::messaging_service::current_version) {
|
||||
blogger.warn("Not migrating logged batch because of incorrect version");
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto data = row.get_blob_fragmented("data");
|
||||
|
||||
auto& sp = _qp.proxy();
|
||||
|
||||
utils::get_local_injector().inject("batchlog_manager_fail_migration", [] { throw std::runtime_error("Error injection: failing batchlog migration"); });
|
||||
|
||||
auto migrate_mut = get_batchlog_mutation_for(schema_v2, std::move(data), version, batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(migrate_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
mutation delete_mut(schema_v1, partition_key::from_single_value(*schema_v1, serialized(id)));
|
||||
delete_mut.partition().apply_delete(*schema_v1, clustering_key_prefix::make_empty(), tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
co_await sp.mutate_locally(delete_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
try {
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
page_size,
|
||||
std::move(batch));
|
||||
} catch (...) {
|
||||
blogger.warn("Batchlog v1 to v2 migration failed: {}; will retry", std::current_exception());
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await container().invoke_on_all([] (auto& bm) {
|
||||
bm._migration_done = true;
|
||||
});
|
||||
|
||||
blogger.info("Done migrating batchlog entries from v1 -> v2");
|
||||
});
|
||||
db_clock::duration db::batchlog_manager::get_batch_log_timeout() const {
|
||||
// enough time for the actual write + BM removal mutation
|
||||
return _write_request_timeout * 2;
|
||||
}
|
||||
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
|
||||
co_await maybe_migrate_v1_to_v2();
|
||||
|
||||
typedef db_clock::rep clock_type;
|
||||
|
||||
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
|
||||
@@ -310,26 +172,21 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
|
||||
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
struct replay_stats {
|
||||
std::optional<db_clock::time_point> min_too_fresh;
|
||||
bool need_cleanup = false;
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
auto delete_batch = [this, schema = std::move(schema)] (utils::UUID id) {
|
||||
auto key = partition_key::from_singular(*schema, id);
|
||||
mutation m(schema, key);
|
||||
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
|
||||
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
|
||||
return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
};
|
||||
|
||||
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
|
||||
|
||||
// Use a stable `now` across all batches, so skip/replay decisions are the
|
||||
// same across a while prefix of written_at (across all ids).
|
||||
const auto now = db_clock::now();
|
||||
|
||||
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
|
||||
const auto batch_shard = row.get_as<int32_t>("shard");
|
||||
auto batch = [this, limiter, delete_batch = std::move(delete_batch), &all_replayed](const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
|
||||
auto timeout = _replay_timeout;
|
||||
auto now = db_clock::now();
|
||||
auto timeout = get_batch_log_timeout();
|
||||
|
||||
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
|
||||
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
|
||||
@@ -337,48 +194,52 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
// check version of serialization format
|
||||
if (!row.has("version")) {
|
||||
blogger.warn("Skipping logged batch because of unknown version");
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto version = row.get_as<int32_t>("version");
|
||||
if (version != netw::messaging_service::current_version) {
|
||||
blogger.warn("Skipping logged batch because of incorrect version {}; current version = {}", version, netw::messaging_service::current_version);
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto data = row.get_blob_unfragmented("data");
|
||||
|
||||
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
|
||||
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
bool send_failed = false;
|
||||
|
||||
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
|
||||
blogger.debug("Replaying batch {}", id);
|
||||
|
||||
try {
|
||||
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
|
||||
auto fms = make_lw_shared<std::deque<canonical_mutation>>();
|
||||
auto in = ser::as_input_stream(data);
|
||||
while (in.size()) {
|
||||
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
|
||||
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
|
||||
if (!tbl) {
|
||||
continue;
|
||||
}
|
||||
if (written_at <= tbl->get_truncation_time()) {
|
||||
continue;
|
||||
}
|
||||
schema_ptr s = tbl->schema();
|
||||
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
fms.emplace_back(std::move(fm), std::move(s));
|
||||
fms->emplace_back(ser::deserialize(in, std::type_identity<canonical_mutation>()));
|
||||
schema_ptr s = _qp.db().find_schema(fms->back().column_family_id());
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
|
||||
if (now < written_at + timeout) {
|
||||
blogger.debug("Skipping replay of {}, too fresh", id);
|
||||
|
||||
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
|
||||
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto size = data.size();
|
||||
|
||||
for (const auto& [fm, s] : fms) {
|
||||
mutations.emplace_back(fm.to_mutation(s));
|
||||
co_await maybe_yield();
|
||||
}
|
||||
auto mutations = co_await map_reduce(*fms, [this, written_at] (canonical_mutation& fm) {
|
||||
const auto& cf = _qp.proxy().local_db().find_column_family(fm.column_family_id());
|
||||
return make_ready_future<canonical_mutation*>(written_at > cf.get_truncation_time() ? &fm : nullptr);
|
||||
},
|
||||
utils::chunked_vector<mutation>(),
|
||||
[this] (utils::chunked_vector<mutation> mutations, canonical_mutation* fm) {
|
||||
if (fm) {
|
||||
schema_ptr s = _qp.db().find_schema(fm->column_family_id());
|
||||
mutations.emplace_back(fm->to_mutation(s));
|
||||
}
|
||||
return mutations;
|
||||
});
|
||||
|
||||
if (!mutations.empty()) {
|
||||
const auto ttl = [written_at]() -> clock_type {
|
||||
@@ -404,11 +265,7 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
co_await limiter->reserve(size);
|
||||
_stats.write_attempts += mutations.size();
|
||||
auto timeout = db::timeout_clock::now() + write_timeout;
|
||||
if (cleanup) {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
|
||||
} else {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
}
|
||||
} catch (data_dictionary::no_such_keyspace& ex) {
|
||||
@@ -422,80 +279,31 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
// Do _not_ remove the batch, assuning we got a node write error.
|
||||
// Since we don't have hints (which origin is satisfied with),
|
||||
// we have to resort to keeping this batch to next lap.
|
||||
if (!cleanup || stage == batchlog_stage::failed_replay) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
send_failed = true;
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto& sp = _qp.proxy();
|
||||
|
||||
if (send_failed) {
|
||||
blogger.debug("Moving batch {} to stage failed_replay", id);
|
||||
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
}
|
||||
|
||||
// delete batch
|
||||
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
|
||||
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
shard_written_at.need_cleanup = true;
|
||||
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
|
||||
co_await with_gate(_gate, [this, cleanup, &all_replayed, batch = std::move(batch), now, &replay_stats_per_shard] () mutable -> future<> {
|
||||
blogger.debug("Started replayAllFailedBatches with cleanup: {}", cleanup);
|
||||
co_await with_gate(_gate, [this, cleanup, batch = std::move(batch)] () mutable -> future<> {
|
||||
blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id());
|
||||
co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
co_await coroutine::parallel_for_each(std::views::iota(0, 16), [&] (int32_t chunk) -> future<> {
|
||||
const int32_t batchlog_chunk_base = chunk * 16;
|
||||
for (int32_t i = 0; i < 16; ++i) {
|
||||
int32_t batchlog_shard = batchlog_chunk_base + i;
|
||||
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
|
||||
db::consistency_level::ONE,
|
||||
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::failed_replay)), data_value(batchlog_shard)},
|
||||
page_size,
|
||||
batch);
|
||||
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
|
||||
db::consistency_level::ONE,
|
||||
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::initial)), data_value(batchlog_shard)},
|
||||
page_size,
|
||||
batch);
|
||||
|
||||
if (cleanup != post_replay_cleanup::yes) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto it = replay_stats_per_shard.find(batchlog_shard);
|
||||
if (it == replay_stats_per_shard.end() || !it->second.need_cleanup) {
|
||||
// Nothing was replayed on this batchlog shard, nothing to cleanup.
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto write_time = it->second.min_too_fresh.value_or(now - _replay_timeout);
|
||||
const auto end_weight = it->second.min_too_fresh ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed;
|
||||
auto [key, ckey] = get_batchlog_key(*schema, netw::messaging_service::current_version, batchlog_stage::initial, batchlog_shard, write_time, {});
|
||||
auto end_pos = position_in_partition(partition_region::clustered, end_weight, std::move(ckey));
|
||||
|
||||
range_tombstone rt(position_in_partition::before_all_clustered_rows(), std::move(end_pos), tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
|
||||
blogger.trace("Clean up batchlog shard {} with range tombstone {}", batchlog_shard, rt);
|
||||
|
||||
mutation m(schema, key);
|
||||
m.partition().apply_row_tombstone(*schema, std::move(rt));
|
||||
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT id, data, written_at, version FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
page_size,
|
||||
std::move(batch)).then([this, cleanup] {
|
||||
if (cleanup == post_replay_cleanup::no) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// Replaying batches could have generated tombstones, flush to disk,
|
||||
// where they can be compacted away.
|
||||
return replica::database::flush_table_on_all_shards(_qp.proxy().get_db(), system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
}).then([] {
|
||||
blogger.debug("Finished replayAllFailedBatches");
|
||||
});
|
||||
|
||||
blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
|
||||
});
|
||||
|
||||
co_return all_replayed;
|
||||
|
||||
@@ -34,17 +34,12 @@ class system_keyspace;
|
||||
using all_batches_replayed = bool_class<struct all_batches_replayed_tag>;
|
||||
|
||||
struct batchlog_manager_config {
|
||||
db_clock::duration replay_timeout;
|
||||
std::chrono::duration<double> write_request_timeout;
|
||||
uint64_t replay_rate = std::numeric_limits<uint64_t>::max();
|
||||
std::chrono::milliseconds delay = std::chrono::milliseconds(0);
|
||||
unsigned replay_cleanup_after_replays;
|
||||
};
|
||||
|
||||
enum class batchlog_stage : int8_t {
|
||||
initial,
|
||||
failed_replay
|
||||
};
|
||||
|
||||
class batchlog_manager : public peering_sharded_service<batchlog_manager> {
|
||||
public:
|
||||
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
|
||||
@@ -64,7 +59,7 @@ private:
|
||||
|
||||
cql3::query_processor& _qp;
|
||||
db::system_keyspace& _sys_ks;
|
||||
db_clock::duration _replay_timeout;
|
||||
db_clock::duration _write_request_timeout;
|
||||
uint64_t _replay_rate;
|
||||
std::chrono::milliseconds _delay;
|
||||
unsigned _replay_cleanup_after_replays = 100;
|
||||
@@ -76,14 +71,6 @@ private:
|
||||
|
||||
gc_clock::time_point _last_replay;
|
||||
|
||||
// Was the v1 -> v2 migration already done since last restart?
|
||||
// The migration is attempted once after each restart. This is redundant but
|
||||
// keeps thing simple. Once no upgrade path exists from a ScyllaDB version
|
||||
// which can still produce v1 entries, this migration code can be removed.
|
||||
bool _migration_done = false;
|
||||
|
||||
future<> maybe_migrate_v1_to_v2();
|
||||
|
||||
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
|
||||
public:
|
||||
// Takes a QP, not a distributes. Because this object is supposed
|
||||
@@ -98,13 +85,10 @@ public:
|
||||
future<all_batches_replayed> do_batch_log_replay(post_replay_cleanup cleanup);
|
||||
|
||||
future<size_t> count_all_batches() const;
|
||||
db_clock::duration get_batch_log_timeout() const;
|
||||
gc_clock::time_point get_last_replay() const {
|
||||
return _last_replay;
|
||||
}
|
||||
|
||||
const stats& stats() const {
|
||||
return _stats;
|
||||
}
|
||||
private:
|
||||
future<> batchlog_replay_loop();
|
||||
};
|
||||
|
||||
@@ -54,14 +54,12 @@ public:
|
||||
uint64_t applied_mutations = 0;
|
||||
uint64_t corrupt_bytes = 0;
|
||||
uint64_t truncated_at = 0;
|
||||
uint64_t broken_files = 0;
|
||||
|
||||
stats& operator+=(const stats& s) {
|
||||
invalid_mutations += s.invalid_mutations;
|
||||
skipped_mutations += s.skipped_mutations;
|
||||
applied_mutations += s.applied_mutations;
|
||||
corrupt_bytes += s.corrupt_bytes;
|
||||
broken_files += s.broken_files;
|
||||
return *this;
|
||||
}
|
||||
stats operator+(const stats& s) const {
|
||||
@@ -194,8 +192,6 @@ db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const comm
|
||||
s->corrupt_bytes += e.bytes();
|
||||
} catch (commitlog::segment_truncation& e) {
|
||||
s->truncated_at = e.position();
|
||||
} catch (commitlog::header_checksum_error&) {
|
||||
++s->broken_files;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
@@ -374,9 +370,6 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
|
||||
if (stats.truncated_at != 0) {
|
||||
rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at);
|
||||
}
|
||||
if (stats.broken_files != 0) {
|
||||
rlogger.warn("Corrupted file header: {}. Skipped.", f);
|
||||
}
|
||||
rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
|
||||
25
db/config.cc
25
db/config.cc
@@ -1105,14 +1105,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142,
|
||||
"Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_transport_port_proxy_protocol(this, "native_transport_port_proxy_protocol", value_status::Used, 0,
|
||||
"Port on which the CQL native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
|
||||
, native_transport_port_ssl_proxy_protocol(this, "native_transport_port_ssl_proxy_protocol", value_status::Used, 0,
|
||||
"Port on which the CQL TLS native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
|
||||
, native_shard_aware_transport_port_proxy_protocol(this, "native_shard_aware_transport_port_proxy_protocol", value_status::Used, 0,
|
||||
"Like native_transport_port_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_shard_aware_transport_port_ssl_proxy_protocol(this, "native_shard_aware_transport_port_ssl_proxy_protocol", value_status::Used, 0,
|
||||
"Like native_transport_port_ssl_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128,
|
||||
"The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n"
|
||||
"Default is different (128 versus unlimited).\n"
|
||||
@@ -1160,7 +1152,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Number of threads with which to deliver hints. In multiple data-center deployments, consider increasing this number because cross data-center handoff is generally slower.")
|
||||
, batchlog_replay_throttle_in_kb(this, "batchlog_replay_throttle_in_kb", value_status::Unused, 1024,
|
||||
"Total maximum throttle. Throttling is reduced proportionally to the number of nodes in the cluster.")
|
||||
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 1,
|
||||
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 60,
|
||||
"Clean up batchlog memtable after every N replays. Replays are issued on a timer, every 60 seconds. So if batchlog_replay_cleanup_after_replays is set to 60, the batchlog memtable is flushed every 60 * 60 seconds.")
|
||||
/**
|
||||
* @Group Request scheduler properties
|
||||
@@ -1478,15 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
|
||||
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
|
||||
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
|
||||
, alternator_describe_table_info_cache_validity_in_seconds(this, "alternator_describe_table_info_cache_validity_in_seconds", liveness::LiveUpdate, value_status::Used, 60 * 60 * 6,
|
||||
"The validity of DescribeTable information - table size in bytes. This is how long calculated value will be reused before recalculation.")
|
||||
, alternator_response_gzip_compression_level(this, "alternator_response_gzip_compression_level", liveness::LiveUpdate, value_status::Used, int8_t(6),
|
||||
"Controls gzip and deflate compression level for Alternator response bodies (if the client requests it via Accept-Encoding header) Default of 6 is a compromise between speed and compression.\n"
|
||||
"Valid values:\n"
|
||||
"\t0 : No compression (disables gzip/deflate)\n"
|
||||
"\t1-9: Compression levels (1 = fastest, 9 = best compression)")
|
||||
, alternator_response_compression_threshold_in_bytes(this, "alternator_response_compression_threshold_in_bytes", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
|
||||
"When the compression is enabled, this value indicates the minimum size of data to compress. Smaller responses will not be compressed.")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
|
||||
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
|
||||
@@ -1583,12 +1566,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
|
||||
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
|
||||
"Tablet load stats refresh rate in seconds.")
|
||||
, force_capacity_based_balancing(this, "force_capacity_based_balancing", liveness::LiveUpdate, value_status::Used, false,
|
||||
"Forces the load balancer to perform capacity based balancing, instead of size based balancing.")
|
||||
, size_based_balance_threshold_percentage(this, "size_based_balance_threshold_percentage", liveness::LiveUpdate, value_status::Used, 1.0,
|
||||
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
|
||||
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
|
||||
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")
|
||||
|
||||
10
db/config.hh
10
db/config.hh
@@ -324,10 +324,6 @@ public:
|
||||
named_value<uint16_t> native_transport_port_ssl;
|
||||
named_value<uint16_t> native_shard_aware_transport_port;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_ssl;
|
||||
named_value<uint16_t> native_transport_port_proxy_protocol;
|
||||
named_value<uint16_t> native_transport_port_ssl_proxy_protocol;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_proxy_protocol;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_ssl_proxy_protocol;
|
||||
named_value<uint32_t> native_transport_max_threads;
|
||||
named_value<uint32_t> native_transport_max_frame_size_in_mb;
|
||||
named_value<sstring> broadcast_rpc_address;
|
||||
@@ -477,9 +473,6 @@ public:
|
||||
named_value<bool> alternator_allow_system_table_write;
|
||||
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
|
||||
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
|
||||
named_value<uint32_t> alternator_describe_table_info_cache_validity_in_seconds;
|
||||
named_value<int> alternator_response_gzip_compression_level;
|
||||
named_value<uint32_t> alternator_response_compression_threshold_in_bytes;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
@@ -597,9 +590,6 @@ public:
|
||||
named_value<bool> rf_rack_valid_keyspaces;
|
||||
|
||||
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
|
||||
named_value<bool> force_capacity_based_balancing;
|
||||
named_value<float> size_based_balance_threshold_percentage;
|
||||
named_value<uint64_t> minimal_tablet_size_for_balancing;
|
||||
|
||||
static const sstring default_tls_priority;
|
||||
private:
|
||||
|
||||
@@ -248,7 +248,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
|
||||
// which is larger than the segment ID of the RP of the last written hint.
|
||||
cfg.base_segment_id = _last_written_rp.base_id();
|
||||
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (this auto, commitlog l) -> future<commitlog> {
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) -> future<commitlog> {
|
||||
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
|
||||
// When this happens we want to refill _sender's segments only if it has finished with the segments he had before.
|
||||
if (_sender.have_segments()) {
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
#include <seastar/core/smp.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/util/file.hh>
|
||||
|
||||
// Boost features.
|
||||
|
||||
@@ -644,12 +643,6 @@ future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (!replay_allowed()) {
|
||||
auto reason = seastar::format("Precondition violdated while trying to drain {} / {}: "
|
||||
"hint replay is not allowed", host_id, ip);
|
||||
on_internal_error(manager_logger, std::move(reason));
|
||||
}
|
||||
|
||||
manager_logger.info("Draining starts for {}", host_id);
|
||||
|
||||
const auto holder = seastar::gate::holder{_draining_eps_gate};
|
||||
@@ -906,7 +899,7 @@ future<> manager::migrate_ip_directories() {
|
||||
co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
|
||||
try {
|
||||
manager_logger.warn("Removing hint directory {}", directory.native());
|
||||
co_await seastar::recursive_remove_directory(directory);
|
||||
co_await lister::rmdir(directory);
|
||||
} catch (...) {
|
||||
on_internal_error(manager_logger,
|
||||
seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));
|
||||
|
||||
@@ -318,10 +318,6 @@ public:
|
||||
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
|
||||
/// corresponding hint_endpoint_manager objects.
|
||||
///
|
||||
/// Preconditions:
|
||||
/// * Hint replay must be allowed (i.e. `replay_allowed()` must be true) throughout
|
||||
/// the execution of this function.
|
||||
///
|
||||
/// \param host_id host ID of the node that left the cluster
|
||||
/// \param ip the IP of the node that left the cluster
|
||||
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
|
||||
@@ -346,15 +342,15 @@ public:
|
||||
return _state.contains(state::started);
|
||||
}
|
||||
|
||||
bool replay_allowed() const noexcept {
|
||||
return _state.contains(state::replay_allowed);
|
||||
}
|
||||
|
||||
private:
|
||||
void set_started() noexcept {
|
||||
_state.set(state::started);
|
||||
}
|
||||
|
||||
bool replay_allowed() const noexcept {
|
||||
return _state.contains(state::replay_allowed);
|
||||
}
|
||||
|
||||
void set_draining_all() noexcept {
|
||||
_state.set(state::draining_all);
|
||||
}
|
||||
|
||||
@@ -135,5 +135,5 @@ const std::string db::object_storage_endpoint_param::gs_type = "gs";
|
||||
|
||||
auto fmt::formatter<db::object_storage_endpoint_param>::format(const db::object_storage_endpoint_param& e, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{}", e.to_json_string());
|
||||
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{{}}", e.to_json_string());
|
||||
}
|
||||
|
||||
@@ -1262,9 +1262,16 @@ static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
schema_applier ap(proxy, ss, sys_ks, reload);
|
||||
co_await execute_do_merge_schema(proxy, ap, std::move(mutations)).finally([&ap]() {
|
||||
return ap.destroy();
|
||||
});
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await execute_do_merge_schema(proxy, ap, std::move(mutations));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await ap.destroy();
|
||||
if (ex) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -152,8 +152,7 @@ future<> backup_task_impl::do_backup() {
|
||||
}
|
||||
|
||||
future<> backup_task_impl::process_snapshot_dir() {
|
||||
auto directory = co_await io_check(open_directory, _snapshot_dir.native());
|
||||
auto snapshot_dir_lister = directory_lister(directory, _snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
||||
auto snapshot_dir_lister = directory_lister(_snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
||||
size_t num_sstable_comps = 0;
|
||||
|
||||
try {
|
||||
@@ -162,7 +161,7 @@ future<> backup_task_impl::process_snapshot_dir() {
|
||||
while (auto component_ent = co_await snapshot_dir_lister.get()) {
|
||||
const auto& name = component_ent->name;
|
||||
auto file_path = _snapshot_dir / name;
|
||||
auto st = co_await file_stat(directory, name);
|
||||
auto st = co_await file_stat(file_path.native());
|
||||
total += st.size;
|
||||
try {
|
||||
auto desc = sstables::parse_path(file_path, "", "");
|
||||
|
||||
@@ -55,7 +55,6 @@
|
||||
#include "message/shared_dict.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "db/compaction_history_entry.hh"
|
||||
#include "mutation/async_utils.hh"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -111,7 +110,6 @@ namespace {
|
||||
system_keyspace::v3::CDC_LOCAL,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.enable_schema_commitlog();
|
||||
@@ -139,7 +137,6 @@ namespace {
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.is_group0_table = true;
|
||||
@@ -216,30 +213,6 @@ schema_ptr system_keyspace::batchlog() {
|
||||
return batchlog;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::batchlog_v2() {
|
||||
static thread_local auto batchlog_v2 = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BATCHLOG_V2), NAME, BATCHLOG_V2,
|
||||
// partition key
|
||||
{{"version", int32_type}, {"stage", byte_type}, {"shard", int32_type}},
|
||||
// clustering key
|
||||
{{"written_at", timestamp_type}, {"id", uuid_type}},
|
||||
// regular columns
|
||||
{{"data", bytes_type}},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"batches awaiting replay"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
builder.with_hash_version();
|
||||
return builder.build(schema_builder::compact_storage::no);
|
||||
}();
|
||||
return batchlog_v2;
|
||||
}
|
||||
|
||||
/*static*/ schema_ptr system_keyspace::paxos() {
|
||||
static thread_local auto paxos = [] {
|
||||
// FIXME: switch to the new schema_builder interface (with_column(...), etc)
|
||||
@@ -312,7 +285,6 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -1419,23 +1391,6 @@ schema_ptr system_keyspace::view_building_tasks() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::client_routes() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, CLIENT_ROUTES);
|
||||
return schema_builder(NAME, CLIENT_ROUTES, std::make_optional(id))
|
||||
.with_column("connection_id", utf8_type, column_kind::partition_key)
|
||||
.with_column("host_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("address", utf8_type)
|
||||
.with_column("port", int32_type)
|
||||
.with_column("tls_port", int32_type)
|
||||
.with_column("alternator_port", int32_type)
|
||||
.with_column("alternator_https_port", int32_type)
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
future<system_keyspace::local_info> system_keyspace::load_local_info() {
|
||||
auto msg = co_await execute_cql(format("SELECT host_id, cluster_name, data_center, rack FROM system.{} WHERE key=?", LOCAL), sstring(LOCAL));
|
||||
|
||||
@@ -2349,7 +2304,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
|
||||
auto auth_tables = system_keyspace::auth_tables();
|
||||
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), batchlog_v2(), paxos(), local(),
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
|
||||
peers(), peer_events(), range_xfers(),
|
||||
compactions_in_progress(), compaction_history(),
|
||||
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
|
||||
@@ -2363,7 +2318,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
v3::cdc_local(),
|
||||
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
|
||||
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
|
||||
dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
|
||||
dicts(), view_building_tasks(), cdc_streams_state(), cdc_streams_history()
|
||||
});
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
|
||||
@@ -2380,9 +2335,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s) {
|
||||
return (s.get() == system_keyspace::batchlog().get())
|
||||
|| (s.get() == system_keyspace::batchlog_v2().get())
|
||||
|| (s.get() == system_keyspace::paxos().get())
|
||||
return (s.get() == system_keyspace::batchlog().get()) || (s.get() == system_keyspace::paxos().get())
|
||||
|| s == system_keyspace::v3::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
@@ -3000,9 +2953,7 @@ future<mutation> system_keyspace::get_group0_history(sharded<replica::database>&
|
||||
SCYLLA_ASSERT(rs);
|
||||
auto& ps = rs->partitions();
|
||||
for (auto& p: ps) {
|
||||
// Note: we could decorate the frozen_mutation's key to check if it's the expected one
|
||||
// but since this is a single partition table, we can just check after unfreezing the whole mutation.
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
auto mut = p.mut().unfreeze(s);
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
|
||||
if (partition_key == GROUP0_HISTORY_KEY) {
|
||||
co_return mut;
|
||||
@@ -3160,10 +3111,7 @@ static bool must_have_tokens(service::node_state nst) {
|
||||
// A decommissioning node doesn't have tokens at the end, they are
|
||||
// removed during transition to the left_token_ring state.
|
||||
case service::node_state::decommissioning: return false;
|
||||
// A removing node might or might not have tokens depending on whether
|
||||
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
|
||||
// cases, we allow removing nodes to not have tokens.
|
||||
case service::node_state::removing: return false;
|
||||
case service::node_state::removing: return true;
|
||||
case service::node_state::rebuilding: return true;
|
||||
case service::node_state::normal: return true;
|
||||
case service::node_state::left: return false;
|
||||
@@ -3403,12 +3351,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("paused_rf_change_requests")) {
|
||||
for (auto&& v : deserialize_set_column(*topology(), some_row, "paused_rf_change_requests")) {
|
||||
ret.paused_rf_change_requests.insert(value_cast<utils::UUID>(v));
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
@@ -3620,43 +3562,35 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
return entry;
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id) {
|
||||
auto r = co_await get_topology_request_entry_opt(id);
|
||||
if (!r) {
|
||||
on_internal_error(slogger, format("no entry for request id {}", id));
|
||||
}
|
||||
co_return std::move(*r);
|
||||
}
|
||||
|
||||
future<std::optional<system_keyspace::topology_requests_entry>> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
|
||||
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id, bool require_entry) {
|
||||
auto rs = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
|
||||
|
||||
if (!rs || rs->empty()) {
|
||||
co_return std::nullopt;
|
||||
if (require_entry) {
|
||||
on_internal_error(slogger, format("no entry for request id {}", id));
|
||||
} else {
|
||||
co_return topology_requests_entry{
|
||||
.id = utils::null_uuid()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const auto& row = rs->one();
|
||||
co_return topology_request_row_to_entry(id, row);
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit) {
|
||||
sstring request_types_str = "";
|
||||
bool first = true;
|
||||
for (const auto& rt : request_types) {
|
||||
if (!std::exchange(first, false)) {
|
||||
request_types_str += ", ";
|
||||
}
|
||||
request_types_str += std::visit([] (auto&& arg) { return fmt::format("'{}'", arg); }, rt);
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
|
||||
// Running requests.
|
||||
auto rs_running = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, request_types_str));
|
||||
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS,
|
||||
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
|
||||
|
||||
|
||||
// Requests which finished after end_time_limit.
|
||||
auto rs_done = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(), request_types_str));
|
||||
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(),
|
||||
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
|
||||
|
||||
topology_requests_entries m;
|
||||
for (const auto& row: *rs_done) {
|
||||
@@ -3674,16 +3608,6 @@ future<system_keyspace::topology_requests_entries> system_keyspace::get_topology
|
||||
co_return m;
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
|
||||
return get_topology_request_entries({
|
||||
service::topology_request::join,
|
||||
service::topology_request::replace,
|
||||
service::topology_request::rebuild,
|
||||
service::topology_request::leave,
|
||||
service::topology_request::remove
|
||||
}, end_time_limit);
|
||||
}
|
||||
|
||||
future<mutation> system_keyspace::get_insert_dict_mutation(
|
||||
std::string_view name,
|
||||
bytes data,
|
||||
|
||||
@@ -163,7 +163,6 @@ public:
|
||||
static constexpr auto NAME = "system";
|
||||
static constexpr auto HINTS = "hints";
|
||||
static constexpr auto BATCHLOG = "batchlog";
|
||||
static constexpr auto BATCHLOG_V2 = "batchlog_v2";
|
||||
static constexpr auto PAXOS = "paxos";
|
||||
static constexpr auto BUILT_INDEXES = "IndexInfo";
|
||||
static constexpr auto LOCAL = "local";
|
||||
@@ -199,8 +198,6 @@ public:
|
||||
static constexpr auto VIEW_BUILD_STATUS_V2 = "view_build_status_v2";
|
||||
static constexpr auto DICTS = "dicts";
|
||||
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
|
||||
static constexpr auto CLIENT_ROUTES = "client_routes";
|
||||
static constexpr auto VERSIONS = "versions";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
@@ -258,7 +255,6 @@ public:
|
||||
|
||||
static schema_ptr hints();
|
||||
static schema_ptr batchlog();
|
||||
static schema_ptr batchlog_v2();
|
||||
static schema_ptr paxos();
|
||||
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
static schema_ptr raft();
|
||||
@@ -278,7 +274,6 @@ public:
|
||||
static schema_ptr view_build_status_v2();
|
||||
static schema_ptr dicts();
|
||||
static schema_ptr view_building_tasks();
|
||||
static schema_ptr client_routes();
|
||||
|
||||
// auth
|
||||
static schema_ptr roles();
|
||||
@@ -670,9 +665,7 @@ public:
|
||||
|
||||
future<service::topology_request_state> get_topology_request_state(utils::UUID id, bool require_entry);
|
||||
topology_requests_entry topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row);
|
||||
future<topology_requests_entry> get_topology_request_entry(utils::UUID id);
|
||||
future<std::optional<topology_requests_entry>> get_topology_request_entry_opt(utils::UUID id);
|
||||
future<system_keyspace::topology_requests_entries> get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit);
|
||||
future<topology_requests_entry> get_topology_request_entry(utils::UUID id, bool require_entry);
|
||||
future<topology_requests_entries> get_node_ops_request_entries(db_clock::time_point end_time_limit);
|
||||
|
||||
public:
|
||||
|
||||
@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
|
||||
} catch (...) {
|
||||
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
|
||||
sleep = true;
|
||||
}
|
||||
|
||||
if (sleep) {
|
||||
vbw_logger.debug("Sleeping after exception.");
|
||||
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
|
||||
|
||||
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
|
||||
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
|
||||
auto it = vbw._state._batch->tasks.begin();
|
||||
while (it != vbw._state._batch->tasks.end()) {
|
||||
auto id = it->first;
|
||||
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
|
||||
|
||||
++it; // Advance the iterator before potentially removing the entry from the map.
|
||||
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
|
||||
for (auto& [id, t]: tasks_map) {
|
||||
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
|
||||
if (!task_opt || task_opt->get().aborted) {
|
||||
co_await vbw._state._batch->abort_task(id);
|
||||
}
|
||||
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
|
||||
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
if (processing_base_table != building_state.currently_processed_base_table) {
|
||||
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
|
||||
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
tasks.insert({id, *task_opt});
|
||||
}
|
||||
#ifdef SEASTAR_DEBUG
|
||||
{
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
co_return collect_completed_tasks();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -605,8 +605,8 @@ public:
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
|
||||
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
|
||||
.with_column("key", utf8_type, column_kind::partition_key)
|
||||
.with_column("version", utf8_type)
|
||||
.with_column("build_mode", utf8_type)
|
||||
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
|
||||
.with_column("ssl_protocol", utf8_type)
|
||||
.with_column("username", utf8_type)
|
||||
.with_column("scheduling_group", utf8_type)
|
||||
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Collect
|
||||
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
|
||||
using client_data_vec = utils::chunked_vector<client_data>;
|
||||
using shard_client_data = std::vector<client_data_vec>;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
|
||||
cd_vec.resize(smp::count);
|
||||
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
|
||||
for (unsigned i = 0; i < smp::count; i++) {
|
||||
for (auto&& ps_cdc : *cd_vec[i]) {
|
||||
for (auto&& cd : ps_cdc) {
|
||||
if (cd_map.contains(cd->ip)) {
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
if (cd_map.contains(cd.ip)) {
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
} else {
|
||||
dht::decorated_key key = make_partition_key(cd->ip);
|
||||
dht::decorated_key key = make_partition_key(cd.ip);
|
||||
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
|
||||
ips.insert(decorated_ip{std::move(key), cd->ip});
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
ips.insert(decorated_ip{std::move(key), cd.ip});
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
|
||||
co_await result.emit_partition_start(dip.key);
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
|
||||
return a.port < b.port || a.client_type_str() < b.client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd->shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd->stage_str());
|
||||
if (cd->driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
|
||||
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd.shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd.stage_str());
|
||||
if (cd.driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", *cd.driver_name);
|
||||
}
|
||||
if (cd->driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
|
||||
if (cd.driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", *cd.driver_version);
|
||||
}
|
||||
if (cd->hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd->hostname);
|
||||
if (cd.hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd.hostname);
|
||||
}
|
||||
if (cd->protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
|
||||
if (cd.protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
|
||||
}
|
||||
if (cd->ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
|
||||
if (cd.ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
|
||||
}
|
||||
if (cd->ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
|
||||
if (cd.ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
|
||||
}
|
||||
if (cd->ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
|
||||
if (cd.ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
|
||||
}
|
||||
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
|
||||
if (cd->scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
|
||||
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
|
||||
if (cd.scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(
|
||||
utf8_type,
|
||||
utf8_type,
|
||||
false
|
||||
);
|
||||
|
||||
auto prepare_client_options = [] (const auto& client_options) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& co: client_options) {
|
||||
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
};
|
||||
|
||||
set_cell(cr.cells(), "client_options",
|
||||
make_map_value(map_type, prepare_client_options(cd->client_options)));
|
||||
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
@@ -1120,10 +1100,9 @@ public:
|
||||
}
|
||||
|
||||
auto tm = _db.local().get_token_metadata_ptr();
|
||||
auto target_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
|
||||
|
||||
const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
|
||||
|
||||
locator::load_sketch load(tm, stats, default_tablet_size);
|
||||
locator::load_sketch load(tm);
|
||||
co_await load.populate();
|
||||
|
||||
tm->get_topology().for_each_node([&] (const auto& node) {
|
||||
@@ -1137,23 +1116,18 @@ public:
|
||||
if (auto ip = _gossiper.local().get_address_map().find(host)) {
|
||||
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
|
||||
}
|
||||
set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
|
||||
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
|
||||
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
|
||||
set_cell(r.cells(), "tablets_allocated", load.get_load(host));
|
||||
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_shard_load(host))));
|
||||
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_load(host) * target_tablet_size)));
|
||||
|
||||
if (stats && stats->capacity.contains(host)) {
|
||||
auto capacity = stats->capacity.at(host);
|
||||
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
|
||||
|
||||
if (auto utilization = load.get_allocated_utilization(host)) {
|
||||
auto utilization = load.get_allocated_utilization(host, *stats, target_tablet_size);
|
||||
if (utilization) {
|
||||
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
|
||||
}
|
||||
if (load.has_complete_data(host)) {
|
||||
if (auto utilization = load.get_storage_utilization(host)) {
|
||||
set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
|
||||
}
|
||||
set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
|
||||
}
|
||||
}
|
||||
mutation_sink(m);
|
||||
});
|
||||
@@ -1173,8 +1147,6 @@ private:
|
||||
.with_column("storage_capacity", long_type)
|
||||
.with_column("storage_allocated_load", long_type)
|
||||
.with_column("storage_allocated_utilization", double_type)
|
||||
.with_column("storage_load", long_type)
|
||||
.with_column("storage_utilization", double_type)
|
||||
.with_sharder(1, 0) // shard0-only
|
||||
.with_hash_version()
|
||||
.build();
|
||||
|
||||
@@ -71,7 +71,7 @@ Use "Bash on Ubuntu on Windows" for the same tools and capabilities as on Linux
|
||||
|
||||
### Building the Docs
|
||||
|
||||
1. Run `make preview` in the `docs/` directory to build the documentation.
|
||||
1. Run `make preview` to build the documentation.
|
||||
1. Preview the built documentation locally at http://127.0.0.1:5500/.
|
||||
|
||||
### Cleanup
|
||||
|
||||
@@ -41,8 +41,6 @@ class MetricsProcessor:
|
||||
# Get metrics from the file
|
||||
try:
|
||||
metrics_file = metrics.get_metrics_from_file(relative_path, "scylla_", metrics_info, strict=strict)
|
||||
except SystemExit:
|
||||
pass
|
||||
finally:
|
||||
os.chdir(old_cwd)
|
||||
if metrics_file:
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
# Alternator: DynamoDB API in ScyllaDB
|
||||
# Alternator: DynamoDB API in Scylla
|
||||
|
||||
## Introduction
|
||||
Alternator is a ScyllaDB feature adding compatibility with Amazon DynamoDB(TM).
|
||||
Alternator is a Scylla feature adding compatibility with Amazon DynamoDB(TM).
|
||||
DynamoDB's API uses JSON-encoded requests and responses which are sent over
|
||||
an HTTP or HTTPS transport. It is described in detail in Amazon's [DynamoDB
|
||||
API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/).
|
||||
|
||||
Our goal is that any application written to use Amazon DynamoDB could
|
||||
be run, unmodified, against ScyllaDB with Alternator enabled. Alternator's
|
||||
be run, unmodified, against Scylla with Alternator enabled. Alternator's
|
||||
compatibility with DynamoDB is fairly complete, but users should be aware
|
||||
of some differences and some unimplemented features. The extent of
|
||||
Alternator's compatibility with DynamoDB is described in the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md) document,
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md) document,
|
||||
which is updated as the work on Alternator progresses and compatibility
|
||||
continues to improve.
|
||||
|
||||
@@ -19,8 +19,8 @@ Alternator also adds several features and APIs that are not available in
|
||||
DynamoDB. These are described in [Alternator-specific APIs](new-apis.md).
|
||||
|
||||
## Running Alternator
|
||||
By default, ScyllaDB does not listen for DynamoDB API requests. To enable
|
||||
this API in ScyllaDB you must set at least two configuration options,
|
||||
By default, Scylla does not listen for DynamoDB API requests. To enable
|
||||
this API in Scylla you must set at least two configuration options,
|
||||
**alternator_port** and **alternator_write_isolation**. For example in the
|
||||
YAML configuration file:
|
||||
```yaml
|
||||
@@ -30,7 +30,7 @@ alternator_write_isolation: only_rmw_uses_lwt # or always, forbid or unsafe
|
||||
or, equivalently, via command-line arguments: `--alternator-port=8000
|
||||
--alternator-write-isolation=only_rmw_uses_lwt.
|
||||
|
||||
the **alternator_port** option determines on which port ScyllaDB listens for
|
||||
the **alternator_port** option determines on which port Scylla listens for
|
||||
DynamoDB API requests. By default, it listens on this port on all network
|
||||
interfaces. To listen only on a specific interface, configure also the
|
||||
**alternator_address** option.
|
||||
@@ -41,12 +41,12 @@ Alternator has four different choices
|
||||
for the implementation of writes, each with different advantages. You should
|
||||
carefully consider which of the options makes more sense for your intended
|
||||
use case and configure alternator_write_isolation accordingly. There is
|
||||
currently no default for this option: Trying to run ScyllaDB with an Alternator
|
||||
currently no default for this option: Trying to run Scylla with an Alternator
|
||||
port selected but without configuring write isolation will result in an error message,
|
||||
asking you to set it.
|
||||
|
||||
In addition to (or instead of) serving HTTP requests on alternator_port,
|
||||
ScyllaDB can accept DynamoDB API requests over HTTPS (encrypted), on the port
|
||||
Scylla can accept DynamoDB API requests over HTTPS (encrypted), on the port
|
||||
specified by **alternator_https_port**. As usual for HTTPS servers, the
|
||||
operator must specify certificate and key files. By default these should
|
||||
be placed in `/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key`, but
|
||||
@@ -54,7 +54,7 @@ these default locations can overridden by specifying
|
||||
`--alternator-encryption-options keyfile="..."` and
|
||||
`--alternator-encryption-options certificate="..."`.
|
||||
|
||||
By default, ScyllaDB saves a snapshot of deleted tables. But Alternator does
|
||||
By default, Scylla saves a snapshot of deleted tables. But Alternator does
|
||||
not offer an API to restore these snapshots, so these snapshots are not useful
|
||||
and waste disk space - deleting a table does not recover any disk space.
|
||||
It is therefore recommended to disable this automatic-snapshotting feature
|
||||
@@ -73,11 +73,11 @@ itself. Instructions, code and examples for doing this can be found in the
|
||||
|
||||
This section provides only a very brief introduction to Alternator's
|
||||
design. A much more detailed document about the features of the DynamoDB
|
||||
API and how they are, or could be, implemented in ScyllaDB can be found in:
|
||||
API and how they are, or could be, implemented in Scylla can be found in:
|
||||
<https://docs.google.com/document/d/1i4yjF5OSAazAY_-T8CBce9-2ykW4twx_E_Nt2zDoOVs>
|
||||
|
||||
Almost all of Alternator's source code (except some initialization code)
|
||||
can be found in the alternator/ subdirectory of ScyllaDB's source code.
|
||||
can be found in the alternator/ subdirectory of Scylla's source code.
|
||||
Extensive functional tests can be found in the test/alternator
|
||||
subdirectory. These tests are written in Python, and can be run against
|
||||
both Alternator and Amazon's DynamoDB; This allows verifying that
|
||||
@@ -85,15 +85,15 @@ Alternator's behavior matches the one observed on DynamoDB.
|
||||
See test/alternator/README.md for more information about the tests and
|
||||
how to run them.
|
||||
|
||||
With Alternator enabled on port 8000 (for example), every ScyllaDB node
|
||||
With Alternator enabled on port 8000 (for example), every Scylla node
|
||||
listens for DynamoDB API requests on this port. These requests, in
|
||||
JSON format over HTTP, are parsed and result in calls to internal Scylla
|
||||
C++ functions - there is no CQL generation or parsing involved.
|
||||
In ScyllaDB terminology, the node receiving the request acts as the
|
||||
In Scylla terminology, the node receiving the request acts as the
|
||||
*coordinator*, and often passes the request on to one or more other nodes -
|
||||
*replicas* which hold copies of the requested data.
|
||||
|
||||
Alternator tables are stored as ScyllaDB tables, each in a separate keyspace.
|
||||
Alternator tables are stored as Scylla tables, each in a separate keyspace.
|
||||
Each keyspace is initialized when the corresponding Alternator table is
|
||||
created (with a CreateTable request). The replication factor (RF) for this
|
||||
keyspace is chosen at that point, depending on the size of the cluster:
|
||||
@@ -101,19 +101,19 @@ RF=3 is used on clusters with three or more nodes, and RF=1 is used for
|
||||
smaller clusters. Such smaller clusters are, of course, only recommended
|
||||
for tests because of the risk of data loss.
|
||||
|
||||
Each table in Alternator is stored as a ScyllaDB table in a separate
|
||||
Each table in Alternator is stored as a Scylla table in a separate
|
||||
keyspace. The DynamoDB key columns (hash and sort key) have known types,
|
||||
and become partition and clustering key columns of the ScyllaDB table.
|
||||
and become partition and clustering key columns of the Scylla table.
|
||||
All other attributes may be different for each row, so are stored in one
|
||||
map column in ScyllaDB, and not as separate columns.
|
||||
map column in Scylla, and not as separate columns.
|
||||
|
||||
DynamoDB supports two consistency levels for reads, "eventual consistency"
|
||||
and "strong consistency". These two modes are implemented using ScyllaDB's CL
|
||||
and "strong consistency". These two modes are implemented using Scylla's CL
|
||||
(consistency level) feature: All writes are done using the `LOCAL_QUORUM`
|
||||
consistency level, then strongly-consistent reads are done with
|
||||
`LOCAL_QUORUM`, while eventually-consistent reads are with just `LOCAL_ONE`.
|
||||
|
||||
In ScyllaDB (and its inspiration, Cassandra), high write performance is
|
||||
In Scylla (and its inspiration, Cassandra), high write performance is
|
||||
achieved by ensuring that writes do not require reads from disk.
|
||||
The DynamoDB API, however, provides many types of requests that need a read
|
||||
before the write (a.k.a. RMW requests - read-modify-write). For example,
|
||||
@@ -121,7 +121,7 @@ a request may copy an existing attribute, increment an attribute,
|
||||
be conditional on some expression involving existing values of attribute,
|
||||
or request that the previous values of attributes be returned. These
|
||||
read-modify-write transactions should be _isolated_ from each other, so
|
||||
by default Alternator implements every write operation using ScyllaDB's
|
||||
by default Alternator implements every write operation using Scylla's
|
||||
LWT (lightweight transactions). This default can be overridden on a per-table
|
||||
basis, by tagging the table as explained above in the "write isolation
|
||||
policies" section.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# ScyllaDB Alternator for DynamoDB users
|
||||
|
||||
ScyllaDB supports the DynamoDB API (this feature is codenamed "Alternator").
|
||||
Scylla supports the DynamoDB API (this feature is codenamed "Alternator").
|
||||
Our goal is to support any application written for Amazon DynamoDB.
|
||||
Nevertheless, there are a few differences between DynamoDB and Scylla, and
|
||||
and a few DynamoDB features that have not yet been implemented in Scylla.
|
||||
@@ -8,16 +8,16 @@ The purpose of this document is to inform users of these differences.
|
||||
|
||||
## Provisioning
|
||||
|
||||
The most obvious difference between DynamoDB and ScyllaDB is that while
|
||||
DynamoDB is a shared cloud service, ScyllaDB is a dedicated service running
|
||||
The most obvious difference between DynamoDB and Scylla is that while
|
||||
DynamoDB is a shared cloud service, Scylla is a dedicated service running
|
||||
on your private cluster. Whereas DynamoDB allows you to "provision" the
|
||||
number of requests per second you'll need - or at an extra cost not even
|
||||
provision that - ScyllaDB requires you to provision your cluster. You need
|
||||
provision that - Scylla requires you to provision your cluster. You need
|
||||
to reason about the number and size of your nodes - not the throughput.
|
||||
|
||||
Moreover, DynamoDB's per-table provisioning (`BillingMode=PROVISIONED`) is
|
||||
not yet supported by Scylla. The BillingMode and ProvisionedThroughput options
|
||||
on a table need to be valid but are ignored, and ScyllaDB behaves like DynamoDB's
|
||||
on a table need to be valid but are ignored, and Scylla behaves like DynamoDB's
|
||||
`BillingMode=PAY_PER_REQUEST`: All requests are accepted without a per-table
|
||||
throughput cap.
|
||||
|
||||
@@ -33,7 +33,7 @@ Instructions for doing this can be found in:
|
||||
|
||||
## Write isolation policies
|
||||
|
||||
ScyllaDB was designed to optimize the performance of pure write operations -
|
||||
Scylla was designed to optimize the performance of pure write operations -
|
||||
writes which do not need to read the previous value of the item.
|
||||
In CQL, writes which do need the previous value of the item must explicitly
|
||||
use the slower LWT ("LightWeight Transaction") feature to be correctly
|
||||
@@ -79,11 +79,11 @@ a _higher_ timestamp - and this will be the "last write" that wins.
|
||||
To avoid or mitigate this write reordering issue, users may consider
|
||||
one or more of the following:
|
||||
|
||||
1. Use NTP to keep the clocks on the different ScyllaDB nodes synchronized.
|
||||
1. Use NTP to keep the clocks on the different Scylla nodes synchronized.
|
||||
If the delay between the two writes is longer than NTP's accuracy,
|
||||
they will not be reordered.
|
||||
2. If an application wants to ensure that two specific writes are not
|
||||
reordered, it should send both requests to the same ScyllaDB node.
|
||||
reordered, it should send both requests to the same Scylla node.
|
||||
Care should be taken when using a load balancer - which might redirect
|
||||
two requests to two different nodes.
|
||||
3. Consider using the `always_use_lwt` write isolation policy.
|
||||
@@ -210,7 +210,7 @@ CREATE SERVICE_LEVEL IF NOT EXISTS oltp WITH SHARES = 1000;
|
||||
ATTACH SERVICE_LEVEL olap TO alice;
|
||||
ATTACH SERVICE_LEVEL oltp TO bob;
|
||||
```
|
||||
Note that `alternator_enforce_authorization` has to be enabled in ScyllaDB configuration.
|
||||
Note that `alternator_enforce_authorization` has to be enabled in Scylla configuration.
|
||||
|
||||
See [Authorization](##Authorization) section to learn more about roles and authorization.
|
||||
See [Workload Prioritization](../features/workload-prioritization)
|
||||
@@ -218,11 +218,11 @@ to read about Workload Prioritization in detail.
|
||||
|
||||
## Metrics
|
||||
|
||||
ScyllaDB has an advanced and extensive monitoring framework for inspecting
|
||||
and graphing hundreds of different metrics of ScyllaDB's usage and performance.
|
||||
ScyllaDB's monitoring stack, based on Grafana and Prometheus, is described in
|
||||
Scylla has an advanced and extensive monitoring framework for inspecting
|
||||
and graphing hundreds of different metrics of Scylla's usage and performance.
|
||||
Scylla's monitoring stack, based on Grafana and Prometheus, is described in
|
||||
<https://docs.scylladb.com/operating-scylla/monitoring/>.
|
||||
This monitoring stack is different from DynamoDB's offering - but ScyllaDB's
|
||||
This monitoring stack is different from DynamoDB's offering - but Scylla's
|
||||
is significantly more powerful and gives the user better insights on
|
||||
the internals of the database and its performance.
|
||||
|
||||
@@ -248,7 +248,7 @@ data in different partition order. Applications mustn't rely on that
|
||||
undocumented order.
|
||||
|
||||
Note that inside each partition, the individual items will be sorted the same
|
||||
in DynamoDB and ScyllaDB - determined by the _sort key_ defined for that table.
|
||||
in DynamoDB and Scylla - determined by the _sort key_ defined for that table.
|
||||
|
||||
---
|
||||
|
||||
@@ -271,16 +271,10 @@ is different, or can be configured in Alternator:
|
||||
So for example, if you create a table whose name is 192 characters, you
|
||||
can't create a GSI whose name is longer than 29 characters.
|
||||
|
||||
* DynamoDB's DescribeTable will return information about the table. According to
|
||||
AWS documentation, fields TableSizeBytes, IndexSizeBytes and ItemCount can
|
||||
lag behind by up to 6 hours.
|
||||
The `alternator_describe_table_info_cache_validity_in_seconds` parameter allows
|
||||
users to change this timeout - the default value in seconds is set to 21600 (6 hours).
|
||||
|
||||
## Experimental API features
|
||||
|
||||
Some DynamoDB API features are supported by Alternator, but considered
|
||||
**experimental** in this release. An experimental feature in ScyllaDB is a
|
||||
**experimental** in this release. An experimental feature in Scylla is a
|
||||
feature whose functionality is complete, or mostly complete, but it is not
|
||||
as thoroughly tested or optimized as regular features. Also, an experimental
|
||||
feature's implementation is still subject to change and upgrades may not be
|
||||
@@ -296,14 +290,6 @@ experimental:
|
||||
considered experimental so needs to be enabled explicitly with the
|
||||
`--experimental-features=alternator-streams` configuration option.
|
||||
|
||||
In this version, Alternator Streams is only supported if the base table
|
||||
uses vnodes instead of tablets. However, by default new tables use tablets
|
||||
so to create a table that can be used with Streams, you must set the tag
|
||||
`system:initial_tablets` set to `none` during CreateTable - so that the
|
||||
new table will use vnodes. Streams cannot be enabled on an already-existing
|
||||
table that uses tablets.
|
||||
See <https://github.com/scylladb/scylla/issues/23838>.
|
||||
|
||||
Alternator streams also differ in some respects from DynamoDB Streams:
|
||||
* The number of separate "shards" in Alternator's streams is significantly
|
||||
larger than is typical on DynamoDB.
|
||||
@@ -365,8 +351,8 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
|
||||
* The on-demand backup APIs are not supported: CreateBackup, DescribeBackup,
|
||||
DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
For now, users can use ScyllaDB's existing backup solutions such as snapshots
|
||||
or ScyllaDB Manager.
|
||||
For now, users can use Scylla's existing backup solutions such as snapshots
|
||||
or Scylla Manager.
|
||||
<https://github.com/scylladb/scylla/issues/5063>
|
||||
|
||||
* Continuous backup (the ability to restore any point in time) is also not
|
||||
@@ -384,21 +370,21 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
<https://github.com/scylladb/scylla/issues/5068>
|
||||
|
||||
* DAX (DynamoDB Accelerator), an in-memory cache for DynamoDB, is not
|
||||
available in for Alternator. Anyway, it should not be necessary - ScyllaDB's
|
||||
available in for Alternator. Anyway, it should not be necessary - Scylla's
|
||||
internal cache is already rather advanced and there is no need to place
|
||||
another cache in front of the it. We wrote more about this here:
|
||||
<https://www.scylladb.com/2017/07/31/database-caches-not-good/>
|
||||
|
||||
* The DescribeTable is missing some information about size estimates
|
||||
(IndexSizeBytes and ItemCount - TableSizeBytes is available), and also
|
||||
part of the information about indexes enabled on the table.
|
||||
* The DescribeTable is missing information about size estimates, and
|
||||
also part of the information about indexes enabled on the table.
|
||||
<https://github.com/scylladb/scylla/issues/5320>
|
||||
<https://github.com/scylladb/scylla/issues/7550>
|
||||
<https://github.com/scylladb/scylla/issues/7551>
|
||||
|
||||
* The PartiQL syntax (SQL-like SELECT/UPDATE/INSERT/DELETE expressions)
|
||||
and the operations ExecuteStatement, BatchExecuteStatement and
|
||||
ExecuteTransaction are not yet supported.
|
||||
A user that is interested in an SQL-like syntax can consider using ScyllaDB's
|
||||
A user that is interested in an SQL-like syntax can consider using Scylla's
|
||||
CQL protocol instead.
|
||||
This feature was added to DynamoDB in November 2020.
|
||||
<https://github.com/scylladb/scylla/issues/8787>
|
||||
@@ -407,7 +393,7 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
which is different from AWS's. In particular, the operations
|
||||
DescribeContributorInsights, ListContributorInsights and
|
||||
UpdateContributorInsights that configure Amazon's "CloudWatch Contributor
|
||||
Insights" are not yet supported. ScyllaDB has different ways to retrieve the
|
||||
Insights" are not yet supported. Scylla has different ways to retrieve the
|
||||
same information, such as which items were accessed most often.
|
||||
<https://github.com/scylladb/scylla/issues/8788>
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ This section will guide you through the steps for setting up the cluster:
|
||||
<https://hub.docker.com/r/scylladb/scylla/>, but add to every `docker run`
|
||||
command a `-p 8000:8000` before the image name and
|
||||
`--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port ScyllaDB will listen for
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
@@ -24,10 +24,10 @@ This section will guide you through the steps for setting up the cluster:
|
||||
By default, ScyllaDB run in this way will not have authentication or
|
||||
authorization enabled, and any DynamoDB API request will be honored without
|
||||
requiring them to be signed appropriately. See the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md#authentication-and-authorization)
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md#authentication-and-authorization)
|
||||
document on how to configure authentication and authorization.
|
||||
|
||||
## Testing ScyllaDB's DynamoDB API support:
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
1. Follow the instructions on the [AWS github page](https://github.com/awsdocs/amazon-dynamodb-developer-guide/blob/master/doc_source/TicTacToe.Phase1.md)
|
||||
2. Enjoy your tic-tac-toe game :-)
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
Alternator's primary goal is to be compatible with Amazon DynamoDB(TM)
|
||||
and its APIs, so that any application written to use Amazon DynamoDB could
|
||||
be run, unmodified, against ScyllaDB with Alternator enabled. The extent of
|
||||
be run, unmodified, against Scylla with Alternator enabled. The extent of
|
||||
Alternator's compatibility with DynamoDB is described in the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md) document.
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md) document.
|
||||
|
||||
But Alternator also adds several features and APIs that are not available in
|
||||
DynamoDB. These Alternator-specific APIs are documented here.
|
||||
@@ -15,7 +15,7 @@ _conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
Alternator could do this isolation by using ScyllaDB's LWT (lightweight
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
@@ -41,7 +41,7 @@ isolation policy for a specific table can be overridden by tagging the table
|
||||
which need a read before the write. An attempt to use such statements
|
||||
(e.g., UpdateItem with a ConditionExpression) will result in an error.
|
||||
In this mode, the remaining write requests which are allowed - pure writes
|
||||
without a read - are performed using standard ScyllaDB writes, not LWT,
|
||||
without a read - are performed using standard Scylla writes, not LWT,
|
||||
so they are significantly faster than they would have been in the
|
||||
`always_use_lwt`, but their isolation is still correct.
|
||||
|
||||
@@ -65,19 +65,19 @@ isolation policy for a specific table can be overridden by tagging the table
|
||||
read-modify-write updates. This mode is not recommended for any use case,
|
||||
and will likely be removed in the future.
|
||||
|
||||
## Accessing system tables from ScyllaDB
|
||||
ScyllaDB exposes lots of useful information via its internal system tables,
|
||||
## Accessing system tables from Scylla
|
||||
Scylla exposes lots of useful information via its internal system tables,
|
||||
which can be found in system keyspaces: 'system', 'system\_auth', etc.
|
||||
In order to access to these tables via alternator interface,
|
||||
Scan and Query requests can use a special table name:
|
||||
`.scylla.alternator.KEYSPACE_NAME.TABLE_NAME`
|
||||
which will return results fetched from corresponding ScyllaDB table.
|
||||
which will return results fetched from corresponding Scylla table.
|
||||
|
||||
This interface can be used only to fetch data from system tables.
|
||||
Attempts to read regular tables via the virtual interface will result
|
||||
in an error.
|
||||
|
||||
Example: in order to query the contents of ScyllaDB's `system.large_rows`,
|
||||
Example: in order to query the contents of Scylla's `system.large_rows`,
|
||||
pass `TableName='.scylla.alternator.system.large_rows'` to a Query/Scan
|
||||
request.
|
||||
|
||||
@@ -113,14 +113,14 @@ connection (either active or idle), not necessarily an active request as
|
||||
in Alternator.
|
||||
|
||||
## Service discovery
|
||||
As explained in [ScyllaDB Alternator for DynamoDB users](compatibility.md),
|
||||
As explained in [Scylla Alternator for DynamoDB users](compatibility.md),
|
||||
Alternator requires a load-balancer or a client-side load-balancing library
|
||||
to distribute requests between all ScyllaDB nodes. This load-balancer needs
|
||||
to be able to _discover_ the ScyllaDB nodes. Alternator provides two special
|
||||
to distribute requests between all Scylla nodes. This load-balancer needs
|
||||
to be able to _discover_ the Scylla nodes. Alternator provides two special
|
||||
requests, `/` and `/localnodes`, to help with this service discovery, which
|
||||
we will now explain.
|
||||
|
||||
Some setups know exactly which ScyllaDB nodes were brought up, so all that
|
||||
Some setups know exactly which Scylla nodes were brought up, so all that
|
||||
remains is to periodically verify that each node is still functional. The
|
||||
easiest way to do this is to make an HTTP (or HTTPS) GET request to the node,
|
||||
with URL `/`. This is a trivial GET request and does **not** need to be
|
||||
@@ -133,10 +133,10 @@ $ curl http://localhost:8000/
|
||||
healthy: localhost:8000
|
||||
```
|
||||
|
||||
In other setups, the load balancer might not know which ScyllaDB nodes exist.
|
||||
For example, it may be possible to add or remove ScyllaDB nodes without a
|
||||
In other setups, the load balancer might not know which Scylla nodes exist.
|
||||
For example, it may be possible to add or remove Scylla nodes without a
|
||||
client-side load balancer knowing. For these setups we have the `/localnodes`
|
||||
request that can be used to discover which ScyllaDB nodes exist: A load balancer
|
||||
request that can be used to discover which Scylla nodes exist: A load balancer
|
||||
that already knows at least one live node can discover the rest by sending
|
||||
a `/localnodes` request to the known node. It's again an unauthenticated
|
||||
HTTP (or HTTPS) GET request:
|
||||
@@ -160,7 +160,7 @@ list the nodes in a specific _data center_ or _rack_. These options are
|
||||
useful for certain use cases:
|
||||
|
||||
* A `dc` option (e.g., `/localnodes?dc=dc1`) can be passed to list the
|
||||
nodes in a specific ScyllaDB data center, not the data center of the node
|
||||
nodes in a specific Scylla data center, not the data center of the node
|
||||
being contacted. This is useful when a client knowns of _some_ Scylla
|
||||
node belonging to an unknown DC, but wants to list the nodes in _its_
|
||||
DC, which it knows by name.
|
||||
@@ -191,7 +191,7 @@ tells them to.
|
||||
|
||||
If you want to influence whether a specific Alternator table is created with tablets or vnodes,
|
||||
you can do this by specifying the `system:initial_tablets` tag
|
||||
(in earlier versions of ScyllaDB the tag was `experimental:initial_tablets`)
|
||||
(in earlier versions of Scylla the tag was `experimental:initial_tablets`)
|
||||
in the CreateTable operation. The value of this tag can be:
|
||||
|
||||
* Any valid integer as the value of this tag enables tablets.
|
||||
|
||||
@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
|
||||
@@ -1043,8 +1043,6 @@ The following modes are available:
|
||||
* - ``immediate``
|
||||
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
|
||||
@@ -272,17 +272,9 @@ For example::
|
||||
This query returns up to 5 rows with the closest distance of ``embedding`` vector to the provided query vector,
|
||||
in this case ``[0.1, 0.2, 0.3, 0.4]``.
|
||||
|
||||
There's also possibility to return the similarity score along with the results by using the :ref:`similarity functions <vector-similarity-functions>`.
|
||||
|
||||
For example::
|
||||
|
||||
SELECT image_id, similarity_cosine(embedding, [0.1, 0.2, 0.3, 0.4])
|
||||
FROM ImageEmbeddings
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
.. warning::
|
||||
|
||||
Currently, vector queries do not support filtering with ``WHERE`` clause,
|
||||
Currently, vector queries do not support filtering with ``WHERE`` clause, returning similarity distances,
|
||||
grouping with ``GROUP BY`` and paging. This will be added in the future releases.
|
||||
|
||||
|
||||
|
||||
@@ -227,39 +227,6 @@ A number of functions are provided to “convert” the native types into binary
|
||||
takes a 64-bit ``blob`` argument and converts it to a ``bigint`` value. For example, ``bigintAsBlob(3)`` is
|
||||
``0x0000000000000003`` and ``blobAsBigint(0x0000000000000003)`` is ``3``.
|
||||
|
||||
|
||||
.. _vector-similarity-functions:
|
||||
|
||||
Vector similarity functions
|
||||
```````````````````````````
|
||||
|
||||
To obtain the similarity of the given vectors, use a ``SELECT`` query::
|
||||
|
||||
SELECT comment, similarity_cosine(comment_vector, [0.1, 0.15, 0.3, 0.12, 0.05])
|
||||
FROM cycling.comments_vs;
|
||||
|
||||
The supported functions for this type of query are:
|
||||
|
||||
- ``similarity_cosine``
|
||||
- ``similarity_euclidean``
|
||||
- ``similarity_dot_product``
|
||||
|
||||
with the parameters of (``<vector>``, ``<vector>``).
|
||||
|
||||
The ``vector`` is either the name of the float vector column or :ref:`vector of floats <vectors>`.
|
||||
Both arguments must be of the same dimension.
|
||||
|
||||
These functions return a ``float`` value representing the similarity between the given vectors for each row.
|
||||
The similarity value is a floating-point number in a range of [0, 1] that describes how similar two vectors are.
|
||||
Values closer to 1 indicate greater similarity.
|
||||
The ``similarity_euclidean`` and ``similarity_dot_product`` functions do not perform vector normalization prior to computing similarity.
|
||||
|
||||
.. note::
|
||||
The ``similarity_dot_product`` function assumes that all input vectors are L2-normalized.
|
||||
Supplying non-normalized vectors will result in dot product values that do not represent cosine similarity and therefore are not meaningful for similarity comparison.
|
||||
If the input vectors are not normalized, consider using the ``similarity_cosine`` function instead.
|
||||
|
||||
|
||||
.. _udfs:
|
||||
|
||||
User-defined functions :label-caution:`Experimental`
|
||||
|
||||
@@ -102,7 +102,6 @@ Additional Information
|
||||
|
||||
To learn more about TTL, and see a hands-on example, check out `this lesson <https://university.scylladb.com/courses/data-modeling/lessons/advanced-data-modeling/topic/expiring-data-with-ttl-time-to-live/>`_ on ScyllaDB University.
|
||||
|
||||
* `Video: Managing data expiration with Time-To-Live <https://www.youtube.com/watch?v=SXkbu7mFHeA>`_
|
||||
* :doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`
|
||||
* :doc:`KB Article:How to Change gc_grace_seconds for a Table </kb/gc-grace-seconds/>`
|
||||
* :doc:`KB Article:Time to Live (TTL) and Compaction </kb/ttl-facts/>`
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Introduction
|
||||
|
||||
Similar to the approach described in CASSANDRA-12151, we add the
|
||||
Similar to the approach described in CASSANDRA-14471, we add the
|
||||
concept of an audit specification. An audit has a target (syslog or a
|
||||
table) and a set of events/actions that it wants recorded. We
|
||||
introduce new CQL syntax for Scylla users to describe and manipulate
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
|
||||
## What is ScyllaDB?
|
||||
|
||||
ScyllaDB is a high-performance NoSQL database optimized for speed and scalability.
|
||||
It is designed to efficiently handle large volumes of data with minimal latency,
|
||||
making it ideal for data-intensive applications.
|
||||
|
||||
ScyllaDB is distributed under the [ScyllaDB Source Available License](https://github.com/scylladb/scylladb/blob/master/LICENSE-ScyllaDB-Source-Available.md).
|
||||
ScyllaDB is a high-performance NoSQL database system, fully compatible with Apache Cassandra.
|
||||
ScyllaDB is released under the GNU Affero General Public License version 3 and the Apache License, ScyllaDB is free and open-source software.
|
||||
|
||||
> [ScyllaDB](http://www.scylladb.com/)
|
||||
|
||||
|
||||
@@ -74,8 +74,6 @@ The keys and values are:
|
||||
as an indicator to which shard client wants to connect. The desired shard number
|
||||
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
|
||||
Its value is a decimal representation of type `uint16_t`, by default `19142`.
|
||||
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
|
||||
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
|
||||
|
||||
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
|
||||
`biased-token-round-robin`. To apply the algorithm,
|
||||
@@ -238,26 +236,3 @@ the same mechanism for other protocol versions, such as CQLv4.
|
||||
|
||||
The feature is identified by the `SCYLLA_USE_METADATA_ID` key, which is meant to be sent
|
||||
in the SUPPORTED message.
|
||||
|
||||
## Sending the CLIENT_ROUTES_CHANGE event
|
||||
|
||||
This extension allows a driver to update its connections when the
|
||||
`system.client_routes` table is modified.
|
||||
|
||||
In some network topologies a specific mapping of addresses and ports is required (e.g.
|
||||
to support Private Link). This mapping can change dynamically even when no nodes are
|
||||
added or removed. The driver must adapt to those changes; otherwise connectivity can be
|
||||
lost.
|
||||
|
||||
The extension is implemented as a new `EVENT` type: `CLIENT_ROUTES_CHANGE`. The event
|
||||
body consists of:
|
||||
- [string] change
|
||||
- [string list] connection_ids
|
||||
- [string list] host_ids
|
||||
|
||||
There is only one change value: `UPDATE_NODES`, which means at least one client route
|
||||
was inserted, updated, or deleted.
|
||||
|
||||
Events already have a subscription mechanism similar to protocol extensions (that is,
|
||||
the driver only receives the events it explicitly subscribed to), so no additional
|
||||
`cql_protocol_extension` key is introduced for this feature.
|
||||
|
||||
@@ -372,8 +372,6 @@ Columns:
|
||||
* `storage_allocated_load` - Disk space allocated for tablets, assuming each tablet has a fixed size (target_tablet_size).
|
||||
* `storage_allocated_utilization` - Fraction of node's disk capacity taken for `storage_allocated_load`, where 1.0 means full utilization.
|
||||
* `storage_capacity` - Total disk capacity in bytes. Used to compute `storage_allocated_utilization`. By default equal to file system's capacity.
|
||||
* `storage_load` - Disk space allocated for tablets, computed with actual tablet sizes. Can be null if some of the tablet sizes are not known.
|
||||
* `storage_utilization` - Fraction of node's disk capacity taken for `storage_load` (with actual tablet sizes), where 1.0 means full utilization. Can be null if some of the tablet sizes are not known.
|
||||
* `tablets_allocated` - Number of tablet replicas on the node. Migrating tablets are accounted as if migration already finished.
|
||||
* `tablets_allocated_per_shard` - `tablets_allocated` divided by shard count on the node.
|
||||
|
||||
|
||||
@@ -86,7 +86,6 @@ stateDiagram-v2
|
||||
de_left_token_ring --> [*]
|
||||
}
|
||||
state removing {
|
||||
re_left_token_ring : left_token_ring
|
||||
re_tablet_draining : tablet_draining
|
||||
re_tablet_migration : tablet_migration
|
||||
re_write_both_read_old : write_both_read_old
|
||||
@@ -99,8 +98,7 @@ stateDiagram-v2
|
||||
re_tablet_draining --> re_write_both_read_old
|
||||
re_write_both_read_old --> re_write_both_read_new: streaming completed
|
||||
re_write_both_read_old --> re_rollback_to_normal: rollback
|
||||
re_write_both_read_new --> re_left_token_ring
|
||||
re_left_token_ring --> [*]
|
||||
re_write_both_read_new --> [*]
|
||||
}
|
||||
rebuilding --> normal: streaming completed
|
||||
decommissioning --> left: operation succeeded
|
||||
@@ -124,10 +122,9 @@ Note that these are not all states, as there are other states specific to tablet
|
||||
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
|
||||
to modified token ring), reads are using old replicas.
|
||||
- `write_both_read_new` - as above, but reads are using new replicas.
|
||||
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
|
||||
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
|
||||
We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
|
||||
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
|
||||
moving the node we tried to decommission/remove back to the normal state.
|
||||
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
|
||||
@@ -144,9 +141,7 @@ reads that started before this point exist in the system. Finally we remove the
|
||||
transitioning state.
|
||||
|
||||
Decommission, removenode and replace work similarly, except they don't go through
|
||||
`commit_cdc_generation`. Both decommission and removenode go through the
|
||||
`left_token_ring` state to run a global barrier ensuring all nodes are aware
|
||||
of the topology change before the operation completes.
|
||||
`commit_cdc_generation`.
|
||||
|
||||
The state machine may also go only through the `commit_cdc_generation` state
|
||||
after getting a request from the user to create a new CDC generation if the
|
||||
|
||||
@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
|
||||
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
|
||||
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
|
||||
|
||||
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
|
||||
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
|
||||
- a keyspace/view was dropped
|
||||
- tablet operations (see [tablet operations section](#tablet-operations))
|
||||
In the first case we simply delete relevant view building tasks as they are no longer needed.
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
|
||||
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
|
||||
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
|
||||
|
||||
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user