Compare commits
7 Commits
copilot/up
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea26e4b3a5 | ||
|
|
3b793ef09f | ||
|
|
df0a59ba03 | ||
|
|
69024a09b2 | ||
|
|
bb8f28a1ab | ||
|
|
32bc7e3a1c | ||
|
|
fb4e37248d |
8
.github/CODEOWNERS
vendored
8
.github/CODEOWNERS
vendored
@@ -1,5 +1,5 @@
|
||||
# AUTH
|
||||
auth/* @nuivall
|
||||
auth/* @nuivall @ptrsmrn
|
||||
|
||||
# CACHE
|
||||
row_cache* @tgrabiec
|
||||
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
|
||||
transport/*
|
||||
|
||||
# CQL QUERY LANGUAGE
|
||||
cql3/* @tgrabiec @nuivall
|
||||
cql3/* @tgrabiec @nuivall @ptrsmrn
|
||||
|
||||
# COUNTERS
|
||||
counters* @nuivall
|
||||
tests/counter_test* @nuivall
|
||||
counters* @nuivall @ptrsmrn
|
||||
tests/counter_test* @nuivall @ptrsmrn
|
||||
|
||||
# DOCS
|
||||
docs/* @annastuchlik @tzach
|
||||
|
||||
2
.github/scripts/auto-backport.py
vendored
2
.github/scripts/auto-backport.py
vendored
@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
|
||||
if is_draft:
|
||||
labels_to_add.append("conflicts")
|
||||
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
|
||||
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
|
||||
pr_comment += "Please resolve them and mark this PR as ready for review"
|
||||
backport_pr.create_issue_comment(pr_comment)
|
||||
|
||||
# Apply all labels at once if we have any
|
||||
|
||||
@@ -18,7 +18,7 @@ jobs:
|
||||
|
||||
// Regular expression pattern to check for "Fixes" prefix
|
||||
// Adjusted to dynamically insert the repository full name
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
|
||||
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
|
||||
const regex = new RegExp(pattern);
|
||||
|
||||
if (!regex.test(body)) {
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
name: Call Jira release creation for new milestone
|
||||
|
||||
on:
|
||||
milestone:
|
||||
types: [created]
|
||||
|
||||
jobs:
|
||||
sync-milestone-to-jira:
|
||||
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
|
||||
with:
|
||||
# Comma-separated list of Jira project keys
|
||||
jira_project_keys: "SCYLLADB,CUSTOMER"
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,13 +0,0 @@
|
||||
name: validate_pr_author_email
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
|
||||
jobs:
|
||||
validate_pr_author_email:
|
||||
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main
|
||||
|
||||
2
.github/workflows/codespell.yaml
vendored
2
.github/workflows/codespell.yaml
vendored
@@ -13,5 +13,5 @@ jobs:
|
||||
- uses: codespell-project/actions-codespell@master
|
||||
with:
|
||||
only_warn: 1
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
|
||||
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"
|
||||
|
||||
10
.github/workflows/docs-validate-metrics.yml
vendored
10
.github/workflows/docs-validate-metrics.yml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
- enterprise
|
||||
paths:
|
||||
- '**/*.cc'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/get_description.py'
|
||||
- 'docs/_ext/scylladb_metrics.py'
|
||||
|
||||
@@ -15,20 +15,20 @@ jobs:
|
||||
validate-metrics:
|
||||
runs-on: ubuntu-latest
|
||||
name: Check metrics documentation coverage
|
||||
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install PyYAML
|
||||
|
||||
|
||||
- name: Validate metrics
|
||||
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml
|
||||
|
||||
5
.github/workflows/trigger-scylla-ci.yaml
vendored
5
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -3,13 +3,10 @@ name: Trigger Scylla CI Route
|
||||
on:
|
||||
issue_comment:
|
||||
types: [created]
|
||||
pull_request_target:
|
||||
types:
|
||||
- unlabeled
|
||||
|
||||
jobs:
|
||||
trigger-jenkins:
|
||||
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
|
||||
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger Scylla-CI-Route Jenkins Job
|
||||
|
||||
@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
|
||||
if (!comparison_operator.IsString()) {
|
||||
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
|
||||
}
|
||||
std::string op = rjson::to_string(comparison_operator);
|
||||
std::string op = comparison_operator.GetString();
|
||||
auto it = ops.find(op);
|
||||
if (it == ops.end()) {
|
||||
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
|
||||
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
|
||||
}
|
||||
if (kv1.name == "S") {
|
||||
return cmp(rjson::to_string_view(kv1.value),
|
||||
rjson::to_string_view(kv2.value));
|
||||
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
|
||||
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
|
||||
}
|
||||
if (kv1.name == "B") {
|
||||
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
|
||||
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "S") {
|
||||
return check_BETWEEN(rjson::to_string_view(kv_v.value),
|
||||
rjson::to_string_view(kv_lb.value),
|
||||
rjson::to_string_view(kv_ub.value),
|
||||
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
|
||||
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
|
||||
bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "B") {
|
||||
|
||||
@@ -8,8 +8,6 @@
|
||||
|
||||
#include "consumed_capacity.hh"
|
||||
#include "error.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace alternator {
|
||||
|
||||
@@ -34,12 +32,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
if (!return_consumed->IsString()) {
|
||||
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
|
||||
}
|
||||
std::string_view consumed = rjson::to_string_view(*return_consumed);
|
||||
std::string consumed = return_consumed->GetString();
|
||||
if (consumed == "INDEXES") {
|
||||
throw api_error::validation("INDEXES consumed capacity is not supported");
|
||||
}
|
||||
if (consumed != "TOTAL") {
|
||||
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
|
||||
throw api_error::validation("Unknown consumed capacity "+ consumed);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
|
||||
controller::controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -40,7 +39,6 @@ controller::controller(
|
||||
: protocol_server(sg)
|
||||
, _gossiper(gossiper)
|
||||
, _proxy(proxy)
|
||||
, _ss(ss)
|
||||
, _mm(mm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _cdc_gen_svc(cdc_gen_svc)
|
||||
@@ -91,7 +89,7 @@ future<> controller::start_server() {
|
||||
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
|
||||
return cfg.alternator_timeout_in_ms;
|
||||
};
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
|
||||
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
|
||||
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
|
||||
@@ -171,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class storage_service;
|
||||
class migration_manager;
|
||||
class memory_limiter;
|
||||
}
|
||||
@@ -58,7 +57,6 @@ class server;
|
||||
class controller : public protocol_server {
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
sharded<service::storage_proxy>& _proxy;
|
||||
sharded<service::storage_service>& _ss;
|
||||
sharded<service::migration_manager>& _mm;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
sharded<cdc::generation_service>& _cdc_gen_svc;
|
||||
@@ -76,7 +74,6 @@ public:
|
||||
controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -96,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -67,14 +67,6 @@ using namespace std::chrono_literals;
|
||||
|
||||
logging::logger elogger("alternator-executor");
|
||||
|
||||
namespace std {
|
||||
template <> struct hash<std::pair<sstring, sstring>> {
|
||||
size_t operator () (const std::pair<sstring, sstring>& p) const {
|
||||
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// Alternator-specific table properties stored as hidden table tags:
|
||||
@@ -256,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
|
||||
return *(v.MemberBegin());
|
||||
}
|
||||
|
||||
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
|
||||
executor &_executor;
|
||||
|
||||
struct table_info {
|
||||
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
|
||||
};
|
||||
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
|
||||
bool active = false;
|
||||
|
||||
public:
|
||||
describe_table_info_manager(executor& executor) : _executor(executor) {
|
||||
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
|
||||
active = true;
|
||||
}
|
||||
describe_table_info_manager(const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager(describe_table_info_manager&&) = delete;
|
||||
~describe_table_info_manager() {
|
||||
if (active) {
|
||||
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
|
||||
}
|
||||
}
|
||||
|
||||
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
|
||||
|
||||
static std::chrono::high_resolution_clock::time_point now() {
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
}
|
||||
|
||||
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
|
||||
auto it = info_for_tables.find({ks_name, cf_name});
|
||||
if (it != info_for_tables.end()) {
|
||||
return it->second.size_in_bytes.get();
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
|
||||
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
|
||||
}
|
||||
future<> stop() {
|
||||
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
|
||||
active = false;
|
||||
co_return;
|
||||
}
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
|
||||
info_for_tables.erase({ks_name, cf_name});
|
||||
}
|
||||
};
|
||||
|
||||
executor::executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
smp_service_group ssg,
|
||||
utils::updateable_value<uint32_t> default_timeout_in_ms)
|
||||
: _gossiper(gossiper),
|
||||
_ss(ss),
|
||||
_proxy(proxy),
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
@@ -328,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_stats))
|
||||
{
|
||||
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
|
||||
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
|
||||
register_metrics(_metrics, _stats);
|
||||
}
|
||||
|
||||
@@ -480,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
|
||||
if (!table_name_value->IsString()) {
|
||||
throw api_error::validation("Non-string TableName field in request");
|
||||
}
|
||||
std::string table_name = rjson::to_string(*table_name_value);
|
||||
std::string table_name = table_name_value->GetString();
|
||||
return table_name;
|
||||
}
|
||||
|
||||
@@ -607,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
// does exist but the index does not (ValidationException).
|
||||
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
|
||||
throw api_error::validation(
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
|
||||
} else {
|
||||
throw api_error::resource_not_found(
|
||||
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
|
||||
@@ -648,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
|
||||
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
|
||||
attribute_name, value));
|
||||
}
|
||||
return rjson::to_string(*attribute_value);
|
||||
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
|
||||
}
|
||||
|
||||
// Convenience function for getting the value of a boolean attribute, or a
|
||||
@@ -813,44 +752,12 @@ static future<bool> is_view_built(
|
||||
|
||||
}
|
||||
|
||||
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
|
||||
auto expiry = describe_table_info_manager::now() + ttl;
|
||||
return container().invoke_on_all(
|
||||
[schema, size_in_bytes, expiry] (executor& exec) {
|
||||
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
|
||||
});
|
||||
}
|
||||
|
||||
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
|
||||
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
|
||||
std::uint64_t total_size = 0;
|
||||
if (cached_size) {
|
||||
total_size = *cached_size;
|
||||
} else {
|
||||
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
|
||||
// move forward with deletion faster than we calculate the size
|
||||
if (!deleting) {
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
|
||||
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
rjson::add(table_description, "TableSizeBytes", total_size);
|
||||
}
|
||||
|
||||
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
auto tags_ptr = db::get_tags_of_table(schema);
|
||||
|
||||
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
|
||||
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
|
||||
|
||||
auto creation_timestamp = get_table_creation_time(*schema);
|
||||
|
||||
@@ -894,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
|
||||
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
|
||||
|
||||
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
|
||||
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
if (tbl_status != table_status::deleting) {
|
||||
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
|
||||
@@ -931,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
// (for a built view) or CREATING+Backfilling (if view building
|
||||
// is in progress).
|
||||
if (!is_lsi) {
|
||||
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
|
||||
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
|
||||
rjson::add(view_entry, "IndexStatus", "ACTIVE");
|
||||
} else {
|
||||
rjson::add(view_entry, "IndexStatus", "CREATING");
|
||||
@@ -959,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
}
|
||||
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
|
||||
}
|
||||
executor::supplement_table_stream_info(table_description, *schema, _proxy);
|
||||
executor::supplement_table_stream_info(table_description, *schema, proxy);
|
||||
|
||||
// FIXME: still missing some response fields (issue #5026)
|
||||
co_return table_description;
|
||||
}
|
||||
|
||||
@@ -980,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Table", std::move(table_description));
|
||||
elogger.trace("returning {}", response);
|
||||
@@ -1083,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
auto& p = _proxy.container();
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -1170,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
|
||||
}
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (attribute_info["AttributeName"].GetString() == name) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
data_type dt = parse_key_type(type);
|
||||
if (computed_column) {
|
||||
// Computed column for GSI (doesn't choose a real column as-is
|
||||
@@ -1206,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First element of KeySchema must be an object");
|
||||
}
|
||||
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
|
||||
throw api_error::validation("First key in KeySchema must be a HASH key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[0], "AttributeName");
|
||||
@@ -1214,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First key in KeySchema must have string AttributeName");
|
||||
}
|
||||
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
|
||||
std::string hash_key = rjson::to_string(*v);
|
||||
std::string hash_key = v->GetString();
|
||||
std::string range_key;
|
||||
if (key_schema->Size() == 2) {
|
||||
if (!(*key_schema)[1].IsObject()) {
|
||||
throw api_error::validation("Second element of KeySchema must be an object");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "KeyType");
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
|
||||
throw api_error::validation("Second key in KeySchema must be a RANGE key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "AttributeName");
|
||||
@@ -1647,7 +1557,8 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
|
||||
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
@@ -1834,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
|
||||
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, sp)) {
|
||||
validate_cdc_log_name_length(builder.cf_name());
|
||||
}
|
||||
}
|
||||
@@ -1853,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1869,18 +1780,18 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
|
||||
}
|
||||
|
||||
size_t retries = _mm.get_concurrent_ddl_retries();
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
utils::chunked_vector<mutation> schema_mutations;
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
|
||||
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
|
||||
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
|
||||
const auto& topo = sp.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
if (rs->uses_tablets()) {
|
||||
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
|
||||
@@ -1888,19 +1799,14 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
}
|
||||
}
|
||||
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
|
||||
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
|
||||
if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
}
|
||||
try {
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
|
||||
} catch (exceptions::already_exists_exception&) {
|
||||
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
|
||||
}
|
||||
}
|
||||
if (_proxy.data_dictionary().try_find_table(schema->id())) {
|
||||
if (sp.data_dictionary().try_find_table(schema->id())) {
|
||||
// This should never happen, the ID is supposed to be unique
|
||||
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
|
||||
}
|
||||
@@ -1909,9 +1815,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
for (schema_builder& view_builder : view_builders) {
|
||||
schemas.push_back(view_builder.build());
|
||||
}
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
|
||||
if (ksm->uses_tablets()) {
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
|
||||
}
|
||||
|
||||
// If a role is allowed to create a table, we must give it permissions to
|
||||
@@ -1936,7 +1842,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
|
||||
try {
|
||||
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
break;
|
||||
} catch (const service::group0_concurrent_modification& ex) {
|
||||
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
|
||||
@@ -1948,9 +1854,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
}
|
||||
|
||||
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
rjson::value status = rjson::empty_object();
|
||||
executor::supplement_table_info(request, *schema, _proxy);
|
||||
executor::supplement_table_info(request, *schema, sp);
|
||||
rjson::add(status, "TableDescription", std::move(request));
|
||||
co_return rjson::print(std::move(status));
|
||||
}
|
||||
@@ -1959,11 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
|
||||
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
|
||||
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1982,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
std::string def_type = type_to_string(def.type);
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (type != def_type) {
|
||||
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
|
||||
}
|
||||
@@ -2114,10 +2019,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation(fmt::format(
|
||||
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
|
||||
}
|
||||
if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
|
||||
!p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
}
|
||||
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
@@ -2461,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
validate_value(it->value, "PutItem");
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
|
||||
@@ -2882,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
|
||||
return;
|
||||
}
|
||||
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
|
||||
if (!used.contains(rjson::to_string(it->name))) {
|
||||
if (!used.contains(it->name.GetString())) {
|
||||
throw api_error::validation(
|
||||
format("{} has spurious '{}', not used in {}",
|
||||
field_name, rjson::to_string_view(it->name), operation));
|
||||
field_name, it->name.GetString(), operation));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3099,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
}
|
||||
|
||||
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
|
||||
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
|
||||
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
|
||||
try {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
@@ -3154,44 +3055,17 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
if (!cas_shard.this_shard()) {
|
||||
_stats.shard_bounce_for_lwt++;
|
||||
return container().invoke_on(cas_shard.shard(), _ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = mutation_builders,
|
||||
&dk,
|
||||
ks = schema->ks_name(),
|
||||
cf = schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(executor& self) mutable {
|
||||
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt), &self]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
|
||||
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
|
||||
auto timeout = executor::default_timeout();
|
||||
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
|
||||
auto* op_ptr = op.get();
|
||||
auto cdc_opts = cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility =
|
||||
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
};
|
||||
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
|
||||
@@ -3217,11 +3091,13 @@ struct schema_decorated_key_equal {
|
||||
|
||||
// FIXME: if we failed writing some of the mutations, need to return a list
|
||||
// of these failed mutations rather than fail the whole write (issue #5650).
|
||||
future<> executor::do_batch_write(
|
||||
static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
smp_service_group ssg,
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit) {
|
||||
service_permit permit,
|
||||
stats& stats) {
|
||||
if (mutation_builders.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -3243,7 +3119,7 @@ future<> executor::do_batch_write(
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
any_cdc_enabled |= b.first->cdc_options().enabled();
|
||||
}
|
||||
return _proxy.mutate(std::move(mutations),
|
||||
return proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
executor::default_timeout(),
|
||||
trace_state,
|
||||
@@ -3252,7 +3128,7 @@ future<> executor::do_batch_write(
|
||||
false,
|
||||
cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
});
|
||||
} else {
|
||||
// Do the write via LWT:
|
||||
@@ -3264,35 +3140,46 @@ future<> executor::do_batch_write(
|
||||
schema_decorated_key_hash,
|
||||
schema_decorated_key_equal>;
|
||||
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto&& b : std::move(mutation_builders)) {
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
|
||||
.schema = b.first,
|
||||
.dk = dht::decorate_key(*b.first, b.second.pk())
|
||||
});
|
||||
for (auto& b : mutation_builders) {
|
||||
auto dk = dht::decorate_key(*b.first, b.second.pk());
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
|
||||
it->second.push_back(std::move(b.second));
|
||||
}
|
||||
auto* key_builders_ptr = key_builders.get();
|
||||
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
|
||||
_stats.write_using_lwt++;
|
||||
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
|
||||
auto s = e.first.schema;
|
||||
if (desired_shard.this_shard()) {
|
||||
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
|
||||
} else {
|
||||
stats.shard_bounce_for_lwt++;
|
||||
return proxy.container().invoke_on(desired_shard.shard(), ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = e.second,
|
||||
&dk = e.first.dk,
|
||||
ks = e.first.schema->ks_name(),
|
||||
cf = e.first.schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(service::storage_proxy& proxy) mutable {
|
||||
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.data_dictionary().find_schema(ks, cf);
|
||||
|
||||
static const auto* injection_name = "alternator_executor_batch_write_wait";
|
||||
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
|
||||
const auto ks = handler.get("keyspace");
|
||||
const auto cf = handler.get("table");
|
||||
const auto shard = std::atoll(handler.get("shard")->data());
|
||||
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
|
||||
elogger.info("{}: hit", injection_name);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
elogger.info("{}: continue", injection_name);
|
||||
}
|
||||
}).then([&e, desired_shard = std::move(desired_shard),
|
||||
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
|
||||
{
|
||||
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
|
||||
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
|
||||
});
|
||||
// The desired_shard on the original shard remains alive for the duration
|
||||
// of cas_write on this shard and prevents any tablet operations.
|
||||
// However, we need a local instance of cas_shard on this shard
|
||||
// to pass it to sp::cas, so we just create a new one.
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
}).finally([desired_shard = std::move(desired_shard)]{});
|
||||
}
|
||||
}).finally([key_builders = std::move(key_builders)]{});
|
||||
}
|
||||
}
|
||||
@@ -3440,7 +3327,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
|
||||
_stats.api_operations.batch_write_item_batch_total += total_items;
|
||||
_stats.api_operations.batch_write_item_histogram.add(total_items);
|
||||
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
|
||||
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
|
||||
// FIXME: Issue #5650: If we failed writing some of the updates,
|
||||
// need to return a list of these failed updates in UnprocessedItems
|
||||
// rather than fail the whole write (issue #5650).
|
||||
@@ -3485,7 +3372,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
}
|
||||
rjson::value newv = rjson::empty_object();
|
||||
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
|
||||
std::string attr = rjson::to_string(it->name);
|
||||
std::string attr = it->name.GetString();
|
||||
auto x = members.find(attr);
|
||||
if (x != members.end()) {
|
||||
if (x->second) {
|
||||
@@ -3705,7 +3592,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
|
||||
const rjson::value& attributes_to_get = req["AttributesToGet"];
|
||||
attrs_to_get ret;
|
||||
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
|
||||
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
|
||||
attribute_path_map_add("AttributesToGet", ret, it->GetString());
|
||||
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
|
||||
}
|
||||
if (ret.empty()) {
|
||||
@@ -4371,12 +4258,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
|
||||
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
// Note that it.key() is the name of the column, *it is the operation
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
const column_definition* cdef = _schema->get_column_definition(column_name);
|
||||
if (cdef && cdef->is_primary_key()) {
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
|
||||
}
|
||||
std::string action = rjson::to_string((it->value)["Action"]);
|
||||
std::string action = (it->value)["Action"].GetString();
|
||||
if (action == "DELETE") {
|
||||
// The DELETE operation can do two unrelated tasks. Without a
|
||||
// "Value" option, it is used to delete an attribute. With a
|
||||
@@ -5573,7 +5460,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
std::vector<query::clustering_range> ck_bounds;
|
||||
|
||||
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
|
||||
sstring key = rjson::to_sstring(it->name);
|
||||
std::string key = it->name.GetString();
|
||||
const rjson::value& condition = it->value;
|
||||
|
||||
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
|
||||
@@ -5581,13 +5468,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
|
||||
const column_definition& pk_cdef = schema->partition_key_columns().front();
|
||||
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
|
||||
if (key == pk_cdef.name_as_text()) {
|
||||
if (sstring(key) == pk_cdef.name_as_text()) {
|
||||
if (!partition_ranges.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
|
||||
}
|
||||
if (ck_cdef && key == ck_cdef->name_as_text()) {
|
||||
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
|
||||
if (!ck_bounds.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
@@ -5988,7 +5875,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
|
||||
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
|
||||
rjson::value* limit_json = rjson::find(request, "Limit");
|
||||
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
|
||||
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
|
||||
int limit = limit_json ? limit_json->GetInt() : 100;
|
||||
if (limit < 1 || limit > 100) {
|
||||
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
|
||||
@@ -6177,10 +6064,9 @@ future<> executor::start() {
|
||||
}
|
||||
|
||||
future<> executor::stop() {
|
||||
co_await _describe_table_info_manager->stop();
|
||||
// disconnect from the value source, but keep the value unchanged.
|
||||
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
|
||||
co_await _parsed_expression_cache->stop();
|
||||
return _parsed_expression_cache->stop();
|
||||
}
|
||||
|
||||
} // namespace alternator
|
||||
|
||||
@@ -17,13 +17,11 @@
|
||||
#include "service/client_state.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include "alternator/error.hh"
|
||||
#include "stats.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/simple_value_with_expiry.hh"
|
||||
|
||||
#include "tracing/trace_state.hh"
|
||||
|
||||
@@ -42,8 +40,6 @@ namespace cql3::selection {
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class cas_shard;
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -60,9 +56,7 @@ class schema_builder;
|
||||
|
||||
namespace alternator {
|
||||
|
||||
enum class table_status;
|
||||
class rmw_operation;
|
||||
class put_or_delete_item;
|
||||
|
||||
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
|
||||
bool is_alternator_keyspace(const sstring& ks_name);
|
||||
@@ -140,7 +134,6 @@ class expression_cache;
|
||||
|
||||
class executor : public peering_sharded_service<executor> {
|
||||
gms::gossiper& _gossiper;
|
||||
service::storage_service& _ss;
|
||||
service::storage_proxy& _proxy;
|
||||
service::migration_manager& _mm;
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
@@ -153,11 +146,6 @@ class executor : public peering_sharded_service<executor> {
|
||||
|
||||
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
|
||||
|
||||
struct describe_table_info_manager;
|
||||
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
|
||||
|
||||
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
|
||||
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
|
||||
public:
|
||||
using client_state = service::client_state;
|
||||
// request_return_type is the return type of the executor methods, which
|
||||
@@ -183,7 +171,6 @@ public:
|
||||
|
||||
executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
@@ -231,18 +218,6 @@ private:
|
||||
friend class rmw_operation;
|
||||
|
||||
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
|
||||
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
|
||||
|
||||
future<> do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit);
|
||||
|
||||
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
|
||||
public:
|
||||
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
|
||||
return {"", nullptr};
|
||||
}
|
||||
auto it = v.MemberBegin();
|
||||
const std::string it_key = rjson::to_string(it->name);
|
||||
const std::string it_key = it->name.GetString();
|
||||
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
|
||||
return {std::move(it_key), nullptr};
|
||||
}
|
||||
|
||||
@@ -708,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -989,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
ret.emplace_back(r.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -108,7 +107,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
|
||||
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
|
||||
}
|
||||
sstring attribute_name = rjson::to_sstring(*v);
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
|
||||
@@ -31,7 +31,6 @@ set(swagger_files
|
||||
api-doc/column_family.json
|
||||
api-doc/commitlog.json
|
||||
api-doc/compaction_manager.json
|
||||
api-doc/client_routes.json
|
||||
api-doc/config.json
|
||||
api-doc/cql_server_test.json
|
||||
api-doc/endpoint_snitch_info.json
|
||||
@@ -69,7 +68,6 @@ target_sources(api
|
||||
PRIVATE
|
||||
api.cc
|
||||
cache_service.cc
|
||||
client_routes.cc
|
||||
collectd.cc
|
||||
column_family.cc
|
||||
commitlog.cc
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
, "client_routes_entry": {
|
||||
"id": "client_routes_entry",
|
||||
"summary": "An entry storing client routes",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"},
|
||||
"address": {"type": "string"},
|
||||
"port": {"type": "integer"},
|
||||
"tls_port": {"type": "integer"},
|
||||
"alternator_port": {"type": "integer"},
|
||||
"alternator_https_port": {"type": "integer"}
|
||||
},
|
||||
"required": ["connection_id", "host_id", "address"]
|
||||
}
|
||||
, "client_routes_key": {
|
||||
"id": "client_routes_key",
|
||||
"summary": "A key of client_routes_entry",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
, "/v2/client-routes":{
|
||||
"get": {
|
||||
"description":"List all client route entries",
|
||||
"operationId":"get_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[],
|
||||
"responses":{
|
||||
"200":{
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{"$ref":"#/definitions/ErrorModel"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"post": {
|
||||
"description":"Upsert one or more client route entries",
|
||||
"operationId":"set_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{ "description": "OK" },
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{ "$ref":"#/definitions/ErrorModel" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"delete": {
|
||||
"description":"Delete one or more client route entries",
|
||||
"operationId":"delete_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_key" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{
|
||||
"description": "OK"
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{
|
||||
"$ref":"#/definitions/ErrorModel"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3051,7 +3051,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -349,9 +349,13 @@
|
||||
"type":"long",
|
||||
"description":"The shard the task is running on"
|
||||
},
|
||||
"creation_time":{
|
||||
"type":"datetime",
|
||||
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
|
||||
},
|
||||
"start_time":{
|
||||
"type":"datetime",
|
||||
"description":"The start time of the task; unspecified (equal to epoch) when state == created"
|
||||
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
|
||||
},
|
||||
"end_time":{
|
||||
"type":"datetime",
|
||||
@@ -398,13 +402,17 @@
|
||||
"type":"boolean",
|
||||
"description":"Boolean flag indicating whether the task can be aborted"
|
||||
},
|
||||
"creation_time":{
|
||||
"type":"datetime",
|
||||
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
|
||||
},
|
||||
"start_time":{
|
||||
"type":"datetime",
|
||||
"description":"The start time of the task"
|
||||
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
|
||||
},
|
||||
"end_time":{
|
||||
"type":"datetime",
|
||||
"description":"The end time of the task (unspecified when the task is not completed)"
|
||||
"description":"The end time of the task (when execution completed); unspecified (equal to epoch) when the task is not completed"
|
||||
},
|
||||
"error":{
|
||||
"type":"string",
|
||||
|
||||
13
api/api.cc
13
api/api.cc
@@ -37,7 +37,6 @@
|
||||
#include "raft.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "service_levels.hh"
|
||||
#include "client_routes.hh"
|
||||
|
||||
logging::logger apilog("api");
|
||||
|
||||
@@ -68,11 +67,9 @@ future<> set_server_init(http_context& ctx) {
|
||||
rb02->set_api_doc(r);
|
||||
rb02->register_api_file(r, "swagger20_header");
|
||||
rb02->register_api_file(r, "metrics");
|
||||
rb02->register_api_file(r, "client_routes");
|
||||
rb->register_function(r, "system",
|
||||
"The system related API");
|
||||
rb02->add_definitions_file(r, "metrics");
|
||||
rb02->add_definitions_file(r, "client_routes");
|
||||
set_system(ctx, r);
|
||||
rb->register_function(r, "error_injection",
|
||||
"The error injection API");
|
||||
@@ -132,16 +129,6 @@ future<> unset_server_storage_service(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
|
||||
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
|
||||
set_client_routes(ctx, r, cr);
|
||||
});
|
||||
}
|
||||
|
||||
future<> unset_server_client_routes(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
|
||||
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ class storage_proxy;
|
||||
class storage_service;
|
||||
class raft_group0_client;
|
||||
class raft_group_registry;
|
||||
class client_routes_service;
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -100,8 +99,6 @@ future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snit
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
|
||||
future<> unset_server_sstables_loader(http_context& ctx);
|
||||
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);
|
||||
|
||||
@@ -1,176 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/http/short_streams.hh>
|
||||
|
||||
#include "client_routes.hh"
|
||||
#include "api/api.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/client_routes.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
|
||||
#include "api/api-doc/client_routes.json.hh"
|
||||
|
||||
using namespace seastar::httpd;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace json;
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
namespace api {
|
||||
|
||||
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
|
||||
if (!cr.local().get_feature_service().client_routes) {
|
||||
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
|
||||
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
|
||||
}
|
||||
}
|
||||
|
||||
static sstring parse_string(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
throw bad_param_exception(fmt::format("Missing '{}'", name));
|
||||
}
|
||||
if (!it->value.IsString()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be a string", name));
|
||||
}
|
||||
return {it->value.GetString(), it->value.GetStringLength()};
|
||||
}
|
||||
|
||||
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!it->value.IsInt()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
|
||||
}
|
||||
auto port = it->value.GetInt();
|
||||
if (port < 1 || port > 65535) {
|
||||
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_entry> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
|
||||
|
||||
const auto port = parse_port("port", element);
|
||||
const auto tls_port = parse_port("tls_port", element);
|
||||
const auto alternator_port = parse_port("alternator_port", element);
|
||||
const auto alternator_https_port = parse_port("alternator_https_port", element);
|
||||
|
||||
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
|
||||
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
|
||||
}
|
||||
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)},
|
||||
parse_string("address", element),
|
||||
port,
|
||||
tls_port,
|
||||
alternator_port,
|
||||
alternator_https_port
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "rest_set_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_key> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)}
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "delete_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "get_client_routes");
|
||||
|
||||
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
|
||||
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
|
||||
seastar::httpd::client_routes_json::client_routes_entry obj;
|
||||
obj.connection_id = entry.connection_id;
|
||||
obj.host_id = fmt::to_string(entry.host_id);
|
||||
obj.address = entry.address;
|
||||
if (entry.port.has_value()) { obj.port = entry.port.value(); }
|
||||
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
|
||||
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
|
||||
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
|
||||
return obj;
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_set_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_delete_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_get_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
}
|
||||
|
||||
void unset_client_routes(http_context& ctx, routes& r) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::get_client_routes.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/json/json_elements.hh>
|
||||
#include "api/api_init.hh"
|
||||
|
||||
namespace api {
|
||||
|
||||
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
|
||||
void unset_client_routes(http_context& ctx, httpd::routes& r);
|
||||
|
||||
}
|
||||
@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
|
||||
vp.insert(b.second);
|
||||
}
|
||||
}
|
||||
std::vector<sstring> res;
|
||||
replica::database& db = vb.local().get_db();
|
||||
auto uuid = validate_table(db, ks, cf_name);
|
||||
replica::column_family& cf = db.find_column_family(uuid);
|
||||
co_return cf.get_index_manager().list_indexes()
|
||||
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
|
||||
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
|
||||
| std::ranges::to<std::vector>();
|
||||
res.reserve(cf.get_index_manager().list_indexes().size());
|
||||
for (auto&& i : cf.get_index_manager().list_indexes()) {
|
||||
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
|
||||
res.emplace_back(i.metadata().name());
|
||||
}
|
||||
}
|
||||
co_return res;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
|
||||
res.scope = status.scope;
|
||||
res.state = status.state;
|
||||
res.is_abortable = bool(status.is_abortable);
|
||||
res.creation_time = get_time(status.creation_time);
|
||||
res.start_time = get_time(status.start_time);
|
||||
res.end_time = get_time(status.end_time);
|
||||
res.error = status.error;
|
||||
@@ -83,6 +84,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
|
||||
res.table = stats.table;
|
||||
res.entity = stats.entity;
|
||||
res.shard = stats.shard;
|
||||
res.creation_time = get_time(stats.creation_time);
|
||||
res.start_time = get_time(stats.start_time);
|
||||
res.end_time = get_time(stats.end_time);;
|
||||
return res;
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
@@ -22,6 +23,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
|
||||
|
||||
class allow_all_authenticator final : public authenticator {
|
||||
public:
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include <iterator>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/core/format.hh>
|
||||
|
||||
@@ -23,11 +22,9 @@ namespace auth {
|
||||
|
||||
logging::logger logger("auth-cache");
|
||||
|
||||
cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
|
||||
cache::cache(cql3::query_processor& qp) noexcept
|
||||
: _current_version(0)
|
||||
, _qp(qp)
|
||||
, _loading_sem(1)
|
||||
, _as(as) {
|
||||
, _qp(qp) {
|
||||
}
|
||||
|
||||
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
|
||||
@@ -119,8 +116,6 @@ future<> cache::load_all() {
|
||||
co_return;
|
||||
}
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto units = co_await get_units(_loading_sem, 1, _as);
|
||||
|
||||
++_current_version;
|
||||
|
||||
logger.info("Loading all roles");
|
||||
@@ -151,9 +146,6 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
|
||||
if (legacy_mode(_qp)) {
|
||||
co_return;
|
||||
}
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto units = co_await get_units(_loading_sem, 1, _as);
|
||||
|
||||
for (const auto& name : roles) {
|
||||
logger.info("Loading role {}", name);
|
||||
auto role = co_await fetch_role(name);
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -16,7 +15,6 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
@@ -43,7 +41,7 @@ public:
|
||||
version_tag_t version; // used for seamless cache reloads
|
||||
};
|
||||
|
||||
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
|
||||
explicit cache(cql3::query_processor& qp) noexcept;
|
||||
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
|
||||
future<> load_all();
|
||||
future<> load_roles(std::unordered_set<role_name_t> roles);
|
||||
@@ -54,8 +52,6 @@ private:
|
||||
roles_map _roles;
|
||||
version_tag_t _current_version;
|
||||
cql3::query_processor& _qp;
|
||||
semaphore _loading_sem;
|
||||
abort_source& _as;
|
||||
|
||||
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
|
||||
future<> prune_all() noexcept;
|
||||
|
||||
@@ -35,13 +35,14 @@ static const class_registrator<auth::authenticator
|
||||
, cql3::query_processor&
|
||||
, ::service::raft_group0_client&
|
||||
, ::service::migration_manager&
|
||||
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
, auth::cache&
|
||||
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
|
||||
enum class auth::certificate_authenticator::query_source {
|
||||
subject, altname
|
||||
};
|
||||
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
|
||||
: _queries([&] {
|
||||
auto& conf = qp.db().get_config();
|
||||
auto queries = conf.auth_certificate_role_queries();
|
||||
@@ -76,9 +77,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
|
||||
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
|
||||
}
|
||||
continue;
|
||||
} catch (const std::out_of_range&) {
|
||||
} catch (std::out_of_range&) {
|
||||
// just fallthrough
|
||||
} catch (const boost::regex_error&) {
|
||||
} catch (boost::regex_error&) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
|
||||
|
||||
namespace cql3 {
|
||||
@@ -33,7 +34,7 @@ class certificate_authenticator : public authenticator {
|
||||
enum class query_source;
|
||||
std::vector<std::pair<query_source, boost::regex>> _queries;
|
||||
public:
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
~certificate_authenticator();
|
||||
|
||||
future<> start() override;
|
||||
|
||||
@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
|
||||
try {
|
||||
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
|
||||
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
|
||||
} catch (const exceptions::already_exists_exception&) {}
|
||||
} catch (exceptions::already_exists_exception&) {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
|
||||
} else {
|
||||
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
|
||||
}
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
|
||||
}
|
||||
}
|
||||
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -49,7 +49,8 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
|
||||
@@ -63,13 +64,14 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
|
||||
password_authenticator::~password_authenticator() {
|
||||
}
|
||||
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(default_superuser(qp.db().get_config()))
|
||||
, _hashing_worker(hashing_worker)
|
||||
{}
|
||||
|
||||
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
|
||||
@@ -328,18 +330,20 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
}
|
||||
salted_hash = role->salted_hash;
|
||||
}
|
||||
const bool password_match = co_await passwords::check(password, *salted_hash);
|
||||
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
|
||||
return passwords::check(password, *salted_hash);
|
||||
});
|
||||
if (!password_match) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
co_return username;
|
||||
} catch (const std::system_error &) {
|
||||
} catch (std::system_error &) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.what()));
|
||||
} catch (const exceptions::authentication_exception& e) {
|
||||
} catch (exceptions::authentication_exception& e) {
|
||||
std::throw_with_nested(e);
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
} catch (exceptions::unavailable_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "auth/passwords.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
@@ -48,12 +49,13 @@ class password_authenticator : public authenticator {
|
||||
shared_promise<> _superuser_created_promise;
|
||||
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
|
||||
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
|
||||
utils::alien_worker& _hashing_worker;
|
||||
|
||||
public:
|
||||
static db::consistency_level consistency_for_user(std::string_view role_name);
|
||||
static std::string default_superuser(const db::config&);
|
||||
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
|
||||
~password_authenticator();
|
||||
|
||||
|
||||
@@ -7,8 +7,6 @@
|
||||
*/
|
||||
|
||||
#include "auth/passwords.hh"
|
||||
#include "utils/crypt_sha512.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include <cerrno>
|
||||
|
||||
@@ -23,46 +21,25 @@ static thread_local crypt_data tlcrypt = {};
|
||||
|
||||
namespace detail {
|
||||
|
||||
void verify_hashing_output(const char * res) {
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
void verify_scheme(scheme scheme) {
|
||||
const sstring random_part_of_salt = "aaaabbbbccccdddd";
|
||||
|
||||
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
try {
|
||||
verify_hashing_output(e);
|
||||
} catch (const std::system_error& ex) {
|
||||
throw no_supported_schemes();
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
return;
|
||||
}
|
||||
|
||||
throw no_supported_schemes();
|
||||
}
|
||||
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
|
||||
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
verify_hashing_output(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
|
||||
sstring res;
|
||||
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
|
||||
// the __crypt_sha512 method. For other computations, we fall back to the
|
||||
// crypt_r implementation from `<crypt.h>`, which can stall.
|
||||
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
|
||||
char buf[128];
|
||||
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
} else {
|
||||
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
co_return res;
|
||||
return res;
|
||||
}
|
||||
|
||||
std::string_view prefix_for_scheme(scheme c) noexcept {
|
||||
@@ -81,9 +58,8 @@ no_supported_schemes::no_supported_schemes()
|
||||
: std::runtime_error("No allowed hashing schemes are supported on this system") {
|
||||
}
|
||||
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
|
||||
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
|
||||
co_return pwd_hash == salted_hash;
|
||||
bool check(const sstring& pass, const sstring& salted_hash) {
|
||||
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
|
||||
}
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
#include <random>
|
||||
#include <stdexcept>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "seastarx.hh"
|
||||
@@ -76,19 +75,10 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
|
||||
|
||||
///
|
||||
/// Hash a password combined with an implementation-specific salt string.
|
||||
/// Deprecated in favor of `hash_with_salt_async`.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
|
||||
///
|
||||
/// Async version of `hash_with_salt` that returns a future.
|
||||
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
|
||||
} // namespace detail
|
||||
|
||||
@@ -117,6 +107,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
|
||||
bool check(const sstring& pass, const sstring& salted_hash);
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -35,9 +35,10 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
cache&,
|
||||
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
|
||||
: _socket_path(qp.db().get_config().saslauthd_socket_path())
|
||||
{}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -29,7 +30,7 @@ namespace auth {
|
||||
class saslauthd_authenticator : public authenticator {
|
||||
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
|
||||
public:
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
|
||||
|
||||
future<> start() override;
|
||||
|
||||
|
||||
@@ -191,7 +191,8 @@ service::service(
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc,
|
||||
maintenance_socket_enabled used_by_maintenance_socket,
|
||||
cache& cache)
|
||||
cache& cache,
|
||||
utils::alien_worker& hashing_worker)
|
||||
: service(
|
||||
std::move(c),
|
||||
cache,
|
||||
@@ -199,7 +200,7 @@ service::service(
|
||||
g0,
|
||||
mn,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
|
||||
used_by_maintenance_socket) {
|
||||
}
|
||||
@@ -225,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
|
||||
try {
|
||||
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
|
||||
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
|
||||
} catch (const ::service::group0_concurrent_modification&) {
|
||||
} catch (::service::group0_concurrent_modification&) {
|
||||
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "cql3/description.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/observable.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "service/maintenance_mode.hh"
|
||||
@@ -130,7 +131,8 @@ public:
|
||||
::service::migration_manager&,
|
||||
const service_config&,
|
||||
maintenance_socket_enabled,
|
||||
cache&);
|
||||
cache&,
|
||||
utils::alien_worker&);
|
||||
|
||||
future<> start(::service::migration_manager&, db::system_keyspace&);
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
|
||||
{_superuser},
|
||||
cql3::query_processor::cache_internal::no).discard_result();
|
||||
log.info("Created default superuser role '{}'.", _superuser);
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
} catch(const exceptions::unavailable_exception& e) {
|
||||
log.warn("Skipped default role setup: some nodes were not ready; will retry");
|
||||
throw e;
|
||||
}
|
||||
|
||||
@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
|
||||
}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
@@ -81,7 +81,7 @@ public:
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -126,7 +126,7 @@ public:
|
||||
virtual bytes evaluate_response(bytes_view client_response) override {
|
||||
try {
|
||||
return _sasl->evaluate_response(client_response);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
_complete = true;
|
||||
return {};
|
||||
}
|
||||
@@ -141,7 +141,7 @@ public:
|
||||
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -241,7 +241,8 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
auth::cache&,
|
||||
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
|
||||
@@ -10,9 +10,7 @@
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
|
||||
enum class client_type {
|
||||
@@ -29,20 +27,6 @@ enum class client_connection_stage {
|
||||
ready,
|
||||
};
|
||||
|
||||
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
|
||||
struct options_cache_value_type {};
|
||||
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
|
||||
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
|
||||
using client_options_cache_key_type = client_options_cache_type::key_type;
|
||||
|
||||
// This struct represents a single OPTION key-value pair from the client's connection options.
|
||||
// Both key and value are represented by corresponding "references" to their cached values.
|
||||
// Each "reference" is effectively a lw_shared_ptr value.
|
||||
struct client_option_key_value_cached_entry {
|
||||
client_options_cache_entry_type key;
|
||||
client_options_cache_entry_type value;
|
||||
};
|
||||
|
||||
sstring to_string(client_connection_stage ct);
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
@@ -53,8 +37,8 @@ struct client_data {
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
std::optional<client_options_cache_entry_type> driver_name;
|
||||
std::optional<client_options_cache_entry_type> driver_version;
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
std::optional<int32_t> protocol_version;
|
||||
std::optional<sstring> ssl_cipher_suite;
|
||||
@@ -62,7 +46,6 @@ struct client_data {
|
||||
std::optional<sstring> ssl_protocol;
|
||||
std::optional<sstring> username;
|
||||
std::optional<sstring> scheduling_group_name;
|
||||
std::list<client_option_key_value_cached_entry> client_options;
|
||||
|
||||
sstring stage_str() const { return to_string(connection_stage); }
|
||||
sstring client_type_str() const { return to_string(ct); }
|
||||
|
||||
@@ -125,6 +125,10 @@ if(target_arch)
|
||||
add_compile_options("-march=${target_arch}")
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
|
||||
endif()
|
||||
|
||||
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
|
||||
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
|
||||
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "sstables/open_info.hh"
|
||||
#include "compaction_descriptor.hh"
|
||||
|
||||
class reader_permit;
|
||||
@@ -45,7 +44,7 @@ public:
|
||||
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
|
||||
virtual reader_permit make_compaction_reader_permit() const = 0;
|
||||
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
|
||||
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
|
||||
virtual sstables::shared_sstable make_sstable() const = 0;
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
|
||||
virtual api::timestamp_type min_memtable_timestamp() const = 0;
|
||||
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;
|
||||
|
||||
@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
|
||||
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
|
||||
}
|
||||
descriptor.creator = [&t] (shard_id) {
|
||||
// All compaction types going through this path will work on normal input sstables only.
|
||||
// Off-strategy, for example, waits until the sstables move out of staging state.
|
||||
return t.make_sstable(sstables::sstable_state::normal);
|
||||
return t.make_sstable();
|
||||
};
|
||||
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
|
||||
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
|
||||
@@ -1849,10 +1847,6 @@ protected:
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
}, false);
|
||||
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
|
||||
co_return co_await do_rewrite_sstable(std::move(sst));
|
||||
}
|
||||
};
|
||||
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
// Throw an error if split cannot be performed due to e.g. out of space prevention.
|
||||
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
|
||||
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
|
||||
if (is_disabled()) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
|
||||
"reason might be out of space prevention", sst->get_filename()))));
|
||||
if (!can_proceed(&t)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
|
||||
compaction_progress_monitor monitor;
|
||||
compaction_data info = create_compaction_data();
|
||||
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
|
||||
desc.creator = [&t, sst] (shard_id _) {
|
||||
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
|
||||
// For example, if base table has views, it's important that sstable produced by repair will be
|
||||
// in the staging state.
|
||||
return t.make_sstable(sst->state());
|
||||
desc.creator = [&t] (shard_id _) {
|
||||
return t.make_sstable();
|
||||
};
|
||||
desc.replacer = [&] (compaction_completion_desc d) {
|
||||
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));
|
||||
|
||||
@@ -376,8 +376,7 @@ public:
|
||||
// Splits a single SSTable by segregating all its data according to the classifier.
|
||||
// If SSTable doesn't need split, the same input SSTable is returned as output.
|
||||
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
|
||||
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
|
||||
// Run a custom job for a given table, defined by a function
|
||||
// it completes when future returned by job is ready or returns immediately
|
||||
|
||||
@@ -571,10 +571,10 @@ commitlog_total_space_in_mb: -1
|
||||
# - "none": auditing is disabled (default)
|
||||
# - "table": save audited events in audit.audit_log column family
|
||||
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
|
||||
audit: "table"
|
||||
# audit: "none"
|
||||
#
|
||||
# List of statement categories that should be audited.
|
||||
audit_categories: "DCL,DDL,AUTH,ADMIN"
|
||||
# audit_categories: "DCL,DDL,AUTH"
|
||||
#
|
||||
# List of tables that should be audited.
|
||||
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
||||
|
||||
160
configure.py
160
configure.py
@@ -368,87 +368,6 @@ def find_ninja():
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def find_compiler(name):
|
||||
"""
|
||||
Find a compiler by name, skipping ccache wrapper directories.
|
||||
|
||||
This is useful when using sccache to avoid double-caching through ccache.
|
||||
|
||||
Args:
|
||||
name: The compiler name (e.g., 'clang++', 'clang', 'gcc')
|
||||
|
||||
Returns:
|
||||
Path to the compiler, skipping ccache directories, or None if not found.
|
||||
"""
|
||||
ccache_dirs = {'/usr/lib/ccache', '/usr/lib64/ccache'}
|
||||
for path_dir in os.environ.get('PATH', '').split(os.pathsep):
|
||||
# Skip ccache wrapper directories
|
||||
if os.path.realpath(path_dir) in ccache_dirs or path_dir in ccache_dirs:
|
||||
continue
|
||||
candidate = os.path.join(path_dir, name)
|
||||
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
|
||||
return candidate
|
||||
return None
|
||||
|
||||
|
||||
def resolve_compilers_for_sccache(args, compiler_cache):
|
||||
"""
|
||||
When using sccache, resolve compiler paths to avoid ccache directories.
|
||||
|
||||
This prevents double-caching when ccache symlinks are in PATH.
|
||||
|
||||
Args:
|
||||
args: The argument namespace with cc and cxx attributes.
|
||||
compiler_cache: Path to the compiler cache binary, or None.
|
||||
"""
|
||||
if not compiler_cache or 'sccache' not in compiler_cache:
|
||||
return
|
||||
if not os.path.isabs(args.cxx):
|
||||
real_cxx = find_compiler(args.cxx)
|
||||
if real_cxx:
|
||||
args.cxx = real_cxx
|
||||
if not os.path.isabs(args.cc):
|
||||
real_cc = find_compiler(args.cc)
|
||||
if real_cc:
|
||||
args.cc = real_cc
|
||||
|
||||
|
||||
def find_compiler_cache(preference):
|
||||
"""
|
||||
Find a compiler cache based on the preference.
|
||||
|
||||
Args:
|
||||
preference: One of 'auto', 'sccache', 'ccache', 'none', or a path to a binary.
|
||||
|
||||
Returns:
|
||||
Path to the compiler cache binary, or None if not found/disabled.
|
||||
"""
|
||||
if preference == 'none':
|
||||
return None
|
||||
|
||||
if preference == 'auto':
|
||||
# Prefer sccache over ccache
|
||||
for cache in ['sccache', 'ccache']:
|
||||
path = which(cache)
|
||||
if path:
|
||||
return path
|
||||
return None
|
||||
|
||||
if preference in ('sccache', 'ccache'):
|
||||
path = which(preference)
|
||||
if path:
|
||||
return path
|
||||
print(f"Warning: {preference} not found on PATH, disabling compiler cache")
|
||||
return None
|
||||
|
||||
# Assume it's a path to a binary
|
||||
if os.path.isfile(preference) and os.access(preference, os.X_OK):
|
||||
return preference
|
||||
|
||||
print(f"Warning: compiler cache '{preference}' not found or not executable, disabling compiler cache")
|
||||
return None
|
||||
|
||||
|
||||
modes = {
|
||||
'debug': {
|
||||
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
@@ -813,8 +732,6 @@ arg_parser.add_argument('--compiler', action='store', dest='cxx', default='clang
|
||||
help='C++ compiler path')
|
||||
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clang',
|
||||
help='C compiler path')
|
||||
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
|
||||
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
|
||||
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
|
||||
help='Use dpdk (from seastar dpdk sources)')
|
||||
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
|
||||
@@ -942,7 +859,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/alien_worker.cc',
|
||||
'utils/array-search.cc',
|
||||
'utils/base64.cc',
|
||||
'utils/crypt_sha512.cc',
|
||||
'utils/logalloc.cc',
|
||||
'utils/large_bitset.cc',
|
||||
'utils/buffer_input_stream.cc',
|
||||
@@ -1241,7 +1157,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'locator/topology.cc',
|
||||
'locator/util.cc',
|
||||
'service/client_state.cc',
|
||||
'service/client_routes.cc',
|
||||
'service/storage_service.cc',
|
||||
'service/session.cc',
|
||||
'service/task_manager_module.cc',
|
||||
@@ -1402,8 +1317,6 @@ api = ['api/api.cc',
|
||||
'api/storage_proxy.cc',
|
||||
Json2Code('api/api-doc/cache_service.json'),
|
||||
'api/cache_service.cc',
|
||||
Json2Code('api/api-doc/client_routes.json'),
|
||||
'api/client_routes.cc',
|
||||
Json2Code('api/api-doc/collectd.json'),
|
||||
'api/collectd.cc',
|
||||
Json2Code('api/api-doc/endpoint_snitch_info.json'),
|
||||
@@ -1566,6 +1479,7 @@ deps = {
|
||||
|
||||
pure_boost_tests = set([
|
||||
'test/boost/anchorless_list_test',
|
||||
'test/boost/auth_passwords_test',
|
||||
'test/boost/auth_resource_test',
|
||||
'test/boost/big_decimal_test',
|
||||
'test/boost/caching_options_test',
|
||||
@@ -1698,7 +1612,6 @@ deps['test/boost/combined_tests'] += [
|
||||
'test/boost/schema_registry_test.cc',
|
||||
'test/boost/secondary_index_test.cc',
|
||||
'test/boost/sessions_test.cc',
|
||||
'test/boost/simple_value_with_expiry_test.cc',
|
||||
'test/boost/sstable_compaction_test.cc',
|
||||
'test/boost/sstable_compressor_factory_test.cc',
|
||||
'test/boost/sstable_compression_config_test.cc',
|
||||
@@ -1782,18 +1695,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
|
||||
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
|
||||
|
||||
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
|
||||
|
||||
# We need to link these files to all Boost tests to make sure that
|
||||
# we can execute `--list_json_content` on them. That will produce
|
||||
# a similar result as calling `--list_content={HRF,DOT}`.
|
||||
# Unfortunately, to be able to do that, we're forced to link the
|
||||
# relevant code by hand.
|
||||
for key in deps.keys():
|
||||
for prefix in boost_tests_prefixes:
|
||||
if key.startswith(prefix):
|
||||
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
|
||||
|
||||
wasm_deps = {}
|
||||
|
||||
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'
|
||||
@@ -2098,7 +1999,7 @@ def semicolon_separated(*flags):
|
||||
def real_relpath(path, start):
|
||||
return os.path.relpath(os.path.realpath(path), os.path.realpath(start))
|
||||
|
||||
def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
def configure_seastar(build_dir, mode, mode_config):
|
||||
seastar_cxx_ld_flags = mode_config['cxx_ld_flags']
|
||||
# We want to "undo" coverage for seastar if we have it enabled.
|
||||
if args.coverage:
|
||||
@@ -2145,10 +2046,6 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
'-DSeastar_IO_URING=ON',
|
||||
]
|
||||
|
||||
if compiler_cache:
|
||||
seastar_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
|
||||
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
|
||||
|
||||
if args.stack_guards is not None:
|
||||
stack_guards = 'ON' if args.stack_guards else 'OFF'
|
||||
seastar_cmake_args += ['-DSeastar_STACK_GUARDS={}'.format(stack_guards)]
|
||||
@@ -2180,7 +2077,7 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
|
||||
subprocess.check_call(seastar_cmd, shell=False, cwd=cmake_dir)
|
||||
|
||||
|
||||
def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
|
||||
def configure_abseil(build_dir, mode, mode_config):
|
||||
abseil_cflags = mode_config['lib_cflags']
|
||||
cxx_flags = mode_config['cxxflags']
|
||||
if '-DSANITIZE' in cxx_flags:
|
||||
@@ -2206,10 +2103,6 @@ def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
|
||||
'-DABSL_PROPAGATE_CXX_STD=ON',
|
||||
]
|
||||
|
||||
if compiler_cache:
|
||||
abseil_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
|
||||
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
|
||||
|
||||
cmake_args = abseil_cmake_args[:]
|
||||
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
|
||||
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + cmake_args
|
||||
@@ -2355,6 +2248,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
|
||||
if debuginfo and mode_config['can_have_debug_info']:
|
||||
cxxflags += ['-g', '-gz']
|
||||
|
||||
if 'clang' in cxx:
|
||||
# Since AssignmentTracking was enabled by default in clang
|
||||
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
|
||||
# coroutine frame debugging info (`coro_frame_ty`) is broken.
|
||||
#
|
||||
# It seems that we aren't losing much by disabling AssigmentTracking,
|
||||
# so for now we choose to disable it to get `coro_frame_ty` back.
|
||||
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
|
||||
|
||||
return cxxflags
|
||||
|
||||
|
||||
@@ -2382,15 +2284,10 @@ def write_build_file(f,
|
||||
scylla_product,
|
||||
scylla_version,
|
||||
scylla_release,
|
||||
compiler_cache,
|
||||
args):
|
||||
use_precompiled_header = not args.disable_precompiled_header
|
||||
warnings = get_warning_options(args.cxx)
|
||||
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
|
||||
# If compiler cache is available, prefix the compiler with it
|
||||
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
|
||||
# For Rust, sccache is used via RUSTC_WRAPPER environment variable
|
||||
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache else ''
|
||||
f.write(textwrap.dedent('''\
|
||||
configure_args = {configure_args}
|
||||
builddir = {outdir}
|
||||
@@ -2453,7 +2350,7 @@ def write_build_file(f,
|
||||
command = clang --target=wasm32 --no-standard-libraries -Wl,--export-all -Wl,--no-entry $in -o $out
|
||||
description = C2WASM $out
|
||||
rule rust2wasm
|
||||
command = {rustc_wrapper}cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
|
||||
command = cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
|
||||
&& wasm-opt -Oz $builddir/wasm/{rustc_target}/debug/examples/$example.wasm -o $builddir/wasm/$example.wasm $
|
||||
&& wasm-strip $builddir/wasm/$example.wasm
|
||||
description = RUST2WASM $out
|
||||
@@ -2469,7 +2366,7 @@ def write_build_file(f,
|
||||
command = llvm-profdata merge $in -output=$out
|
||||
''').format(configure_args=configure_args,
|
||||
outdir=outdir,
|
||||
cxx=cxx_with_cache,
|
||||
cxx=args.cxx,
|
||||
user_cflags=user_cflags,
|
||||
warnings=warnings,
|
||||
defines=defines,
|
||||
@@ -2477,7 +2374,6 @@ def write_build_file(f,
|
||||
user_ldflags=user_ldflags,
|
||||
libs=libs,
|
||||
rustc_target=rustc_target,
|
||||
rustc_wrapper=rustc_wrapper,
|
||||
link_pool_depth=link_pool_depth,
|
||||
seastar_path=args.seastar_path,
|
||||
ninja=ninja,
|
||||
@@ -2562,10 +2458,10 @@ def write_build_file(f,
|
||||
description = TEST {mode}
|
||||
# This rule is unused for PGO stages. They use the rust lib from the parent mode.
|
||||
rule rust_lib.{mode}
|
||||
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' {rustc_wrapper}cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
|
||||
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
|
||||
&& touch $out
|
||||
description = RUST_LIB $out
|
||||
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
|
||||
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
|
||||
f.write(
|
||||
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
|
||||
mode=mode,
|
||||
@@ -2629,7 +2525,7 @@ def write_build_file(f,
|
||||
# In debug/sanitize modes, we compile with fsanitizers,
|
||||
# so must use the same options during the link:
|
||||
if '-DSANITIZE' in modes[mode]['cxxflags']:
|
||||
f.write(' libs = -fsanitize=address -fsanitize=undefined -lubsan\n')
|
||||
f.write(' libs = -fsanitize=address -fsanitize=undefined\n')
|
||||
else:
|
||||
f.write(' libs =\n')
|
||||
f.write(f'build $builddir/{mode}/{binary}.stripped: strip $builddir/{mode}/{binary}\n')
|
||||
@@ -3025,9 +2921,6 @@ def create_build_system(args):
|
||||
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
|
||||
compiler_cache = find_compiler_cache(args.compiler_cache)
|
||||
resolve_compilers_for_sccache(args, compiler_cache)
|
||||
|
||||
scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)
|
||||
|
||||
for mode, mode_config in build_modes.items():
|
||||
@@ -3044,8 +2937,8 @@ def create_build_system(args):
|
||||
# {outdir}/{mode}/seastar/build.ninja, and
|
||||
# {outdir}/{mode}/seastar/seastar.pc is queried for building flags
|
||||
for mode, mode_config in build_modes.items():
|
||||
configure_seastar(outdir, mode, mode_config, compiler_cache)
|
||||
configure_abseil(outdir, mode, mode_config, compiler_cache)
|
||||
configure_seastar(outdir, mode, mode_config)
|
||||
configure_abseil(outdir, mode, mode_config)
|
||||
user_cflags += ' -isystem abseil'
|
||||
|
||||
for mode, mode_config in build_modes.items():
|
||||
@@ -3068,7 +2961,6 @@ def create_build_system(args):
|
||||
scylla_product,
|
||||
scylla_version,
|
||||
scylla_release,
|
||||
compiler_cache,
|
||||
args)
|
||||
generate_compdb('compile_commands.json', ninja, args.buildfile, selected_modes)
|
||||
|
||||
@@ -3111,10 +3003,6 @@ def configure_using_cmake(args):
|
||||
selected_modes = args.selected_modes or default_modes
|
||||
selected_configs = ';'.join(build_modes[mode].cmake_build_type for mode
|
||||
in selected_modes)
|
||||
|
||||
compiler_cache = find_compiler_cache(args.compiler_cache)
|
||||
resolve_compilers_for_sccache(args, compiler_cache)
|
||||
|
||||
settings = {
|
||||
'CMAKE_CONFIGURATION_TYPES': selected_configs,
|
||||
'CMAKE_CROSS_CONFIGS': selected_configs,
|
||||
@@ -3132,14 +3020,6 @@ def configure_using_cmake(args):
|
||||
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
|
||||
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
|
||||
}
|
||||
|
||||
if compiler_cache:
|
||||
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
|
||||
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
|
||||
# For Rust, sccache is used via RUSTC_WRAPPER
|
||||
if 'sccache' in compiler_cache:
|
||||
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
|
||||
|
||||
if args.date_stamp:
|
||||
settings['Scylla_DATE_STAMP'] = args.date_stamp
|
||||
if args.staticboost:
|
||||
@@ -3171,7 +3051,7 @@ def configure_using_cmake(args):
|
||||
|
||||
if not args.dist_only:
|
||||
for mode in selected_modes:
|
||||
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode], compiler_cache)
|
||||
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode])
|
||||
|
||||
cmake_command = ['cmake']
|
||||
cmake_command += [f'-D{var}={value}' for var, value in settings.items()]
|
||||
|
||||
@@ -64,10 +64,6 @@ bool query_processor::topology_global_queue_empty() {
|
||||
return remote().first.get().ss.topology_global_queue_empty();
|
||||
}
|
||||
|
||||
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
|
||||
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
|
||||
}
|
||||
|
||||
static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
|
||||
@@ -474,7 +474,6 @@ public:
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
|
||||
|
||||
query_options make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
|
||||
@@ -1322,10 +1322,6 @@ const std::vector<expr::expression>& statement_restrictions::index_restrictions(
|
||||
return _index_restrictions;
|
||||
}
|
||||
|
||||
bool statement_restrictions::is_empty() const {
|
||||
return !_where.has_value();
|
||||
}
|
||||
|
||||
// Current score table:
|
||||
// local and restrictions include full partition key: 2
|
||||
// global: 1
|
||||
|
||||
@@ -408,8 +408,6 @@ public:
|
||||
|
||||
/// Checks that the primary key restrictions don't contain null values, throws invalid_request_exception otherwise.
|
||||
void validate_primary_key(const query_options& options) const;
|
||||
|
||||
bool is_empty() const;
|
||||
};
|
||||
|
||||
statement_restrictions analyze_statement_restrictions(
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "seastar/coroutine/exception.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
@@ -139,7 +138,6 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
bool unknown_keyspace = false;
|
||||
try {
|
||||
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
|
||||
auto ks = qp.db().find_keyspace(_name);
|
||||
@@ -160,19 +158,14 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
service::topology_mutation_builder builder(ts);
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
rtbuilder.set("done", false);
|
||||
if (!qp.proxy().features().topology_global_request_queue) {
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(global_request_id);
|
||||
@@ -248,15 +241,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
target_type,
|
||||
keyspace());
|
||||
mc.add_mutations(std::move(muts), "CQL alter keyspace");
|
||||
co_return std::make_tuple(std::move(ret), warnings);
|
||||
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
|
||||
} catch (data_dictionary::no_such_keyspace& e) {
|
||||
unknown_keyspace = true;
|
||||
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
if (unknown_keyspace) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
std::unreachable();
|
||||
}
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
|
||||
@@ -190,7 +190,7 @@ future<utils::chunked_vector<mutation>> batch_statement::get_mutations(query_pro
|
||||
co_return vresult;
|
||||
}
|
||||
|
||||
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const {
|
||||
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) {
|
||||
if (mutations.size() <= 1) {
|
||||
return; // We only warn for batch spanning multiple mutations
|
||||
}
|
||||
@@ -209,9 +209,8 @@ void batch_statement::verify_batch_size(query_processor& qp, const utils::chunke
|
||||
for (auto&& m : mutations) {
|
||||
ks_cf_pairs.insert(m.schema()->ks_name() + "." + m.schema()->cf_name());
|
||||
}
|
||||
const auto batch_type = _type == type::LOGGED ? "Logged" : "Unlogged";
|
||||
return seastar::format("{} batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
|
||||
batch_type, mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
|
||||
return seastar::format("Batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
|
||||
mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
|
||||
};
|
||||
if (size > fail_threshold) {
|
||||
_logger.error("{}", error("FAIL", fail_threshold).c_str());
|
||||
|
||||
@@ -116,7 +116,7 @@ public:
|
||||
* Checks batch size to ensure threshold is met. If not, a warning is logged.
|
||||
* @param cfs ColumnFamilies that will store the batch's mutations.
|
||||
*/
|
||||
void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const;
|
||||
static void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations);
|
||||
|
||||
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
@@ -61,7 +61,7 @@ expand_to_racks(const locator::token_metadata& tm,
|
||||
|
||||
// Handle ALTER:
|
||||
// ([]|0) -> numeric is allowed, there are no existing replicas
|
||||
// numeric -> numeric' is not supported unless numeric == numeric'. User should convert RF to rack list of equal count first.
|
||||
// numeric -> numeric' is not supported. User should convert RF to rack list of equal count first.
|
||||
// rack_list -> len(rack_list) is allowed (no-op)
|
||||
// rack_list -> numeric is not allowed
|
||||
if (old_options.contains(dc)) {
|
||||
@@ -75,8 +75,6 @@ expand_to_racks(const locator::token_metadata& tm,
|
||||
"Cannot change replication factor for '{}' from {} to numeric {}, use rack list instead",
|
||||
dc, old_rf_val, data.count()));
|
||||
}
|
||||
} else if (old_rf.count() == data.count()) {
|
||||
return rf;
|
||||
} else if (old_rf.count() > 0) {
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Cannot change replication factor for '{}' from {} to {}, only rack list is allowed",
|
||||
@@ -155,8 +153,6 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
}
|
||||
|
||||
// Validate options.
|
||||
bool numeric_to_rack_list_transition = false;
|
||||
bool rf_change = false;
|
||||
for (auto&& [dc, opt] : options) {
|
||||
locator::replication_factor_data rf(opt);
|
||||
|
||||
@@ -166,7 +162,6 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
old_rf = locator::replication_factor_data(i->second);
|
||||
}
|
||||
|
||||
rf_change = rf_change || (old_rf && old_rf->count() != rf.count()) || (!old_rf && rf.count() != 0);
|
||||
if (!rf.is_rack_based()) {
|
||||
if (old_rf && old_rf->is_rack_based() && rf.count() != 0) {
|
||||
if (old_rf->count() != rf.count()) {
|
||||
@@ -192,11 +187,12 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Rack list for '{}' contains duplicate entries", dc));
|
||||
}
|
||||
numeric_to_rack_list_transition = numeric_to_rack_list_transition || (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0);
|
||||
}
|
||||
|
||||
if (numeric_to_rack_list_transition && rf_change) {
|
||||
throw exceptions::configuration_exception("Cannot change replication factor from numeric to rack list and rf value at the same time");
|
||||
if (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0) {
|
||||
// FIXME: Allow this if replicas already conform to the given rack list.
|
||||
// FIXME: Implement automatic colocation to allow transition to rack list.
|
||||
throw exceptions::configuration_exception(fmt::format(
|
||||
"Cannot change replication factor from numeric to rack list for '{}'", dc));
|
||||
}
|
||||
}
|
||||
|
||||
if (!rf && options.empty() && old_options.empty()) {
|
||||
@@ -416,7 +412,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
|
||||
? std::optional<unsigned>(0) : std::nullopt;
|
||||
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
|
||||
bool uses_tablets = initial_tablets.has_value();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
bool rack_list_enabled = feat.rack_list_rf;
|
||||
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
@@ -432,7 +428,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
throw exceptions::invalid_request_exception("Cannot alter replication strategy vnode/tablets flavor");
|
||||
}
|
||||
auto sc = get_replication_strategy_class();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
bool rack_list_enabled = feat.rack_list_rf;
|
||||
if (sc) {
|
||||
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
|
||||
} else {
|
||||
|
||||
@@ -1976,7 +1976,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|
||||
if (it == indexes.end()) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
|
||||
}
|
||||
if (index_opt || parameters->allow_filtering() || !(restrictions->is_empty()) || check_needs_allow_filtering_anyway(*restrictions)) {
|
||||
if (index_opt || parameters->allow_filtering() || restrictions->need_filtering() || check_needs_allow_filtering_anyway(*restrictions)) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering by vector does not support filtering");
|
||||
}
|
||||
index_opt = *it;
|
||||
|
||||
@@ -42,11 +42,6 @@ table::get_index_manager() const {
|
||||
return _ops->get_index_manager(*this);
|
||||
}
|
||||
|
||||
db_clock::time_point
|
||||
table::get_truncation_time() const {
|
||||
return _ops->get_truncation_time(*this);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
keyspace::metadata() const {
|
||||
return _ops->get_keyspace_metadata(*this);
|
||||
|
||||
@@ -77,7 +77,6 @@ public:
|
||||
schema_ptr schema() const;
|
||||
const std::vector<view_ptr>& views() const;
|
||||
const secondary_index::secondary_index_manager& get_index_manager() const;
|
||||
db_clock::time_point get_truncation_time() const;
|
||||
};
|
||||
|
||||
class keyspace {
|
||||
|
||||
@@ -27,7 +27,6 @@ public:
|
||||
virtual std::optional<table> try_find_table(database db, table_id id) const = 0;
|
||||
virtual const secondary_index::secondary_index_manager& get_index_manager(table t) const = 0;
|
||||
virtual schema_ptr get_table_schema(table t) const = 0;
|
||||
virtual db_clock::time_point get_truncation_time(table t) const = 0;
|
||||
virtual lw_shared_ptr<keyspace_metadata> get_keyspace_metadata(keyspace ks) const = 0;
|
||||
virtual bool is_internal(keyspace ks) const = 0;
|
||||
virtual const locator::abstract_replication_strategy& get_replication_strategy(keyspace ks) const = 0;
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mutation/mutation.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id);
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id);
|
||||
|
||||
}
|
||||
@@ -10,24 +10,20 @@
|
||||
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
#include <ranges>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
#include "batchlog_manager.hh"
|
||||
#include "batchlog.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "utils/rate_limiter.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/murmur_hash.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "idl/frozen_schema.dist.hh"
|
||||
@@ -37,94 +33,17 @@
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
static logging::logger blogger("batchlog_manager");
|
||||
|
||||
namespace db {
|
||||
|
||||
// Yields 256 batchlog shards. Even on the largest nodes we currently run on,
|
||||
// this should be enough to give every core a batchlog partition.
|
||||
static constexpr unsigned batchlog_shard_bits = 8;
|
||||
|
||||
int32_t batchlog_shard_of(db_clock::time_point written_at) {
|
||||
const int64_t count = written_at.time_since_epoch().count();
|
||||
std::array<uint64_t, 2> result;
|
||||
utils::murmur_hash::hash3_x64_128(bytes_view(reinterpret_cast<const signed char*>(&count), sizeof(count)), 0, result);
|
||||
uint64_t hash = result[0] ^ result[1];
|
||||
return hash & ((1ULL << batchlog_shard_bits) - 1);
|
||||
}
|
||||
|
||||
std::pair<partition_key, clustering_key>
|
||||
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
||||
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
|
||||
|
||||
std::vector<bytes> ckey_components;
|
||||
ckey_components.reserve(2);
|
||||
ckey_components.push_back(serialized(written_at));
|
||||
if (id) {
|
||||
ckey_components.push_back(serialized(*id));
|
||||
}
|
||||
auto ckey = clustering_key::from_exploded(schema, ckey_components);
|
||||
|
||||
return {std::move(pkey), std::move(ckey)};
|
||||
}
|
||||
|
||||
std::pair<partition_key, clustering_key>
|
||||
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
||||
return get_batchlog_key(schema, version, stage, batchlog_shard_of(written_at), written_at, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
|
||||
|
||||
auto timestamp = api::new_timestamp();
|
||||
|
||||
mutation m(schema, key);
|
||||
// Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
|
||||
auto cdef_data = schema->get_column_definition(to_bytes("data"));
|
||||
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
|
||||
|
||||
return m;
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto data = [&mutations] {
|
||||
utils::chunked_vector<canonical_mutation> fm(mutations.begin(), mutations.end());
|
||||
bytes_ostream out;
|
||||
for (auto& m : fm) {
|
||||
ser::serialize(out, m);
|
||||
}
|
||||
return std::move(out).to_managed_bytes();
|
||||
}();
|
||||
|
||||
return get_batchlog_mutation_for(std::move(schema), std::move(data), version, stage, now, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id) {
|
||||
return get_batchlog_mutation_for(std::move(schema), mutations, version, batchlog_stage::initial, now, id);
|
||||
}
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
|
||||
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
|
||||
mutation m(schema, key);
|
||||
auto timestamp = api::new_timestamp();
|
||||
m.partition().apply_delete(*schema, ckey, tombstone(timestamp, gc_clock::now()));
|
||||
return m;
|
||||
}
|
||||
|
||||
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id) {
|
||||
return get_batchlog_delete_mutation(std::move(schema), version, batchlog_stage::initial, now, id);
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
const std::chrono::seconds db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
|
||||
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
|
||||
: _qp(qp)
|
||||
, _sys_ks(sys_ks)
|
||||
, _replay_timeout(config.replay_timeout)
|
||||
, _write_request_timeout(std::chrono::duration_cast<db_clock::duration>(config.write_request_timeout))
|
||||
, _replay_rate(config.replay_rate)
|
||||
, _delay(config.delay)
|
||||
, _replay_cleanup_after_replays(config.replay_cleanup_after_replays)
|
||||
@@ -233,75 +152,18 @@ future<> db::batchlog_manager::stop() {
|
||||
}
|
||||
|
||||
future<size_t> db::batchlog_manager::count_all_batches() const {
|
||||
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> rs) {
|
||||
return size_t(rs->one().get_as<int64_t>("count"));
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
|
||||
if (_migration_done) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return with_gate(_gate, [this] () mutable -> future<> {
|
||||
blogger.info("Migrating batchlog entries from v1 -> v2");
|
||||
|
||||
auto schema_v1 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
auto schema_v2 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
auto batch = [this, schema_v1, schema_v2] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
// check version of serialization format
|
||||
if (!row.has("version")) {
|
||||
blogger.warn("Not migrating logged batch because of unknown version");
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto version = row.get_as<int32_t>("version");
|
||||
if (version != netw::messaging_service::current_version) {
|
||||
blogger.warn("Not migrating logged batch because of incorrect version");
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto data = row.get_blob_fragmented("data");
|
||||
|
||||
auto& sp = _qp.proxy();
|
||||
|
||||
utils::get_local_injector().inject("batchlog_manager_fail_migration", [] { throw std::runtime_error("Error injection: failing batchlog migration"); });
|
||||
|
||||
auto migrate_mut = get_batchlog_mutation_for(schema_v2, std::move(data), version, batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(migrate_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
mutation delete_mut(schema_v1, partition_key::from_single_value(*schema_v1, serialized(id)));
|
||||
delete_mut.partition().apply_delete(*schema_v1, clustering_key_prefix::make_empty(), tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
co_await sp.mutate_locally(delete_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
try {
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
page_size,
|
||||
std::move(batch));
|
||||
} catch (...) {
|
||||
blogger.warn("Batchlog v1 to v2 migration failed: {}; will retry", std::current_exception());
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await container().invoke_on_all([] (auto& bm) {
|
||||
bm._migration_done = true;
|
||||
});
|
||||
|
||||
blogger.info("Done migrating batchlog entries from v1 -> v2");
|
||||
});
|
||||
db_clock::duration db::batchlog_manager::get_batch_log_timeout() const {
|
||||
// enough time for the actual write + BM removal mutation
|
||||
return _write_request_timeout * 2;
|
||||
}
|
||||
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
|
||||
co_await maybe_migrate_v1_to_v2();
|
||||
|
||||
typedef db_clock::rep clock_type;
|
||||
|
||||
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
|
||||
@@ -310,26 +172,21 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
|
||||
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
struct replay_stats {
|
||||
std::optional<db_clock::time_point> min_too_fresh;
|
||||
bool need_cleanup = false;
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
auto delete_batch = [this, schema = std::move(schema)] (utils::UUID id) {
|
||||
auto key = partition_key::from_singular(*schema, id);
|
||||
mutation m(schema, key);
|
||||
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
|
||||
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
|
||||
return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
};
|
||||
|
||||
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
|
||||
|
||||
// Use a stable `now` across all batches, so skip/replay decisions are the
|
||||
// same across a while prefix of written_at (across all ids).
|
||||
const auto now = db_clock::now();
|
||||
|
||||
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
|
||||
const auto batch_shard = row.get_as<int32_t>("shard");
|
||||
auto batch = [this, limiter, delete_batch = std::move(delete_batch), &all_replayed](const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
|
||||
auto timeout = _replay_timeout;
|
||||
auto now = db_clock::now();
|
||||
auto timeout = get_batch_log_timeout();
|
||||
|
||||
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
|
||||
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
|
||||
@@ -337,48 +194,52 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
// check version of serialization format
|
||||
if (!row.has("version")) {
|
||||
blogger.warn("Skipping logged batch because of unknown version");
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto version = row.get_as<int32_t>("version");
|
||||
if (version != netw::messaging_service::current_version) {
|
||||
blogger.warn("Skipping logged batch because of incorrect version {}; current version = {}", version, netw::messaging_service::current_version);
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto data = row.get_blob_unfragmented("data");
|
||||
|
||||
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
|
||||
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
bool send_failed = false;
|
||||
|
||||
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
|
||||
blogger.debug("Replaying batch {}", id);
|
||||
|
||||
try {
|
||||
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
|
||||
auto fms = make_lw_shared<std::deque<canonical_mutation>>();
|
||||
auto in = ser::as_input_stream(data);
|
||||
while (in.size()) {
|
||||
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
|
||||
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
|
||||
if (!tbl) {
|
||||
continue;
|
||||
}
|
||||
if (written_at <= tbl->get_truncation_time()) {
|
||||
continue;
|
||||
}
|
||||
schema_ptr s = tbl->schema();
|
||||
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
fms.emplace_back(std::move(fm), std::move(s));
|
||||
fms->emplace_back(ser::deserialize(in, std::type_identity<canonical_mutation>()));
|
||||
schema_ptr s = _qp.db().find_schema(fms->back().column_family_id());
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
|
||||
if (now < written_at + timeout) {
|
||||
blogger.debug("Skipping replay of {}, too fresh", id);
|
||||
|
||||
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
|
||||
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto size = data.size();
|
||||
|
||||
for (const auto& [fm, s] : fms) {
|
||||
mutations.emplace_back(fm.to_mutation(s));
|
||||
co_await maybe_yield();
|
||||
}
|
||||
auto mutations = co_await map_reduce(*fms, [this, written_at] (canonical_mutation& fm) {
|
||||
const auto& cf = _qp.proxy().local_db().find_column_family(fm.column_family_id());
|
||||
return make_ready_future<canonical_mutation*>(written_at > cf.get_truncation_time() ? &fm : nullptr);
|
||||
},
|
||||
utils::chunked_vector<mutation>(),
|
||||
[this] (utils::chunked_vector<mutation> mutations, canonical_mutation* fm) {
|
||||
if (fm) {
|
||||
schema_ptr s = _qp.db().find_schema(fm->column_family_id());
|
||||
mutations.emplace_back(fm->to_mutation(s));
|
||||
}
|
||||
return mutations;
|
||||
});
|
||||
|
||||
if (!mutations.empty()) {
|
||||
const auto ttl = [written_at]() -> clock_type {
|
||||
@@ -404,11 +265,7 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
co_await limiter->reserve(size);
|
||||
_stats.write_attempts += mutations.size();
|
||||
auto timeout = db::timeout_clock::now() + write_timeout;
|
||||
if (cleanup) {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
|
||||
} else {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
}
|
||||
} catch (data_dictionary::no_such_keyspace& ex) {
|
||||
@@ -422,80 +279,31 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
// Do _not_ remove the batch, assuning we got a node write error.
|
||||
// Since we don't have hints (which origin is satisfied with),
|
||||
// we have to resort to keeping this batch to next lap.
|
||||
if (!cleanup || stage == batchlog_stage::failed_replay) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
send_failed = true;
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto& sp = _qp.proxy();
|
||||
|
||||
if (send_failed) {
|
||||
blogger.debug("Moving batch {} to stage failed_replay", id);
|
||||
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
}
|
||||
|
||||
// delete batch
|
||||
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
|
||||
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
shard_written_at.need_cleanup = true;
|
||||
|
||||
co_await delete_batch(id);
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
|
||||
co_await with_gate(_gate, [this, cleanup, &all_replayed, batch = std::move(batch), now, &replay_stats_per_shard] () mutable -> future<> {
|
||||
blogger.debug("Started replayAllFailedBatches with cleanup: {}", cleanup);
|
||||
co_await with_gate(_gate, [this, cleanup, batch = std::move(batch)] () mutable -> future<> {
|
||||
blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id());
|
||||
co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
co_await coroutine::parallel_for_each(std::views::iota(0, 16), [&] (int32_t chunk) -> future<> {
|
||||
const int32_t batchlog_chunk_base = chunk * 16;
|
||||
for (int32_t i = 0; i < 16; ++i) {
|
||||
int32_t batchlog_shard = batchlog_chunk_base + i;
|
||||
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
|
||||
db::consistency_level::ONE,
|
||||
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::failed_replay)), data_value(batchlog_shard)},
|
||||
page_size,
|
||||
batch);
|
||||
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
|
||||
db::consistency_level::ONE,
|
||||
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::initial)), data_value(batchlog_shard)},
|
||||
page_size,
|
||||
batch);
|
||||
|
||||
if (cleanup != post_replay_cleanup::yes) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto it = replay_stats_per_shard.find(batchlog_shard);
|
||||
if (it == replay_stats_per_shard.end() || !it->second.need_cleanup) {
|
||||
// Nothing was replayed on this batchlog shard, nothing to cleanup.
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto write_time = it->second.min_too_fresh.value_or(now - _replay_timeout);
|
||||
const auto end_weight = it->second.min_too_fresh ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed;
|
||||
auto [key, ckey] = get_batchlog_key(*schema, netw::messaging_service::current_version, batchlog_stage::initial, batchlog_shard, write_time, {});
|
||||
auto end_pos = position_in_partition(partition_region::clustered, end_weight, std::move(ckey));
|
||||
|
||||
range_tombstone rt(position_in_partition::before_all_clustered_rows(), std::move(end_pos), tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
|
||||
blogger.trace("Clean up batchlog shard {} with range tombstone {}", batchlog_shard, rt);
|
||||
|
||||
mutation m(schema, key);
|
||||
m.partition().apply_row_tombstone(*schema, std::move(rt));
|
||||
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT id, data, written_at, version FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
page_size,
|
||||
std::move(batch)).then([this, cleanup] {
|
||||
if (cleanup == post_replay_cleanup::no) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// Replaying batches could have generated tombstones, flush to disk,
|
||||
// where they can be compacted away.
|
||||
return replica::database::flush_table_on_all_shards(_qp.proxy().get_db(), system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
}).then([] {
|
||||
blogger.debug("Finished replayAllFailedBatches");
|
||||
});
|
||||
|
||||
blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
|
||||
});
|
||||
|
||||
co_return all_replayed;
|
||||
|
||||
@@ -34,17 +34,12 @@ class system_keyspace;
|
||||
using all_batches_replayed = bool_class<struct all_batches_replayed_tag>;
|
||||
|
||||
struct batchlog_manager_config {
|
||||
db_clock::duration replay_timeout;
|
||||
std::chrono::duration<double> write_request_timeout;
|
||||
uint64_t replay_rate = std::numeric_limits<uint64_t>::max();
|
||||
std::chrono::milliseconds delay = std::chrono::milliseconds(0);
|
||||
unsigned replay_cleanup_after_replays;
|
||||
};
|
||||
|
||||
enum class batchlog_stage : int8_t {
|
||||
initial,
|
||||
failed_replay
|
||||
};
|
||||
|
||||
class batchlog_manager : public peering_sharded_service<batchlog_manager> {
|
||||
public:
|
||||
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
|
||||
@@ -64,7 +59,7 @@ private:
|
||||
|
||||
cql3::query_processor& _qp;
|
||||
db::system_keyspace& _sys_ks;
|
||||
db_clock::duration _replay_timeout;
|
||||
db_clock::duration _write_request_timeout;
|
||||
uint64_t _replay_rate;
|
||||
std::chrono::milliseconds _delay;
|
||||
unsigned _replay_cleanup_after_replays = 100;
|
||||
@@ -76,14 +71,6 @@ private:
|
||||
|
||||
gc_clock::time_point _last_replay;
|
||||
|
||||
// Was the v1 -> v2 migration already done since last restart?
|
||||
// The migration is attempted once after each restart. This is redundant but
|
||||
// keeps thing simple. Once no upgrade path exists from a ScyllaDB version
|
||||
// which can still produce v1 entries, this migration code can be removed.
|
||||
bool _migration_done = false;
|
||||
|
||||
future<> maybe_migrate_v1_to_v2();
|
||||
|
||||
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
|
||||
public:
|
||||
// Takes a QP, not a distributes. Because this object is supposed
|
||||
@@ -98,13 +85,10 @@ public:
|
||||
future<all_batches_replayed> do_batch_log_replay(post_replay_cleanup cleanup);
|
||||
|
||||
future<size_t> count_all_batches() const;
|
||||
db_clock::duration get_batch_log_timeout() const;
|
||||
gc_clock::time_point get_last_replay() const {
|
||||
return _last_replay;
|
||||
}
|
||||
|
||||
const stats& stats() const {
|
||||
return _stats;
|
||||
}
|
||||
private:
|
||||
future<> batchlog_replay_loop();
|
||||
};
|
||||
|
||||
@@ -54,14 +54,12 @@ public:
|
||||
uint64_t applied_mutations = 0;
|
||||
uint64_t corrupt_bytes = 0;
|
||||
uint64_t truncated_at = 0;
|
||||
uint64_t broken_files = 0;
|
||||
|
||||
stats& operator+=(const stats& s) {
|
||||
invalid_mutations += s.invalid_mutations;
|
||||
skipped_mutations += s.skipped_mutations;
|
||||
applied_mutations += s.applied_mutations;
|
||||
corrupt_bytes += s.corrupt_bytes;
|
||||
broken_files += s.broken_files;
|
||||
return *this;
|
||||
}
|
||||
stats operator+(const stats& s) const {
|
||||
@@ -194,8 +192,6 @@ db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const comm
|
||||
s->corrupt_bytes += e.bytes();
|
||||
} catch (commitlog::segment_truncation& e) {
|
||||
s->truncated_at = e.position();
|
||||
} catch (commitlog::header_checksum_error&) {
|
||||
++s->broken_files;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
@@ -374,9 +370,6 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
|
||||
if (stats.truncated_at != 0) {
|
||||
rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at);
|
||||
}
|
||||
if (stats.broken_files != 0) {
|
||||
rlogger.warn("Corrupted file header: {}. Skipped.", f);
|
||||
}
|
||||
rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
|
||||
18
db/config.cc
18
db/config.cc
@@ -1105,14 +1105,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142,
|
||||
"Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_transport_port_proxy_protocol(this, "native_transport_port_proxy_protocol", value_status::Used, 0,
|
||||
"Port on which the CQL native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
|
||||
, native_transport_port_ssl_proxy_protocol(this, "native_transport_port_ssl_proxy_protocol", value_status::Used, 0,
|
||||
"Port on which the CQL TLS native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
|
||||
, native_shard_aware_transport_port_proxy_protocol(this, "native_shard_aware_transport_port_proxy_protocol", value_status::Used, 0,
|
||||
"Like native_transport_port_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_shard_aware_transport_port_ssl_proxy_protocol(this, "native_shard_aware_transport_port_ssl_proxy_protocol", value_status::Used, 0,
|
||||
"Like native_transport_port_ssl_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128,
|
||||
"The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n"
|
||||
"Default is different (128 versus unlimited).\n"
|
||||
@@ -1160,7 +1152,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Number of threads with which to deliver hints. In multiple data-center deployments, consider increasing this number because cross data-center handoff is generally slower.")
|
||||
, batchlog_replay_throttle_in_kb(this, "batchlog_replay_throttle_in_kb", value_status::Unused, 1024,
|
||||
"Total maximum throttle. Throttling is reduced proportionally to the number of nodes in the cluster.")
|
||||
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 1,
|
||||
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 60,
|
||||
"Clean up batchlog memtable after every N replays. Replays are issued on a timer, every 60 seconds. So if batchlog_replay_cleanup_after_replays is set to 60, the batchlog memtable is flushed every 60 * 60 seconds.")
|
||||
/**
|
||||
* @Group Request scheduler properties
|
||||
@@ -1478,8 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
|
||||
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
|
||||
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
|
||||
, alternator_describe_table_info_cache_validity_in_seconds(this, "alternator_describe_table_info_cache_validity_in_seconds", liveness::LiveUpdate, value_status::Used, 60 * 60 * 6,
|
||||
"The validity of DescribeTable information - table size in bytes. This is how long calculated value will be reused before recalculation.")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
|
||||
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
|
||||
@@ -1576,12 +1566,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
|
||||
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
|
||||
"Tablet load stats refresh rate in seconds.")
|
||||
, force_capacity_based_balancing(this, "force_capacity_based_balancing", liveness::LiveUpdate, value_status::Used, false,
|
||||
"Forces the load balancer to perform capacity based balancing, instead of size based balancing.")
|
||||
, size_based_balance_threshold_percentage(this, "size_based_balance_threshold_percentage", liveness::LiveUpdate, value_status::Used, 1.0,
|
||||
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
|
||||
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
|
||||
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")
|
||||
|
||||
@@ -324,10 +324,6 @@ public:
|
||||
named_value<uint16_t> native_transport_port_ssl;
|
||||
named_value<uint16_t> native_shard_aware_transport_port;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_ssl;
|
||||
named_value<uint16_t> native_transport_port_proxy_protocol;
|
||||
named_value<uint16_t> native_transport_port_ssl_proxy_protocol;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_proxy_protocol;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_ssl_proxy_protocol;
|
||||
named_value<uint32_t> native_transport_max_threads;
|
||||
named_value<uint32_t> native_transport_max_frame_size_in_mb;
|
||||
named_value<sstring> broadcast_rpc_address;
|
||||
@@ -477,7 +473,6 @@ public:
|
||||
named_value<bool> alternator_allow_system_table_write;
|
||||
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
|
||||
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
|
||||
named_value<uint32_t> alternator_describe_table_info_cache_validity_in_seconds;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
@@ -595,9 +590,6 @@ public:
|
||||
named_value<bool> rf_rack_valid_keyspaces;
|
||||
|
||||
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
|
||||
named_value<bool> force_capacity_based_balancing;
|
||||
named_value<float> size_based_balance_threshold_percentage;
|
||||
named_value<uint64_t> minimal_tablet_size_for_balancing;
|
||||
|
||||
static const sstring default_tls_priority;
|
||||
private:
|
||||
|
||||
@@ -248,7 +248,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
|
||||
// which is larger than the segment ID of the RP of the last written hint.
|
||||
cfg.base_segment_id = _last_written_rp.base_id();
|
||||
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (this auto, commitlog l) -> future<commitlog> {
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) -> future<commitlog> {
|
||||
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
|
||||
// When this happens we want to refill _sender's segments only if it has finished with the segments he had before.
|
||||
if (_sender.have_segments()) {
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
#include <seastar/core/smp.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/util/file.hh>
|
||||
|
||||
// Boost features.
|
||||
|
||||
@@ -900,7 +899,7 @@ future<> manager::migrate_ip_directories() {
|
||||
co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
|
||||
try {
|
||||
manager_logger.warn("Removing hint directory {}", directory.native());
|
||||
co_await seastar::recursive_remove_directory(directory);
|
||||
co_await lister::rmdir(directory);
|
||||
} catch (...) {
|
||||
on_internal_error(manager_logger,
|
||||
seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include "utils/s3/creds.hh"
|
||||
#include "object_storage_endpoint_param.hh"
|
||||
|
||||
using namespace std::string_literals;
|
||||
@@ -19,6 +21,9 @@ using namespace std::string_literals;
|
||||
db::object_storage_endpoint_param::object_storage_endpoint_param(s3_storage s)
|
||||
: _data(std::move(s))
|
||||
{}
|
||||
db::object_storage_endpoint_param::object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config)
|
||||
: object_storage_endpoint_param(s3_storage{std::move(endpoint), std::move(config)})
|
||||
{}
|
||||
db::object_storage_endpoint_param::object_storage_endpoint_param(gs_storage s)
|
||||
: _data(std::move(s))
|
||||
{}
|
||||
@@ -27,8 +32,8 @@ db::object_storage_endpoint_param::object_storage_endpoint_param() = default;
|
||||
db::object_storage_endpoint_param::object_storage_endpoint_param(const object_storage_endpoint_param&) = default;
|
||||
|
||||
std::string db::object_storage_endpoint_param::s3_storage::to_json_string() const {
|
||||
return fmt::format("{{ \"type\": \"s3\", \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
|
||||
region, iam_role_arn
|
||||
return fmt::format("{{ \"port\": {}, \"use_https\": {}, \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
|
||||
config.port, config.use_https, config.region, config.role_arn
|
||||
);
|
||||
}
|
||||
|
||||
@@ -94,6 +99,8 @@ const std::string& db::object_storage_endpoint_param::type() const {
|
||||
|
||||
db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(const YAML::Node& node) {
|
||||
auto name = node["name"];
|
||||
auto aws_region = node["aws_region"];
|
||||
auto iam_role_arn = node["iam_role_arn"];
|
||||
auto type = node["type"];
|
||||
|
||||
auto get_opt = [](auto& node, const std::string& key, auto def) {
|
||||
@@ -101,19 +108,13 @@ db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(cons
|
||||
return tmp ? tmp.template as<std::decay_t<decltype(def)>>() : def;
|
||||
};
|
||||
// aws s3 endpoint.
|
||||
if (!type || type.as<std::string>() == s3_type ) {
|
||||
if (!type || type.as<std::string>() == s3_type || aws_region || iam_role_arn) {
|
||||
s3_storage ep;
|
||||
ep.endpoint = name.as<std::string>();
|
||||
auto aws_region = node["aws_region"];
|
||||
ep.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
|
||||
ep.iam_role_arn = get_opt(node, "iam_role_arn", ""s);
|
||||
|
||||
if (maybe_legacy_endpoint_name(ep.endpoint)) {
|
||||
// Support legacy config for a while
|
||||
auto port = node["port"].as<unsigned>();
|
||||
auto use_https = node["https"].as<bool>(false);
|
||||
ep.endpoint = fmt::format("http{}://{}:{}", use_https ? "s" : "", ep.endpoint, port);
|
||||
}
|
||||
ep.config.port = node["port"].as<unsigned>();
|
||||
ep.config.use_https = node["https"].as<bool>(false);
|
||||
ep.config.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
|
||||
ep.config.role_arn = iam_role_arn ? iam_role_arn.as<std::string>() : "";
|
||||
|
||||
return object_storage_endpoint_param{std::move(ep)};
|
||||
}
|
||||
@@ -134,5 +135,5 @@ const std::string db::object_storage_endpoint_param::gs_type = "gs";
|
||||
|
||||
auto fmt::formatter<db::object_storage_endpoint_param>::format(const db::object_storage_endpoint_param& e, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{}", e.to_json_string());
|
||||
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{{}}", e.to_json_string());
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <variant>
|
||||
#include <compare>
|
||||
#include <fmt/core.h>
|
||||
#include "utils/s3/creds.hh"
|
||||
|
||||
namespace YAML {
|
||||
class Node;
|
||||
@@ -24,8 +25,7 @@ class object_storage_endpoint_param {
|
||||
public:
|
||||
struct s3_storage {
|
||||
std::string endpoint;
|
||||
std::string region;
|
||||
std::string iam_role_arn;
|
||||
s3::endpoint_config config;
|
||||
|
||||
std::strong_ordering operator<=>(const s3_storage&) const = default;
|
||||
std::string to_json_string() const;
|
||||
@@ -43,6 +43,7 @@ public:
|
||||
object_storage_endpoint_param();
|
||||
object_storage_endpoint_param(const object_storage_endpoint_param&);
|
||||
object_storage_endpoint_param(s3_storage);
|
||||
object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config);
|
||||
object_storage_endpoint_param(gs_storage);
|
||||
|
||||
std::strong_ordering operator<=>(const object_storage_endpoint_param&) const;
|
||||
@@ -76,7 +77,3 @@ template <>
|
||||
struct fmt::formatter<db::object_storage_endpoint_param> : fmt::formatter<std::string_view> {
|
||||
auto format(const db::object_storage_endpoint_param&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
inline bool maybe_legacy_endpoint_name(std::string_view ep) noexcept {
|
||||
return !(ep.starts_with("http://") || ep.starts_with("https://"));
|
||||
}
|
||||
|
||||
@@ -1262,9 +1262,16 @@ static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
schema_applier ap(proxy, ss, sys_ks, reload);
|
||||
co_await execute_do_merge_schema(proxy, ap, std::move(mutations)).finally([&ap]() {
|
||||
return ap.destroy();
|
||||
});
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await execute_do_merge_schema(proxy, ap, std::move(mutations));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await ap.destroy();
|
||||
if (ex) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -55,7 +55,6 @@
|
||||
#include "message/shared_dict.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "db/compaction_history_entry.hh"
|
||||
#include "mutation/async_utils.hh"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -111,7 +110,6 @@ namespace {
|
||||
system_keyspace::v3::CDC_LOCAL,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.enable_schema_commitlog();
|
||||
@@ -139,7 +137,6 @@ namespace {
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.is_group0_table = true;
|
||||
@@ -216,30 +213,6 @@ schema_ptr system_keyspace::batchlog() {
|
||||
return batchlog;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::batchlog_v2() {
|
||||
static thread_local auto batchlog_v2 = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BATCHLOG_V2), NAME, BATCHLOG_V2,
|
||||
// partition key
|
||||
{{"version", int32_type}, {"stage", byte_type}, {"shard", int32_type}},
|
||||
// clustering key
|
||||
{{"written_at", timestamp_type}, {"id", uuid_type}},
|
||||
// regular columns
|
||||
{{"data", bytes_type}},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"batches awaiting replay"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
builder.with_hash_version();
|
||||
return builder.build(schema_builder::compact_storage::no);
|
||||
}();
|
||||
return batchlog_v2;
|
||||
}
|
||||
|
||||
/*static*/ schema_ptr system_keyspace::paxos() {
|
||||
static thread_local auto paxos = [] {
|
||||
// FIXME: switch to the new schema_builder interface (with_column(...), etc)
|
||||
@@ -312,7 +285,6 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -1419,23 +1391,6 @@ schema_ptr system_keyspace::view_building_tasks() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::client_routes() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, CLIENT_ROUTES);
|
||||
return schema_builder(NAME, CLIENT_ROUTES, std::make_optional(id))
|
||||
.with_column("connection_id", utf8_type, column_kind::partition_key)
|
||||
.with_column("host_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("address", utf8_type)
|
||||
.with_column("port", int32_type)
|
||||
.with_column("tls_port", int32_type)
|
||||
.with_column("alternator_port", int32_type)
|
||||
.with_column("alternator_https_port", int32_type)
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
future<system_keyspace::local_info> system_keyspace::load_local_info() {
|
||||
auto msg = co_await execute_cql(format("SELECT host_id, cluster_name, data_center, rack FROM system.{} WHERE key=?", LOCAL), sstring(LOCAL));
|
||||
|
||||
@@ -2349,7 +2304,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
|
||||
auto auth_tables = system_keyspace::auth_tables();
|
||||
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), batchlog_v2(), paxos(), local(),
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
|
||||
peers(), peer_events(), range_xfers(),
|
||||
compactions_in_progress(), compaction_history(),
|
||||
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
|
||||
@@ -2363,7 +2318,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
v3::cdc_local(),
|
||||
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
|
||||
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
|
||||
dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
|
||||
dicts(), view_building_tasks(), cdc_streams_state(), cdc_streams_history()
|
||||
});
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
|
||||
@@ -2380,9 +2335,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s) {
|
||||
return (s.get() == system_keyspace::batchlog().get())
|
||||
|| (s.get() == system_keyspace::batchlog_v2().get())
|
||||
|| (s.get() == system_keyspace::paxos().get())
|
||||
return (s.get() == system_keyspace::batchlog().get()) || (s.get() == system_keyspace::paxos().get())
|
||||
|| s == system_keyspace::v3::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
@@ -3000,9 +2953,7 @@ future<mutation> system_keyspace::get_group0_history(sharded<replica::database>&
|
||||
SCYLLA_ASSERT(rs);
|
||||
auto& ps = rs->partitions();
|
||||
for (auto& p: ps) {
|
||||
// Note: we could decorate the frozen_mutation's key to check if it's the expected one
|
||||
// but since this is a single partition table, we can just check after unfreezing the whole mutation.
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
auto mut = p.mut().unfreeze(s);
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
|
||||
if (partition_key == GROUP0_HISTORY_KEY) {
|
||||
co_return mut;
|
||||
@@ -3160,10 +3111,7 @@ static bool must_have_tokens(service::node_state nst) {
|
||||
// A decommissioning node doesn't have tokens at the end, they are
|
||||
// removed during transition to the left_token_ring state.
|
||||
case service::node_state::decommissioning: return false;
|
||||
// A removing node might or might not have tokens depending on whether
|
||||
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
|
||||
// cases, we allow removing nodes to not have tokens.
|
||||
case service::node_state::removing: return false;
|
||||
case service::node_state::removing: return true;
|
||||
case service::node_state::rebuilding: return true;
|
||||
case service::node_state::normal: return true;
|
||||
case service::node_state::left: return false;
|
||||
@@ -3403,12 +3351,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("paused_rf_change_requests")) {
|
||||
for (auto&& v : deserialize_set_column(*topology(), some_row, "paused_rf_change_requests")) {
|
||||
ret.paused_rf_change_requests.insert(value_cast<utils::UUID>(v));
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
@@ -3620,43 +3562,35 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
return entry;
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id) {
|
||||
auto r = co_await get_topology_request_entry_opt(id);
|
||||
if (!r) {
|
||||
on_internal_error(slogger, format("no entry for request id {}", id));
|
||||
}
|
||||
co_return std::move(*r);
|
||||
}
|
||||
|
||||
future<std::optional<system_keyspace::topology_requests_entry>> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
|
||||
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id, bool require_entry) {
|
||||
auto rs = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
|
||||
|
||||
if (!rs || rs->empty()) {
|
||||
co_return std::nullopt;
|
||||
if (require_entry) {
|
||||
on_internal_error(slogger, format("no entry for request id {}", id));
|
||||
} else {
|
||||
co_return topology_requests_entry{
|
||||
.id = utils::null_uuid()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const auto& row = rs->one();
|
||||
co_return topology_request_row_to_entry(id, row);
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit) {
|
||||
sstring request_types_str = "";
|
||||
bool first = true;
|
||||
for (const auto& rt : request_types) {
|
||||
if (!std::exchange(first, false)) {
|
||||
request_types_str += ", ";
|
||||
}
|
||||
request_types_str += std::visit([] (auto&& arg) { return fmt::format("'{}'", arg); }, rt);
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
|
||||
// Running requests.
|
||||
auto rs_running = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, request_types_str));
|
||||
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS,
|
||||
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
|
||||
|
||||
|
||||
// Requests which finished after end_time_limit.
|
||||
auto rs_done = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(), request_types_str));
|
||||
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(),
|
||||
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
|
||||
|
||||
topology_requests_entries m;
|
||||
for (const auto& row: *rs_done) {
|
||||
@@ -3674,16 +3608,6 @@ future<system_keyspace::topology_requests_entries> system_keyspace::get_topology
|
||||
co_return m;
|
||||
}
|
||||
|
||||
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
|
||||
return get_topology_request_entries({
|
||||
service::topology_request::join,
|
||||
service::topology_request::replace,
|
||||
service::topology_request::rebuild,
|
||||
service::topology_request::leave,
|
||||
service::topology_request::remove
|
||||
}, end_time_limit);
|
||||
}
|
||||
|
||||
future<mutation> system_keyspace::get_insert_dict_mutation(
|
||||
std::string_view name,
|
||||
bytes data,
|
||||
|
||||
@@ -163,7 +163,6 @@ public:
|
||||
static constexpr auto NAME = "system";
|
||||
static constexpr auto HINTS = "hints";
|
||||
static constexpr auto BATCHLOG = "batchlog";
|
||||
static constexpr auto BATCHLOG_V2 = "batchlog_v2";
|
||||
static constexpr auto PAXOS = "paxos";
|
||||
static constexpr auto BUILT_INDEXES = "IndexInfo";
|
||||
static constexpr auto LOCAL = "local";
|
||||
@@ -199,8 +198,6 @@ public:
|
||||
static constexpr auto VIEW_BUILD_STATUS_V2 = "view_build_status_v2";
|
||||
static constexpr auto DICTS = "dicts";
|
||||
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
|
||||
static constexpr auto CLIENT_ROUTES = "client_routes";
|
||||
static constexpr auto VERSIONS = "versions";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
@@ -258,7 +255,6 @@ public:
|
||||
|
||||
static schema_ptr hints();
|
||||
static schema_ptr batchlog();
|
||||
static schema_ptr batchlog_v2();
|
||||
static schema_ptr paxos();
|
||||
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
static schema_ptr raft();
|
||||
@@ -278,7 +274,6 @@ public:
|
||||
static schema_ptr view_build_status_v2();
|
||||
static schema_ptr dicts();
|
||||
static schema_ptr view_building_tasks();
|
||||
static schema_ptr client_routes();
|
||||
|
||||
// auth
|
||||
static schema_ptr roles();
|
||||
@@ -670,9 +665,7 @@ public:
|
||||
|
||||
future<service::topology_request_state> get_topology_request_state(utils::UUID id, bool require_entry);
|
||||
topology_requests_entry topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row);
|
||||
future<topology_requests_entry> get_topology_request_entry(utils::UUID id);
|
||||
future<std::optional<topology_requests_entry>> get_topology_request_entry_opt(utils::UUID id);
|
||||
future<system_keyspace::topology_requests_entries> get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit);
|
||||
future<topology_requests_entry> get_topology_request_entry(utils::UUID id, bool require_entry);
|
||||
future<topology_requests_entries> get_node_ops_request_entries(db_clock::time_point end_time_limit);
|
||||
|
||||
public:
|
||||
|
||||
399
db/view/view.cc
399
db/view/view.cc
@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
|
||||
&& std::ranges::contains(shards, this_shard_id());
|
||||
}
|
||||
|
||||
static endpoints_to_update get_view_natural_endpoint_vnodes(
|
||||
locator::host_id me,
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
locator::endpoint_dc_rack my_location,
|
||||
const locator::network_topology_strategy* network_topology,
|
||||
replica::cf_stats& cf_stats) {
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
auto& my_datacenter = my_location.dc;
|
||||
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (!network_topology || node.get().dc() == my_datacenter) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
}
|
||||
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
|
||||
}
|
||||
|
||||
static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
replica::cf_stats& cf_stats) {
|
||||
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
|
||||
for (auto&& base_node : base_nodes) {
|
||||
if (base_dc_racks.contains(base_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
|
||||
base_node.get().dc(), base_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
base_dc_racks.insert(base_node.get().dc_rack());
|
||||
}
|
||||
|
||||
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
|
||||
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
|
||||
view_node.get().dc(), view_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
// Track unpaired replicas in both sets
|
||||
if (base_dc_racks.contains(view_node.get().dc_rack())) {
|
||||
paired_view_dc_racks.insert(view_node.get().dc_rack());
|
||||
} else {
|
||||
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
|
||||
}
|
||||
}
|
||||
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// There are view replicas that can't be paired with any base replica
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
|
||||
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
unpaired_view_dc_rack_replicas | std::views::values);
|
||||
}
|
||||
return extra_replica;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// Calculate the node ("natural endpoint") to which this node should send
|
||||
// a view update.
|
||||
//
|
||||
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
||||
// of this function is to find, assuming that this node is one of the base
|
||||
// replicas for a given partition, the paired view replica.
|
||||
//
|
||||
// When using vnodes, we have an optimization called "self-pairing" - if a single
|
||||
// node is both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node sends the update to itself and this node is removed
|
||||
// from the lists of nodes paired by index. This self-pairing optimization can
|
||||
// cause the pairing to change after view ranges are moved between nodes.
|
||||
// In the past, we used an optimization called "self-pairing" that if a single
|
||||
// node was both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node would send the update to itself. This self-
|
||||
// pairing optimization could cause the pairing to change after view ranges
|
||||
// are moved between nodes, so currently we only use it if
|
||||
// use_legacy_self_pairing is set to true. When using tablets - where range
|
||||
// movements are common - it is strongly recommended to set it to false.
|
||||
//
|
||||
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
|
||||
// we pair only nodes in the same datacenter.
|
||||
//
|
||||
// If the table uses tablets, then pairing is rack-aware. In this case, in each
|
||||
// rack where we have a base replica there is also one replica of each view tablet.
|
||||
// Therefore, the base replicas are naturally paired with the view replicas that
|
||||
// are in the same rack.
|
||||
// When use_legacy_self_pairing is enabled, if one of the base replicas
|
||||
// also happens to be a view replica, it is paired with itself
|
||||
// (with the other nodes paired by order in the list
|
||||
// after taking this node out).
|
||||
//
|
||||
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
|
||||
// and the replication factor in the node's datacenter is a multiple of the number
|
||||
// of racks in the datacenter, then pairing is rack-aware. In this case,
|
||||
// all racks have the same number of replicas, and those are never migrated
|
||||
// outside their racks. Therefore, the base replicas are naturally paired with the
|
||||
// view replicas that are in the same rack, based on the ordinal position.
|
||||
// Note that typically, there is a single replica per rack and pairing is trivial.
|
||||
//
|
||||
// If the assumption that the given base token belongs to this replica
|
||||
// does not hold, we return an empty optional.
|
||||
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_tablets,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats) {
|
||||
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& my_location = topology.get_location(me);
|
||||
auto& my_datacenter = my_location.dc;
|
||||
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
|
||||
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
|
||||
bool simple_rack_aware_pairing = false;
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector orig_base_endpoints, orig_view_endpoints;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
|
||||
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
|
||||
if (auto* np = topology.find_node(ep)) {
|
||||
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
|
||||
// We need to use get_replicas() for pairing to be stable in case base or view tablet
|
||||
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
|
||||
return resolve(topology, ep, false);
|
||||
}) | std::ranges::to<node_vector>();
|
||||
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
|
||||
auto leaving_base = it->get().host_id();
|
||||
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
|
||||
view_token, use_tablets, cf_stats);
|
||||
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!use_tablets) {
|
||||
return get_view_natural_endpoint_vnodes(
|
||||
me,
|
||||
base_nodes,
|
||||
view_nodes,
|
||||
my_location,
|
||||
network_topology,
|
||||
cf_stats);
|
||||
std::function<bool(const locator::node&)> is_candidate;
|
||||
if (network_topology) {
|
||||
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
|
||||
} else {
|
||||
is_candidate = [&] (const locator::node&) { return true; };
|
||||
}
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (is_candidate(node)) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
}
|
||||
|
||||
std::optional<locator::host_id> paired_replica;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (view_node.get().dc_rack() == my_location) {
|
||||
paired_replica = view_node.get().host_id();
|
||||
break;
|
||||
if (use_legacy_self_pairing) {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (is_candidate(view_node)) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
process_candidate(view_endpoints, view_node);
|
||||
}
|
||||
}
|
||||
if (paired_replica && base_nodes.size() == view_nodes.size()) {
|
||||
// We don't need to find any extra replicas, so we can return early
|
||||
return {.natural_endpoint = paired_replica};
|
||||
|
||||
// Try optimizing for simple rack-aware pairing
|
||||
// If the numbers of base and view replica differ, that means an RF change is taking place
|
||||
// and we can't use simple rack-aware pairing.
|
||||
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
|
||||
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
|
||||
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
|
||||
// Simple rack-aware pairing is possible when the datacenter replication factor
|
||||
// is a multiple of the number of racks in the datacenter.
|
||||
if (dc_rf % racks.size() == 0) {
|
||||
simple_rack_aware_pairing = true;
|
||||
size_t rack_rf = dc_rf / racks.size();
|
||||
// If any rack doesn't have enough nodes to satisfy the per-rack rf
|
||||
// simple rack-aware pairing is disabled.
|
||||
for (const auto& [rack, nodes] : racks) {
|
||||
if (nodes.size() < rack_rf) {
|
||||
simple_rack_aware_pairing = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (dc_rf != base_endpoints.size()) {
|
||||
// If the datacenter replication factor is not equal to the number of base replicas,
|
||||
// we're in progress of a RF change and we can't use simple rack-aware pairing.
|
||||
simple_rack_aware_pairing = false;
|
||||
}
|
||||
if (simple_rack_aware_pairing) {
|
||||
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
}
|
||||
}
|
||||
if (!paired_replica) {
|
||||
// We couldn't find any view replica in our rack
|
||||
|
||||
orig_base_endpoints = base_endpoints;
|
||||
orig_view_endpoints = view_endpoints;
|
||||
|
||||
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
|
||||
// Use best-match, for the minimum number of base and view replicas in each rack,
|
||||
// and ordinal match for the rest.
|
||||
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
|
||||
if (rack_aware_pairing && !simple_rack_aware_pairing) {
|
||||
struct indexed_replica {
|
||||
size_t idx;
|
||||
std::reference_wrapper<const locator::node> node;
|
||||
};
|
||||
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
|
||||
|
||||
// First, index all replicas by rack
|
||||
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
|
||||
size_t idx = 0;
|
||||
for (const auto& r: replicas) {
|
||||
racks[r.get().rack()].emplace_back(idx++, r);
|
||||
}
|
||||
};
|
||||
index_replica_set(base_racks, base_endpoints);
|
||||
index_replica_set(view_racks, view_endpoints);
|
||||
|
||||
// Try optimistically pairing `me` first
|
||||
const auto& my_base_replicas = base_racks[my_location.rack];
|
||||
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
|
||||
if (base_it == my_base_replicas.end()) {
|
||||
return {};
|
||||
}
|
||||
const auto& my_view_replicas = view_racks[my_location.rack];
|
||||
size_t idx = base_it - my_base_replicas.begin();
|
||||
if (idx < my_view_replicas.size()) {
|
||||
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
|
||||
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
|
||||
} else {
|
||||
// If the number of view replicas is larger than the number of base replicas,
|
||||
// we need to find the unpaired view replica, so we can't return yet.
|
||||
paired_replica = my_view_replicas[idx].node;
|
||||
}
|
||||
}
|
||||
|
||||
// Collect all unpaired base and view replicas,
|
||||
// where the number of replicas in the base rack is different than the respective view rack
|
||||
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
|
||||
for (const auto& [rack, base_replicas] : base_racks) {
|
||||
const auto& view_replicas = view_racks[rack];
|
||||
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
|
||||
unpaired_base_replicas.emplace_back(base_replicas[i]);
|
||||
}
|
||||
}
|
||||
for (const auto& [rack, view_replicas] : view_racks) {
|
||||
const auto& base_replicas = base_racks[rack];
|
||||
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
|
||||
unpaired_view_replicas.emplace_back(view_replicas[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by the original ordinality, and copy the sorted results
|
||||
// back into {base,view}_endpoints, for backward compatible processing below.
|
||||
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
base_endpoints.clear();
|
||||
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
|
||||
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
view_endpoints.clear();
|
||||
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (!paired_replica && base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
|
||||
if (!paired_replica && idx >= view_endpoints.size()) {
|
||||
// There are fewer view replicas than base replicas
|
||||
// FIXME: This might still happen when reducing replication factor with tablets,
|
||||
// see https://github.com/scylladb/scylladb/issues/21492
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
|
||||
me,
|
||||
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
|
||||
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
} else if (base_endpoints.size() < view_endpoints.size()) {
|
||||
// There are fewer base replicas than view replicas.
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_endpoints.back();
|
||||
if (base_endpoints.size() < view_endpoints.size() - 1) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
}
|
||||
}
|
||||
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
|
||||
return {.natural_endpoint = paired_replica,
|
||||
.endpoint_with_no_pairing = no_pairing_replica};
|
||||
|
||||
if (!paired_replica) {
|
||||
paired_replica = view_endpoints[idx];
|
||||
}
|
||||
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
|
||||
// This can happen when the view replica with no pairing is in another DC.
|
||||
// We need to send an update to it if there are no base replicas in that DC yet,
|
||||
// as it won't receive updates otherwise.
|
||||
std::unordered_set<sstring> dcs_with_base_replicas;
|
||||
for (const auto& base_node : base_nodes) {
|
||||
dcs_with_base_replicas.insert(base_node.get().dc());
|
||||
}
|
||||
for (const auto& view_node : view_nodes) {
|
||||
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// https://github.com/scylladb/scylladb/issues/19439
|
||||
// With tablets, a node being replaced might transition to "left" state
|
||||
// but still be kept as a replica.
|
||||
// As of writing this hints are not prepared to handle nodes that are left
|
||||
// but are still replicas. Therefore, there is no other sensible option
|
||||
// right now but to give up attempt to send the update or write a hint
|
||||
// to the paired, permanently down replica.
|
||||
// We use the same workaround for the extra replica.
|
||||
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
|
||||
if (!replica) {
|
||||
return std::nullopt;
|
||||
}
|
||||
const auto& node = replica->get();
|
||||
if (!node.left()) {
|
||||
return node.host_id();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
|
||||
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
|
||||
{
|
||||
auto& ks = _db.find_keyspace(base->ks_name());
|
||||
auto& replication = ks.get_replication_strategy();
|
||||
// We set legacy self-pairing for old vnode-based tables (for backward
|
||||
// compatibility), and unset it for tablets - where range movements
|
||||
// are more frequent and backward compatibility is less important.
|
||||
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
|
||||
// on a view, like we have the synchronous_updates_flag.
|
||||
bool use_legacy_self_pairing = !ks.uses_tablets();
|
||||
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
|
||||
auto get_erm = [&] (table_id id) {
|
||||
auto it = erms.find(id);
|
||||
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
|
||||
for (const auto& mut : view_updates) {
|
||||
(void)get_erm(mut.s->id());
|
||||
}
|
||||
// Enable rack-aware view updates pairing for tablets
|
||||
// when the cluster feature is enabled so that all replicas agree
|
||||
// on the pairing algorithm.
|
||||
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
|
||||
auto me = base_ermp->get_topology().my_host_id();
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
||||
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto view_ermp = erms.at(mut.s->id());
|
||||
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
|
||||
ks.uses_tablets(), cf_stats);
|
||||
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
|
||||
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
|
||||
if (no_pairing_endpoint) {
|
||||
|
||||
@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_tablets,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_basic_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats);
|
||||
|
||||
/// Verify that the provided keyspace is eligible for storing materialized views.
|
||||
|
||||
@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
|
||||
} catch (...) {
|
||||
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
|
||||
sleep = true;
|
||||
}
|
||||
|
||||
if (sleep) {
|
||||
vbw_logger.debug("Sleeping after exception.");
|
||||
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
|
||||
|
||||
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
|
||||
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
|
||||
auto it = vbw._state._batch->tasks.begin();
|
||||
while (it != vbw._state._batch->tasks.end()) {
|
||||
auto id = it->first;
|
||||
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
|
||||
|
||||
++it; // Advance the iterator before potentially removing the entry from the map.
|
||||
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
|
||||
for (auto& [id, t]: tasks_map) {
|
||||
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
|
||||
if (!task_opt || task_opt->get().aborted) {
|
||||
co_await vbw._state._batch->abort_task(id);
|
||||
}
|
||||
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
|
||||
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
if (processing_base_table != building_state.currently_processed_base_table) {
|
||||
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
|
||||
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
tasks.insert({id, *task_opt});
|
||||
}
|
||||
#ifdef SEASTAR_DEBUG
|
||||
{
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
co_return collect_completed_tasks();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -605,8 +605,8 @@ public:
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
|
||||
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
|
||||
.with_column("key", utf8_type, column_kind::partition_key)
|
||||
.with_column("version", utf8_type)
|
||||
.with_column("build_mode", utf8_type)
|
||||
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
|
||||
.with_column("ssl_protocol", utf8_type)
|
||||
.with_column("username", utf8_type)
|
||||
.with_column("scheduling_group", utf8_type)
|
||||
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Collect
|
||||
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
|
||||
using client_data_vec = utils::chunked_vector<client_data>;
|
||||
using shard_client_data = std::vector<client_data_vec>;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
|
||||
cd_vec.resize(smp::count);
|
||||
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
|
||||
for (unsigned i = 0; i < smp::count; i++) {
|
||||
for (auto&& ps_cdc : *cd_vec[i]) {
|
||||
for (auto&& cd : ps_cdc) {
|
||||
if (cd_map.contains(cd->ip)) {
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
if (cd_map.contains(cd.ip)) {
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
} else {
|
||||
dht::decorated_key key = make_partition_key(cd->ip);
|
||||
dht::decorated_key key = make_partition_key(cd.ip);
|
||||
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
|
||||
ips.insert(decorated_ip{std::move(key), cd->ip});
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
ips.insert(decorated_ip{std::move(key), cd.ip});
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
|
||||
co_await result.emit_partition_start(dip.key);
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
|
||||
return a.port < b.port || a.client_type_str() < b.client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd->shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd->stage_str());
|
||||
if (cd->driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
|
||||
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd.shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd.stage_str());
|
||||
if (cd.driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", *cd.driver_name);
|
||||
}
|
||||
if (cd->driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
|
||||
if (cd.driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", *cd.driver_version);
|
||||
}
|
||||
if (cd->hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd->hostname);
|
||||
if (cd.hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd.hostname);
|
||||
}
|
||||
if (cd->protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
|
||||
if (cd.protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
|
||||
}
|
||||
if (cd->ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
|
||||
if (cd.ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
|
||||
}
|
||||
if (cd->ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
|
||||
if (cd.ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
|
||||
}
|
||||
if (cd->ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
|
||||
if (cd.ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
|
||||
}
|
||||
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
|
||||
if (cd->scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
|
||||
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
|
||||
if (cd.scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(
|
||||
utf8_type,
|
||||
utf8_type,
|
||||
false
|
||||
);
|
||||
|
||||
auto prepare_client_options = [] (const auto& client_options) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& co: client_options) {
|
||||
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
};
|
||||
|
||||
set_cell(cr.cells(), "client_options",
|
||||
make_map_value(map_type, prepare_client_options(cd->client_options)));
|
||||
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
@@ -1120,10 +1100,9 @@ public:
|
||||
}
|
||||
|
||||
auto tm = _db.local().get_token_metadata_ptr();
|
||||
auto target_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
|
||||
|
||||
const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
|
||||
|
||||
locator::load_sketch load(tm, stats, default_tablet_size);
|
||||
locator::load_sketch load(tm);
|
||||
co_await load.populate();
|
||||
|
||||
tm->get_topology().for_each_node([&] (const auto& node) {
|
||||
@@ -1137,23 +1116,18 @@ public:
|
||||
if (auto ip = _gossiper.local().get_address_map().find(host)) {
|
||||
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
|
||||
}
|
||||
set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
|
||||
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
|
||||
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
|
||||
set_cell(r.cells(), "tablets_allocated", load.get_load(host));
|
||||
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_shard_load(host))));
|
||||
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_load(host) * target_tablet_size)));
|
||||
|
||||
if (stats && stats->capacity.contains(host)) {
|
||||
auto capacity = stats->capacity.at(host);
|
||||
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
|
||||
|
||||
if (auto utilization = load.get_allocated_utilization(host)) {
|
||||
auto utilization = load.get_allocated_utilization(host, *stats, target_tablet_size);
|
||||
if (utilization) {
|
||||
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
|
||||
}
|
||||
if (load.has_complete_data(host)) {
|
||||
if (auto utilization = load.get_storage_utilization(host)) {
|
||||
set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
|
||||
}
|
||||
set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
|
||||
}
|
||||
}
|
||||
mutation_sink(m);
|
||||
});
|
||||
@@ -1173,8 +1147,6 @@ private:
|
||||
.with_column("storage_capacity", long_type)
|
||||
.with_column("storage_allocated_load", long_type)
|
||||
.with_column("storage_allocated_utilization", double_type)
|
||||
.with_column("storage_load", long_type)
|
||||
.with_column("storage_utilization", double_type)
|
||||
.with_sharder(1, 0) // shard0-only
|
||||
.with_hash_version()
|
||||
.build();
|
||||
|
||||
2
dist/common/sysconfig/scylla-node-exporter
vendored
2
dist/common/sysconfig/scylla-node-exporter
vendored
@@ -1 +1 @@
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
|
||||
@@ -71,7 +71,7 @@ Use "Bash on Ubuntu on Windows" for the same tools and capabilities as on Linux
|
||||
|
||||
### Building the Docs
|
||||
|
||||
1. Run `make preview` in the `docs/` directory to build the documentation.
|
||||
1. Run `make preview` to build the documentation.
|
||||
1. Preview the built documentation locally at http://127.0.0.1:5500/.
|
||||
|
||||
### Cleanup
|
||||
|
||||
@@ -41,8 +41,6 @@ class MetricsProcessor:
|
||||
# Get metrics from the file
|
||||
try:
|
||||
metrics_file = metrics.get_metrics_from_file(relative_path, "scylla_", metrics_info, strict=strict)
|
||||
except SystemExit:
|
||||
pass
|
||||
finally:
|
||||
os.chdir(old_cwd)
|
||||
if metrics_file:
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
# Alternator: DynamoDB API in ScyllaDB
|
||||
# Alternator: DynamoDB API in Scylla
|
||||
|
||||
## Introduction
|
||||
Alternator is a ScyllaDB feature adding compatibility with Amazon DynamoDB(TM).
|
||||
Alternator is a Scylla feature adding compatibility with Amazon DynamoDB(TM).
|
||||
DynamoDB's API uses JSON-encoded requests and responses which are sent over
|
||||
an HTTP or HTTPS transport. It is described in detail in Amazon's [DynamoDB
|
||||
API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/).
|
||||
|
||||
Our goal is that any application written to use Amazon DynamoDB could
|
||||
be run, unmodified, against ScyllaDB with Alternator enabled. Alternator's
|
||||
be run, unmodified, against Scylla with Alternator enabled. Alternator's
|
||||
compatibility with DynamoDB is fairly complete, but users should be aware
|
||||
of some differences and some unimplemented features. The extent of
|
||||
Alternator's compatibility with DynamoDB is described in the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md) document,
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md) document,
|
||||
which is updated as the work on Alternator progresses and compatibility
|
||||
continues to improve.
|
||||
|
||||
@@ -19,8 +19,8 @@ Alternator also adds several features and APIs that are not available in
|
||||
DynamoDB. These are described in [Alternator-specific APIs](new-apis.md).
|
||||
|
||||
## Running Alternator
|
||||
By default, ScyllaDB does not listen for DynamoDB API requests. To enable
|
||||
this API in ScyllaDB you must set at least two configuration options,
|
||||
By default, Scylla does not listen for DynamoDB API requests. To enable
|
||||
this API in Scylla you must set at least two configuration options,
|
||||
**alternator_port** and **alternator_write_isolation**. For example in the
|
||||
YAML configuration file:
|
||||
```yaml
|
||||
@@ -30,7 +30,7 @@ alternator_write_isolation: only_rmw_uses_lwt # or always, forbid or unsafe
|
||||
or, equivalently, via command-line arguments: `--alternator-port=8000
|
||||
--alternator-write-isolation=only_rmw_uses_lwt.
|
||||
|
||||
the **alternator_port** option determines on which port ScyllaDB listens for
|
||||
the **alternator_port** option determines on which port Scylla listens for
|
||||
DynamoDB API requests. By default, it listens on this port on all network
|
||||
interfaces. To listen only on a specific interface, configure also the
|
||||
**alternator_address** option.
|
||||
@@ -41,12 +41,12 @@ Alternator has four different choices
|
||||
for the implementation of writes, each with different advantages. You should
|
||||
carefully consider which of the options makes more sense for your intended
|
||||
use case and configure alternator_write_isolation accordingly. There is
|
||||
currently no default for this option: Trying to run ScyllaDB with an Alternator
|
||||
currently no default for this option: Trying to run Scylla with an Alternator
|
||||
port selected but without configuring write isolation will result in an error message,
|
||||
asking you to set it.
|
||||
|
||||
In addition to (or instead of) serving HTTP requests on alternator_port,
|
||||
ScyllaDB can accept DynamoDB API requests over HTTPS (encrypted), on the port
|
||||
Scylla can accept DynamoDB API requests over HTTPS (encrypted), on the port
|
||||
specified by **alternator_https_port**. As usual for HTTPS servers, the
|
||||
operator must specify certificate and key files. By default these should
|
||||
be placed in `/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key`, but
|
||||
@@ -54,7 +54,7 @@ these default locations can overridden by specifying
|
||||
`--alternator-encryption-options keyfile="..."` and
|
||||
`--alternator-encryption-options certificate="..."`.
|
||||
|
||||
By default, ScyllaDB saves a snapshot of deleted tables. But Alternator does
|
||||
By default, Scylla saves a snapshot of deleted tables. But Alternator does
|
||||
not offer an API to restore these snapshots, so these snapshots are not useful
|
||||
and waste disk space - deleting a table does not recover any disk space.
|
||||
It is therefore recommended to disable this automatic-snapshotting feature
|
||||
@@ -73,11 +73,11 @@ itself. Instructions, code and examples for doing this can be found in the
|
||||
|
||||
This section provides only a very brief introduction to Alternator's
|
||||
design. A much more detailed document about the features of the DynamoDB
|
||||
API and how they are, or could be, implemented in ScyllaDB can be found in:
|
||||
API and how they are, or could be, implemented in Scylla can be found in:
|
||||
<https://docs.google.com/document/d/1i4yjF5OSAazAY_-T8CBce9-2ykW4twx_E_Nt2zDoOVs>
|
||||
|
||||
Almost all of Alternator's source code (except some initialization code)
|
||||
can be found in the alternator/ subdirectory of ScyllaDB's source code.
|
||||
can be found in the alternator/ subdirectory of Scylla's source code.
|
||||
Extensive functional tests can be found in the test/alternator
|
||||
subdirectory. These tests are written in Python, and can be run against
|
||||
both Alternator and Amazon's DynamoDB; This allows verifying that
|
||||
@@ -85,15 +85,15 @@ Alternator's behavior matches the one observed on DynamoDB.
|
||||
See test/alternator/README.md for more information about the tests and
|
||||
how to run them.
|
||||
|
||||
With Alternator enabled on port 8000 (for example), every ScyllaDB node
|
||||
With Alternator enabled on port 8000 (for example), every Scylla node
|
||||
listens for DynamoDB API requests on this port. These requests, in
|
||||
JSON format over HTTP, are parsed and result in calls to internal Scylla
|
||||
C++ functions - there is no CQL generation or parsing involved.
|
||||
In ScyllaDB terminology, the node receiving the request acts as the
|
||||
In Scylla terminology, the node receiving the request acts as the
|
||||
*coordinator*, and often passes the request on to one or more other nodes -
|
||||
*replicas* which hold copies of the requested data.
|
||||
|
||||
Alternator tables are stored as ScyllaDB tables, each in a separate keyspace.
|
||||
Alternator tables are stored as Scylla tables, each in a separate keyspace.
|
||||
Each keyspace is initialized when the corresponding Alternator table is
|
||||
created (with a CreateTable request). The replication factor (RF) for this
|
||||
keyspace is chosen at that point, depending on the size of the cluster:
|
||||
@@ -101,19 +101,19 @@ RF=3 is used on clusters with three or more nodes, and RF=1 is used for
|
||||
smaller clusters. Such smaller clusters are, of course, only recommended
|
||||
for tests because of the risk of data loss.
|
||||
|
||||
Each table in Alternator is stored as a ScyllaDB table in a separate
|
||||
Each table in Alternator is stored as a Scylla table in a separate
|
||||
keyspace. The DynamoDB key columns (hash and sort key) have known types,
|
||||
and become partition and clustering key columns of the ScyllaDB table.
|
||||
and become partition and clustering key columns of the Scylla table.
|
||||
All other attributes may be different for each row, so are stored in one
|
||||
map column in ScyllaDB, and not as separate columns.
|
||||
map column in Scylla, and not as separate columns.
|
||||
|
||||
DynamoDB supports two consistency levels for reads, "eventual consistency"
|
||||
and "strong consistency". These two modes are implemented using ScyllaDB's CL
|
||||
and "strong consistency". These two modes are implemented using Scylla's CL
|
||||
(consistency level) feature: All writes are done using the `LOCAL_QUORUM`
|
||||
consistency level, then strongly-consistent reads are done with
|
||||
`LOCAL_QUORUM`, while eventually-consistent reads are with just `LOCAL_ONE`.
|
||||
|
||||
In ScyllaDB (and its inspiration, Cassandra), high write performance is
|
||||
In Scylla (and its inspiration, Cassandra), high write performance is
|
||||
achieved by ensuring that writes do not require reads from disk.
|
||||
The DynamoDB API, however, provides many types of requests that need a read
|
||||
before the write (a.k.a. RMW requests - read-modify-write). For example,
|
||||
@@ -121,7 +121,7 @@ a request may copy an existing attribute, increment an attribute,
|
||||
be conditional on some expression involving existing values of attribute,
|
||||
or request that the previous values of attributes be returned. These
|
||||
read-modify-write transactions should be _isolated_ from each other, so
|
||||
by default Alternator implements every write operation using ScyllaDB's
|
||||
by default Alternator implements every write operation using Scylla's
|
||||
LWT (lightweight transactions). This default can be overridden on a per-table
|
||||
basis, by tagging the table as explained above in the "write isolation
|
||||
policies" section.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# ScyllaDB Alternator for DynamoDB users
|
||||
|
||||
ScyllaDB supports the DynamoDB API (this feature is codenamed "Alternator").
|
||||
Scylla supports the DynamoDB API (this feature is codenamed "Alternator").
|
||||
Our goal is to support any application written for Amazon DynamoDB.
|
||||
Nevertheless, there are a few differences between DynamoDB and Scylla, and
|
||||
and a few DynamoDB features that have not yet been implemented in Scylla.
|
||||
@@ -8,16 +8,16 @@ The purpose of this document is to inform users of these differences.
|
||||
|
||||
## Provisioning
|
||||
|
||||
The most obvious difference between DynamoDB and ScyllaDB is that while
|
||||
DynamoDB is a shared cloud service, ScyllaDB is a dedicated service running
|
||||
The most obvious difference between DynamoDB and Scylla is that while
|
||||
DynamoDB is a shared cloud service, Scylla is a dedicated service running
|
||||
on your private cluster. Whereas DynamoDB allows you to "provision" the
|
||||
number of requests per second you'll need - or at an extra cost not even
|
||||
provision that - ScyllaDB requires you to provision your cluster. You need
|
||||
provision that - Scylla requires you to provision your cluster. You need
|
||||
to reason about the number and size of your nodes - not the throughput.
|
||||
|
||||
Moreover, DynamoDB's per-table provisioning (`BillingMode=PROVISIONED`) is
|
||||
not yet supported by Scylla. The BillingMode and ProvisionedThroughput options
|
||||
on a table need to be valid but are ignored, and ScyllaDB behaves like DynamoDB's
|
||||
on a table need to be valid but are ignored, and Scylla behaves like DynamoDB's
|
||||
`BillingMode=PAY_PER_REQUEST`: All requests are accepted without a per-table
|
||||
throughput cap.
|
||||
|
||||
@@ -33,7 +33,7 @@ Instructions for doing this can be found in:
|
||||
|
||||
## Write isolation policies
|
||||
|
||||
ScyllaDB was designed to optimize the performance of pure write operations -
|
||||
Scylla was designed to optimize the performance of pure write operations -
|
||||
writes which do not need to read the previous value of the item.
|
||||
In CQL, writes which do need the previous value of the item must explicitly
|
||||
use the slower LWT ("LightWeight Transaction") feature to be correctly
|
||||
@@ -79,11 +79,11 @@ a _higher_ timestamp - and this will be the "last write" that wins.
|
||||
To avoid or mitigate this write reordering issue, users may consider
|
||||
one or more of the following:
|
||||
|
||||
1. Use NTP to keep the clocks on the different ScyllaDB nodes synchronized.
|
||||
1. Use NTP to keep the clocks on the different Scylla nodes synchronized.
|
||||
If the delay between the two writes is longer than NTP's accuracy,
|
||||
they will not be reordered.
|
||||
2. If an application wants to ensure that two specific writes are not
|
||||
reordered, it should send both requests to the same ScyllaDB node.
|
||||
reordered, it should send both requests to the same Scylla node.
|
||||
Care should be taken when using a load balancer - which might redirect
|
||||
two requests to two different nodes.
|
||||
3. Consider using the `always_use_lwt` write isolation policy.
|
||||
@@ -210,7 +210,7 @@ CREATE SERVICE_LEVEL IF NOT EXISTS oltp WITH SHARES = 1000;
|
||||
ATTACH SERVICE_LEVEL olap TO alice;
|
||||
ATTACH SERVICE_LEVEL oltp TO bob;
|
||||
```
|
||||
Note that `alternator_enforce_authorization` has to be enabled in ScyllaDB configuration.
|
||||
Note that `alternator_enforce_authorization` has to be enabled in Scylla configuration.
|
||||
|
||||
See [Authorization](##Authorization) section to learn more about roles and authorization.
|
||||
See [Workload Prioritization](../features/workload-prioritization)
|
||||
@@ -218,11 +218,11 @@ to read about Workload Prioritization in detail.
|
||||
|
||||
## Metrics
|
||||
|
||||
ScyllaDB has an advanced and extensive monitoring framework for inspecting
|
||||
and graphing hundreds of different metrics of ScyllaDB's usage and performance.
|
||||
ScyllaDB's monitoring stack, based on Grafana and Prometheus, is described in
|
||||
Scylla has an advanced and extensive monitoring framework for inspecting
|
||||
and graphing hundreds of different metrics of Scylla's usage and performance.
|
||||
Scylla's monitoring stack, based on Grafana and Prometheus, is described in
|
||||
<https://docs.scylladb.com/operating-scylla/monitoring/>.
|
||||
This monitoring stack is different from DynamoDB's offering - but ScyllaDB's
|
||||
This monitoring stack is different from DynamoDB's offering - but Scylla's
|
||||
is significantly more powerful and gives the user better insights on
|
||||
the internals of the database and its performance.
|
||||
|
||||
@@ -248,7 +248,7 @@ data in different partition order. Applications mustn't rely on that
|
||||
undocumented order.
|
||||
|
||||
Note that inside each partition, the individual items will be sorted the same
|
||||
in DynamoDB and ScyllaDB - determined by the _sort key_ defined for that table.
|
||||
in DynamoDB and Scylla - determined by the _sort key_ defined for that table.
|
||||
|
||||
---
|
||||
|
||||
@@ -271,16 +271,10 @@ is different, or can be configured in Alternator:
|
||||
So for example, if you create a table whose name is 192 characters, you
|
||||
can't create a GSI whose name is longer than 29 characters.
|
||||
|
||||
* DynamoDB's DescribeTable will return information about the table. According to
|
||||
AWS documentation, fields TableSizeBytes, IndexSizeBytes and ItemCount can
|
||||
lag behind by up to 6 hours.
|
||||
The `alternator_describe_table_info_cache_validity_in_seconds` parameter allows
|
||||
users to change this timeout - the default value in seconds is set to 21600 (6 hours).
|
||||
|
||||
## Experimental API features
|
||||
|
||||
Some DynamoDB API features are supported by Alternator, but considered
|
||||
**experimental** in this release. An experimental feature in ScyllaDB is a
|
||||
**experimental** in this release. An experimental feature in Scylla is a
|
||||
feature whose functionality is complete, or mostly complete, but it is not
|
||||
as thoroughly tested or optimized as regular features. Also, an experimental
|
||||
feature's implementation is still subject to change and upgrades may not be
|
||||
@@ -296,14 +290,6 @@ experimental:
|
||||
considered experimental so needs to be enabled explicitly with the
|
||||
`--experimental-features=alternator-streams` configuration option.
|
||||
|
||||
In this version, Alternator Streams is only supported if the base table
|
||||
uses vnodes instead of tablets. However, by default new tables use tablets
|
||||
so to create a table that can be used with Streams, you must set the tag
|
||||
`system:initial_tablets` set to `none` during CreateTable - so that the
|
||||
new table will use vnodes. Streams cannot be enabled on an already-existing
|
||||
table that uses tablets.
|
||||
See <https://github.com/scylladb/scylla/issues/23838>.
|
||||
|
||||
Alternator streams also differ in some respects from DynamoDB Streams:
|
||||
* The number of separate "shards" in Alternator's streams is significantly
|
||||
larger than is typical on DynamoDB.
|
||||
@@ -365,8 +351,8 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
|
||||
* The on-demand backup APIs are not supported: CreateBackup, DescribeBackup,
|
||||
DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
For now, users can use ScyllaDB's existing backup solutions such as snapshots
|
||||
or ScyllaDB Manager.
|
||||
For now, users can use Scylla's existing backup solutions such as snapshots
|
||||
or Scylla Manager.
|
||||
<https://github.com/scylladb/scylla/issues/5063>
|
||||
|
||||
* Continuous backup (the ability to restore any point in time) is also not
|
||||
@@ -384,21 +370,21 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
<https://github.com/scylladb/scylla/issues/5068>
|
||||
|
||||
* DAX (DynamoDB Accelerator), an in-memory cache for DynamoDB, is not
|
||||
available in for Alternator. Anyway, it should not be necessary - ScyllaDB's
|
||||
available in for Alternator. Anyway, it should not be necessary - Scylla's
|
||||
internal cache is already rather advanced and there is no need to place
|
||||
another cache in front of the it. We wrote more about this here:
|
||||
<https://www.scylladb.com/2017/07/31/database-caches-not-good/>
|
||||
|
||||
* The DescribeTable is missing some information about size estimates
|
||||
(IndexSizeBytes and ItemCount - TableSizeBytes is available), and also
|
||||
part of the information about indexes enabled on the table.
|
||||
* The DescribeTable is missing information about size estimates, and
|
||||
also part of the information about indexes enabled on the table.
|
||||
<https://github.com/scylladb/scylla/issues/5320>
|
||||
<https://github.com/scylladb/scylla/issues/7550>
|
||||
<https://github.com/scylladb/scylla/issues/7551>
|
||||
|
||||
* The PartiQL syntax (SQL-like SELECT/UPDATE/INSERT/DELETE expressions)
|
||||
and the operations ExecuteStatement, BatchExecuteStatement and
|
||||
ExecuteTransaction are not yet supported.
|
||||
A user that is interested in an SQL-like syntax can consider using ScyllaDB's
|
||||
A user that is interested in an SQL-like syntax can consider using Scylla's
|
||||
CQL protocol instead.
|
||||
This feature was added to DynamoDB in November 2020.
|
||||
<https://github.com/scylladb/scylla/issues/8787>
|
||||
@@ -407,7 +393,7 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
which is different from AWS's. In particular, the operations
|
||||
DescribeContributorInsights, ListContributorInsights and
|
||||
UpdateContributorInsights that configure Amazon's "CloudWatch Contributor
|
||||
Insights" are not yet supported. ScyllaDB has different ways to retrieve the
|
||||
Insights" are not yet supported. Scylla has different ways to retrieve the
|
||||
same information, such as which items were accessed most often.
|
||||
<https://github.com/scylladb/scylla/issues/8788>
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ This section will guide you through the steps for setting up the cluster:
|
||||
<https://hub.docker.com/r/scylladb/scylla/>, but add to every `docker run`
|
||||
command a `-p 8000:8000` before the image name and
|
||||
`--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port ScyllaDB will listen for
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
@@ -24,10 +24,10 @@ This section will guide you through the steps for setting up the cluster:
|
||||
By default, ScyllaDB run in this way will not have authentication or
|
||||
authorization enabled, and any DynamoDB API request will be honored without
|
||||
requiring them to be signed appropriately. See the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md#authentication-and-authorization)
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md#authentication-and-authorization)
|
||||
document on how to configure authentication and authorization.
|
||||
|
||||
## Testing ScyllaDB's DynamoDB API support:
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
1. Follow the instructions on the [AWS github page](https://github.com/awsdocs/amazon-dynamodb-developer-guide/blob/master/doc_source/TicTacToe.Phase1.md)
|
||||
2. Enjoy your tic-tac-toe game :-)
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
Alternator's primary goal is to be compatible with Amazon DynamoDB(TM)
|
||||
and its APIs, so that any application written to use Amazon DynamoDB could
|
||||
be run, unmodified, against ScyllaDB with Alternator enabled. The extent of
|
||||
be run, unmodified, against Scylla with Alternator enabled. The extent of
|
||||
Alternator's compatibility with DynamoDB is described in the
|
||||
[ScyllaDB Alternator for DynamoDB users](compatibility.md) document.
|
||||
[Scylla Alternator for DynamoDB users](compatibility.md) document.
|
||||
|
||||
But Alternator also adds several features and APIs that are not available in
|
||||
DynamoDB. These Alternator-specific APIs are documented here.
|
||||
@@ -15,7 +15,7 @@ _conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
Alternator could do this isolation by using ScyllaDB's LWT (lightweight
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
@@ -41,7 +41,7 @@ isolation policy for a specific table can be overridden by tagging the table
|
||||
which need a read before the write. An attempt to use such statements
|
||||
(e.g., UpdateItem with a ConditionExpression) will result in an error.
|
||||
In this mode, the remaining write requests which are allowed - pure writes
|
||||
without a read - are performed using standard ScyllaDB writes, not LWT,
|
||||
without a read - are performed using standard Scylla writes, not LWT,
|
||||
so they are significantly faster than they would have been in the
|
||||
`always_use_lwt`, but their isolation is still correct.
|
||||
|
||||
@@ -65,19 +65,19 @@ isolation policy for a specific table can be overridden by tagging the table
|
||||
read-modify-write updates. This mode is not recommended for any use case,
|
||||
and will likely be removed in the future.
|
||||
|
||||
## Accessing system tables from ScyllaDB
|
||||
ScyllaDB exposes lots of useful information via its internal system tables,
|
||||
## Accessing system tables from Scylla
|
||||
Scylla exposes lots of useful information via its internal system tables,
|
||||
which can be found in system keyspaces: 'system', 'system\_auth', etc.
|
||||
In order to access to these tables via alternator interface,
|
||||
Scan and Query requests can use a special table name:
|
||||
`.scylla.alternator.KEYSPACE_NAME.TABLE_NAME`
|
||||
which will return results fetched from corresponding ScyllaDB table.
|
||||
which will return results fetched from corresponding Scylla table.
|
||||
|
||||
This interface can be used only to fetch data from system tables.
|
||||
Attempts to read regular tables via the virtual interface will result
|
||||
in an error.
|
||||
|
||||
Example: in order to query the contents of ScyllaDB's `system.large_rows`,
|
||||
Example: in order to query the contents of Scylla's `system.large_rows`,
|
||||
pass `TableName='.scylla.alternator.system.large_rows'` to a Query/Scan
|
||||
request.
|
||||
|
||||
@@ -113,14 +113,14 @@ connection (either active or idle), not necessarily an active request as
|
||||
in Alternator.
|
||||
|
||||
## Service discovery
|
||||
As explained in [ScyllaDB Alternator for DynamoDB users](compatibility.md),
|
||||
As explained in [Scylla Alternator for DynamoDB users](compatibility.md),
|
||||
Alternator requires a load-balancer or a client-side load-balancing library
|
||||
to distribute requests between all ScyllaDB nodes. This load-balancer needs
|
||||
to be able to _discover_ the ScyllaDB nodes. Alternator provides two special
|
||||
to distribute requests between all Scylla nodes. This load-balancer needs
|
||||
to be able to _discover_ the Scylla nodes. Alternator provides two special
|
||||
requests, `/` and `/localnodes`, to help with this service discovery, which
|
||||
we will now explain.
|
||||
|
||||
Some setups know exactly which ScyllaDB nodes were brought up, so all that
|
||||
Some setups know exactly which Scylla nodes were brought up, so all that
|
||||
remains is to periodically verify that each node is still functional. The
|
||||
easiest way to do this is to make an HTTP (or HTTPS) GET request to the node,
|
||||
with URL `/`. This is a trivial GET request and does **not** need to be
|
||||
@@ -133,10 +133,10 @@ $ curl http://localhost:8000/
|
||||
healthy: localhost:8000
|
||||
```
|
||||
|
||||
In other setups, the load balancer might not know which ScyllaDB nodes exist.
|
||||
For example, it may be possible to add or remove ScyllaDB nodes without a
|
||||
In other setups, the load balancer might not know which Scylla nodes exist.
|
||||
For example, it may be possible to add or remove Scylla nodes without a
|
||||
client-side load balancer knowing. For these setups we have the `/localnodes`
|
||||
request that can be used to discover which ScyllaDB nodes exist: A load balancer
|
||||
request that can be used to discover which Scylla nodes exist: A load balancer
|
||||
that already knows at least one live node can discover the rest by sending
|
||||
a `/localnodes` request to the known node. It's again an unauthenticated
|
||||
HTTP (or HTTPS) GET request:
|
||||
@@ -160,7 +160,7 @@ list the nodes in a specific _data center_ or _rack_. These options are
|
||||
useful for certain use cases:
|
||||
|
||||
* A `dc` option (e.g., `/localnodes?dc=dc1`) can be passed to list the
|
||||
nodes in a specific ScyllaDB data center, not the data center of the node
|
||||
nodes in a specific Scylla data center, not the data center of the node
|
||||
being contacted. This is useful when a client knowns of _some_ Scylla
|
||||
node belonging to an unknown DC, but wants to list the nodes in _its_
|
||||
DC, which it knows by name.
|
||||
@@ -191,7 +191,7 @@ tells them to.
|
||||
|
||||
If you want to influence whether a specific Alternator table is created with tablets or vnodes,
|
||||
you can do this by specifying the `system:initial_tablets` tag
|
||||
(in earlier versions of ScyllaDB the tag was `experimental:initial_tablets`)
|
||||
(in earlier versions of Scylla the tag was `experimental:initial_tablets`)
|
||||
in the CreateTable operation. The value of this tag can be:
|
||||
|
||||
* Any valid integer as the value of this tag enables tablets.
|
||||
|
||||
@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
|
||||
@@ -1043,8 +1043,6 @@ The following modes are available:
|
||||
* - ``immediate``
|
||||
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
|
||||
@@ -102,7 +102,6 @@ Additional Information
|
||||
|
||||
To learn more about TTL, and see a hands-on example, check out `this lesson <https://university.scylladb.com/courses/data-modeling/lessons/advanced-data-modeling/topic/expiring-data-with-ttl-time-to-live/>`_ on ScyllaDB University.
|
||||
|
||||
* `Video: Managing data expiration with Time-To-Live <https://www.youtube.com/watch?v=SXkbu7mFHeA>`_
|
||||
* :doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`
|
||||
* :doc:`KB Article:How to Change gc_grace_seconds for a Table </kb/gc-grace-seconds/>`
|
||||
* :doc:`KB Article:Time to Live (TTL) and Compaction </kb/ttl-facts/>`
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Introduction
|
||||
|
||||
Similar to the approach described in CASSANDRA-12151, we add the
|
||||
Similar to the approach described in CASSANDRA-14471, we add the
|
||||
concept of an audit specification. An audit has a target (syslog or a
|
||||
table) and a set of events/actions that it wants recorded. We
|
||||
introduce new CQL syntax for Scylla users to describe and manipulate
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
|
||||
## What is ScyllaDB?
|
||||
|
||||
ScyllaDB is a high-performance NoSQL database optimized for speed and scalability.
|
||||
It is designed to efficiently handle large volumes of data with minimal latency,
|
||||
making it ideal for data-intensive applications.
|
||||
|
||||
ScyllaDB is distributed under the [ScyllaDB Source Available License](https://github.com/scylladb/scylladb/blob/master/LICENSE-ScyllaDB-Source-Available.md).
|
||||
ScyllaDB is a high-performance NoSQL database system, fully compatible with Apache Cassandra.
|
||||
ScyllaDB is released under the GNU Affero General Public License version 3 and the Apache License, ScyllaDB is free and open-source software.
|
||||
|
||||
> [ScyllaDB](http://www.scylladb.com/)
|
||||
|
||||
|
||||
@@ -20,7 +20,9 @@ command line option when launchgin scylla.
|
||||
You can define endpoint details in the `scylla.yaml` file. For example:
|
||||
```yaml
|
||||
object_storage_endpoints:
|
||||
- name: https://s3.us-east-1.amazonaws.com:443
|
||||
- name: s3.us-east-1.amazonaws.com
|
||||
port: 443
|
||||
https: true
|
||||
aws_region: us-east-1
|
||||
```
|
||||
|
||||
@@ -76,7 +78,9 @@ The examples above are intended for development or local environments. You shoul
|
||||
For the EC2 Instance Metadata Service to function correctly, no additional configuration is required. However, STS requires the IAM Role ARN to be defined in the `scylla.yaml` file, as shown below:
|
||||
```yaml
|
||||
object_storage_endpoints:
|
||||
- name: https://s3.us-east-1.amazonaws.com:443
|
||||
- name: s3.us-east-1.amazonaws.com
|
||||
port: 443
|
||||
https: true
|
||||
aws_region: us-east-1
|
||||
iam_role_arn: arn:aws:iam::123456789012:instance-profile/my-instance-instance-profile
|
||||
```
|
||||
@@ -96,7 +100,9 @@ in `scylla.yaml`:
|
||||
|
||||
```yaml
|
||||
object_storage_endpoints:
|
||||
- name: https://s3.us-east-2.amazonaws.com:443
|
||||
- name: s3.us-east-2.amazonaws.com
|
||||
port: 443
|
||||
https: true
|
||||
aws_region: us-east-2
|
||||
```
|
||||
|
||||
|
||||
@@ -74,8 +74,6 @@ The keys and values are:
|
||||
as an indicator to which shard client wants to connect. The desired shard number
|
||||
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
|
||||
Its value is a decimal representation of type `uint16_t`, by default `19142`.
|
||||
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
|
||||
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
|
||||
|
||||
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
|
||||
`biased-token-round-robin`. To apply the algorithm,
|
||||
@@ -238,26 +236,3 @@ the same mechanism for other protocol versions, such as CQLv4.
|
||||
|
||||
The feature is identified by the `SCYLLA_USE_METADATA_ID` key, which is meant to be sent
|
||||
in the SUPPORTED message.
|
||||
|
||||
## Sending the CLIENT_ROUTES_CHANGE event
|
||||
|
||||
This extension allows a driver to update its connections when the
|
||||
`system.client_routes` table is modified.
|
||||
|
||||
In some network topologies a specific mapping of addresses and ports is required (e.g.
|
||||
to support Private Link). This mapping can change dynamically even when no nodes are
|
||||
added or removed. The driver must adapt to those changes; otherwise connectivity can be
|
||||
lost.
|
||||
|
||||
The extension is implemented as a new `EVENT` type: `CLIENT_ROUTES_CHANGE`. The event
|
||||
body consists of:
|
||||
- [string] change
|
||||
- [string list] connection_ids
|
||||
- [string list] host_ids
|
||||
|
||||
There is only one change value: `UPDATE_NODES`, which means at least one client route
|
||||
was inserted, updated, or deleted.
|
||||
|
||||
Events already have a subscription mechanism similar to protocol extensions (that is,
|
||||
the driver only receives the events it explicitly subscribed to), so no additional
|
||||
`cql_protocol_extension` key is introduced for this feature.
|
||||
|
||||
@@ -372,8 +372,6 @@ Columns:
|
||||
* `storage_allocated_load` - Disk space allocated for tablets, assuming each tablet has a fixed size (target_tablet_size).
|
||||
* `storage_allocated_utilization` - Fraction of node's disk capacity taken for `storage_allocated_load`, where 1.0 means full utilization.
|
||||
* `storage_capacity` - Total disk capacity in bytes. Used to compute `storage_allocated_utilization`. By default equal to file system's capacity.
|
||||
* `storage_load` - Disk space allocated for tablets, computed with actual tablet sizes. Can be null if some of the tablet sizes are not known.
|
||||
* `storage_utilization` - Fraction of node's disk capacity taken for `storage_load` (with actual tablet sizes), where 1.0 means full utilization. Can be null if some of the tablet sizes are not known.
|
||||
* `tablets_allocated` - Number of tablet replicas on the node. Migrating tablets are accounted as if migration already finished.
|
||||
* `tablets_allocated_per_shard` - `tablets_allocated` divided by shard count on the node.
|
||||
|
||||
|
||||
@@ -45,6 +45,22 @@ immediately after it's finished.
|
||||
|
||||
A flag which determines if a task can be aborted through API.
|
||||
|
||||
# Task timing fields
|
||||
|
||||
Tasks have three timing fields that track different stages of their lifecycle:
|
||||
|
||||
- `creation_time` - When the task was created/queued. This is extracted from the task's
|
||||
UUID (which is a timeuuid) and represents the moment the task request was submitted.
|
||||
- `start_time` - When the task actually began executing. For tasks that are queued, this
|
||||
will be unspecified (equal to epoch) until execution starts. For node operations
|
||||
like decommission, this is set when the request is picked up for execution by the
|
||||
topology coordinator.
|
||||
- `end_time` - When the task completed (successfully or with an error). This is
|
||||
unspecified (equal to epoch) until the task finishes.
|
||||
|
||||
The difference between `creation_time` and `start_time` represents the time a task
|
||||
spent waiting in the queue before execution began.
|
||||
|
||||
# Type vs scope vs kind
|
||||
|
||||
`type` of a task describes what operation is covered by a task,
|
||||
|
||||
@@ -86,7 +86,6 @@ stateDiagram-v2
|
||||
de_left_token_ring --> [*]
|
||||
}
|
||||
state removing {
|
||||
re_left_token_ring : left_token_ring
|
||||
re_tablet_draining : tablet_draining
|
||||
re_tablet_migration : tablet_migration
|
||||
re_write_both_read_old : write_both_read_old
|
||||
@@ -99,8 +98,7 @@ stateDiagram-v2
|
||||
re_tablet_draining --> re_write_both_read_old
|
||||
re_write_both_read_old --> re_write_both_read_new: streaming completed
|
||||
re_write_both_read_old --> re_rollback_to_normal: rollback
|
||||
re_write_both_read_new --> re_left_token_ring
|
||||
re_left_token_ring --> [*]
|
||||
re_write_both_read_new --> [*]
|
||||
}
|
||||
rebuilding --> normal: streaming completed
|
||||
decommissioning --> left: operation succeeded
|
||||
@@ -124,10 +122,9 @@ Note that these are not all states, as there are other states specific to tablet
|
||||
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
|
||||
to modified token ring), reads are using old replicas.
|
||||
- `write_both_read_new` - as above, but reads are using new replicas.
|
||||
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
|
||||
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
|
||||
We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
|
||||
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
|
||||
moving the node we tried to decommission/remove back to the normal state.
|
||||
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
|
||||
@@ -144,9 +141,7 @@ reads that started before this point exist in the system. Finally we remove the
|
||||
transitioning state.
|
||||
|
||||
Decommission, removenode and replace work similarly, except they don't go through
|
||||
`commit_cdc_generation`. Both decommission and removenode go through the
|
||||
`left_token_ring` state to run a global barrier ensuring all nodes are aware
|
||||
of the topology change before the operation completes.
|
||||
`commit_cdc_generation`.
|
||||
|
||||
The state machine may also go only through the `commit_cdc_generation` state
|
||||
after getting a request from the user to create a new CDC generation if the
|
||||
|
||||
@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
|
||||
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
|
||||
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
|
||||
|
||||
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
|
||||
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
|
||||
- a keyspace/view was dropped
|
||||
- tablet operations (see [tablet operations section](#tablet-operations))
|
||||
In the first case we simply delete relevant view building tasks as they are no longer needed.
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
|
||||
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
|
||||
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
|
||||
|
||||
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user