Compare commits
149 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77ee7f3417 | ||
|
|
0ff89a58be | ||
|
|
f7ffa395a8 | ||
|
|
3fa3b920de | ||
|
|
e7ca52ee79 | ||
|
|
730eca5dac | ||
|
|
c8cff94a5a | ||
|
|
5fae4cdf80 | ||
|
|
8bbcaacba1 | ||
|
|
3dfa5ebd7f | ||
|
|
24264e24bb | ||
|
|
0c64e3be9a | ||
|
|
d5b63df46e | ||
|
|
f545ed37bc | ||
|
|
5f13880a91 | ||
|
|
0e51a1f812 | ||
|
|
8b807b299e | ||
|
|
07ff659849 | ||
|
|
d3e199984e | ||
|
|
8822c23ad4 | ||
|
|
be9992cfb3 | ||
|
|
daf00a7f24 | ||
|
|
62962f33bb | ||
|
|
060c2f7c0d | ||
|
|
64149b57c3 | ||
|
|
4b004fcdfc | ||
|
|
5e38b3071b | ||
|
|
225b3351fc | ||
|
|
80c6718ea8 | ||
|
|
855b91ec20 | ||
|
|
95e303faf3 | ||
|
|
8ba595e472 | ||
|
|
608eee0357 | ||
|
|
0bcc2977bb | ||
|
|
3a865fe991 | ||
|
|
fb32e1c7ee | ||
|
|
b6895f0fa7 | ||
|
|
c30b326033 | ||
|
|
807fc68dc5 | ||
|
|
51843195f7 | ||
|
|
7038b8b544 | ||
|
|
7df610b73d | ||
|
|
386309d6a0 | ||
|
|
a213e41250 | ||
|
|
58dc414912 | ||
|
|
d883ff2317 | ||
|
|
1f777da863 | ||
|
|
faad0167d7 | ||
|
|
0115a21b9a | ||
|
|
71834ce7dd | ||
|
|
df21112c39 | ||
|
|
bd339cc4d8 | ||
|
|
91bf23eea1 | ||
|
|
f925ed176b | ||
|
|
68dcd1b1b2 | ||
|
|
6fd5160947 | ||
|
|
3fc914ca59 | ||
|
|
6ef7ad9b5a | ||
|
|
581b8ace83 | ||
|
|
8192f45e84 | ||
|
|
c6eec4eeef | ||
|
|
9bef142328 | ||
|
|
74bf24a4a7 | ||
|
|
e60bcd0011 | ||
|
|
45c16553eb | ||
|
|
c984f557ef | ||
|
|
5e83311305 | ||
|
|
f00f7976c1 | ||
|
|
c428645d16 | ||
|
|
082342ecad | ||
|
|
47efbdffbc | ||
|
|
d811eeb4ca | ||
|
|
4784e39665 | ||
|
|
d4014b7970 | ||
|
|
92b5e4d63d | ||
|
|
e546143fd9 | ||
|
|
721434054b | ||
|
|
350cbd1d66 | ||
|
|
866c96f536 | ||
|
|
367633270a | ||
|
|
e97a504775 | ||
|
|
a5c971d21c | ||
|
|
a0809f0032 | ||
|
|
bb6e41f97a | ||
|
|
4df6b51ac2 | ||
|
|
0c8730ba05 | ||
|
|
bc2e83bc1f | ||
|
|
f4c3d5c1b7 | ||
|
|
e54abde3e8 | ||
|
|
9696ee64d0 | ||
|
|
8dd69f02a8 | ||
|
|
d000fa3335 | ||
|
|
4e289e8e6a | ||
|
|
19b6207f17 | ||
|
|
ff52550739 | ||
|
|
e654045755 | ||
|
|
07b92a1ee8 | ||
|
|
7504d10d9e | ||
|
|
28cb300d0a | ||
|
|
9b3fbedc8c | ||
|
|
420fb1fd53 | ||
|
|
7c62417b54 | ||
|
|
9d2f7c3f52 | ||
|
|
e3e81a9a7a | ||
|
|
86dde50c0d | ||
|
|
6a6bbbf1a6 | ||
|
|
b82f92b439 | ||
|
|
f00e00fde0 | ||
|
|
b0727d3f2a | ||
|
|
4169bdb7a6 | ||
|
|
c5580399a8 | ||
|
|
1c45ad7cee | ||
|
|
c18133b6cb | ||
|
|
1d42770936 | ||
|
|
d287b054b9 | ||
|
|
4f803aad22 | ||
|
|
a54bf50290 | ||
|
|
06dd3b2e64 | ||
|
|
6163fedd2e | ||
|
|
67f1c6d36c | ||
|
|
669286b1d6 | ||
|
|
b9199e8b24 | ||
|
|
1ff7f5941b | ||
|
|
3b70154f0a | ||
|
|
6ae72ed134 | ||
|
|
82f80478b8 | ||
|
|
a191503ddf | ||
|
|
619bf3ac4b | ||
|
|
6221c58325 | ||
|
|
6c115c691f | ||
|
|
5924c36b50 | ||
|
|
ad6a73c29b | ||
|
|
4ec0fa6eb5 | ||
|
|
c313b215e4 | ||
|
|
7c612e1789 | ||
|
|
f0e2941e34 | ||
|
|
62802b119b | ||
|
|
323e5cd171 | ||
|
|
dd461e0472 | ||
|
|
0c9b2e5332 | ||
|
|
b29c42adce | ||
|
|
ea3dc0b0de | ||
|
|
2a6bef96d6 | ||
|
|
19da1cb656 | ||
|
|
2cf1ca43b5 | ||
|
|
642f468c59 | ||
|
|
bd7c87731b | ||
|
|
4c667e87ec | ||
|
|
aacf883a8b |
9
.github/CODEOWNERS
vendored
9
.github/CODEOWNERS
vendored
@@ -1,5 +1,5 @@
|
||||
# AUTH
|
||||
auth/* @nuivall @ptrsmrn
|
||||
auth/* @nuivall
|
||||
|
||||
# CACHE
|
||||
row_cache* @tgrabiec
|
||||
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
|
||||
transport/*
|
||||
|
||||
# CQL QUERY LANGUAGE
|
||||
cql3/* @tgrabiec @nuivall @ptrsmrn
|
||||
cql3/* @tgrabiec @nuivall
|
||||
|
||||
# COUNTERS
|
||||
counters* @nuivall @ptrsmrn
|
||||
tests/counter_test* @nuivall @ptrsmrn
|
||||
counters* @nuivall
|
||||
tests/counter_test* @nuivall
|
||||
|
||||
# DOCS
|
||||
docs/* @annastuchlik @tzach
|
||||
@@ -57,7 +57,6 @@ repair/* @tgrabiec @asias
|
||||
|
||||
# SCHEMA MANAGEMENT
|
||||
db/schema_tables* @tgrabiec
|
||||
db/legacy_schema_migrator* @tgrabiec
|
||||
service/migration* @tgrabiec
|
||||
schema* @tgrabiec
|
||||
|
||||
|
||||
2
.github/scripts/auto-backport.py
vendored
2
.github/scripts/auto-backport.py
vendored
@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
|
||||
if is_draft:
|
||||
labels_to_add.append("conflicts")
|
||||
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
|
||||
pr_comment += "Please resolve them and mark this PR as ready for review"
|
||||
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
|
||||
backport_pr.create_issue_comment(pr_comment)
|
||||
|
||||
# Apply all labels at once if we have any
|
||||
|
||||
@@ -18,7 +18,7 @@ jobs:
|
||||
|
||||
// Regular expression pattern to check for "Fixes" prefix
|
||||
// Adjusted to dynamically insert the repository full name
|
||||
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
|
||||
const regex = new RegExp(pattern);
|
||||
|
||||
if (!regex.test(body)) {
|
||||
|
||||
5
.github/workflows/trigger-scylla-ci.yaml
vendored
5
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -3,10 +3,13 @@ name: Trigger Scylla CI Route
|
||||
on:
|
||||
issue_comment:
|
||||
types: [created]
|
||||
pull_request_target:
|
||||
types:
|
||||
- unlabeled
|
||||
|
||||
jobs:
|
||||
trigger-jenkins:
|
||||
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
|
||||
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger Scylla-CI-Route Jenkins Job
|
||||
|
||||
@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
|
||||
if (!comparison_operator.IsString()) {
|
||||
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
|
||||
}
|
||||
std::string op = comparison_operator.GetString();
|
||||
std::string op = rjson::to_string(comparison_operator);
|
||||
auto it = ops.find(op);
|
||||
if (it == ops.end()) {
|
||||
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
|
||||
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
|
||||
}
|
||||
if (kv1.name == "S") {
|
||||
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
|
||||
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
|
||||
return cmp(rjson::to_string_view(kv1.value),
|
||||
rjson::to_string_view(kv2.value));
|
||||
}
|
||||
if (kv1.name == "B") {
|
||||
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
|
||||
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "S") {
|
||||
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
|
||||
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
|
||||
return check_BETWEEN(rjson::to_string_view(kv_v.value),
|
||||
rjson::to_string_view(kv_lb.value),
|
||||
rjson::to_string_view(kv_ub.value),
|
||||
bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "B") {
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
|
||||
#include "consumed_capacity.hh"
|
||||
#include "error.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace alternator {
|
||||
|
||||
@@ -32,12 +34,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
if (!return_consumed->IsString()) {
|
||||
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
|
||||
}
|
||||
std::string consumed = return_consumed->GetString();
|
||||
std::string_view consumed = rjson::to_string_view(*return_consumed);
|
||||
if (consumed == "INDEXES") {
|
||||
throw api_error::validation("INDEXES consumed capacity is not supported");
|
||||
}
|
||||
if (consumed != "TOTAL") {
|
||||
throw api_error::validation("Unknown consumed capacity "+ consumed);
|
||||
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
|
||||
if (!table_name_value->IsString()) {
|
||||
throw api_error::validation("Non-string TableName field in request");
|
||||
}
|
||||
std::string table_name = table_name_value->GetString();
|
||||
std::string table_name = rjson::to_string(*table_name_value);
|
||||
return table_name;
|
||||
}
|
||||
|
||||
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
// does exist but the index does not (ValidationException).
|
||||
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
|
||||
throw api_error::validation(
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
|
||||
} else {
|
||||
throw api_error::resource_not_found(
|
||||
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
|
||||
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
|
||||
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
|
||||
attribute_name, value));
|
||||
}
|
||||
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
|
||||
return rjson::to_string(*attribute_value);
|
||||
}
|
||||
|
||||
// Convenience function for getting the value of a boolean attribute, or a
|
||||
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
|
||||
}
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (attribute_info["AttributeName"].GetString() == name) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
data_type dt = parse_key_type(type);
|
||||
if (computed_column) {
|
||||
// Computed column for GSI (doesn't choose a real column as-is
|
||||
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First element of KeySchema must be an object");
|
||||
}
|
||||
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
|
||||
throw api_error::validation("First key in KeySchema must be a HASH key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[0], "AttributeName");
|
||||
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First key in KeySchema must have string AttributeName");
|
||||
}
|
||||
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
|
||||
std::string hash_key = v->GetString();
|
||||
std::string hash_key = rjson::to_string(*v);
|
||||
std::string range_key;
|
||||
if (key_schema->Size() == 2) {
|
||||
if (!(*key_schema)[1].IsObject()) {
|
||||
throw api_error::validation("Second element of KeySchema must be an object");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "KeyType");
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
|
||||
throw api_error::validation("Second key in KeySchema must be a RANGE key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "AttributeName");
|
||||
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
std::string def_type = type_to_string(def.type);
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (type != def_type) {
|
||||
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
|
||||
}
|
||||
@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
|
||||
|
||||
// The put_or_delete_item class builds the mutations needed by the PutItem and
|
||||
// DeleteItem operations - either as stand-alone commands or part of a list
|
||||
// of commands in BatchWriteItems.
|
||||
// of commands in BatchWriteItem.
|
||||
// put_or_delete_item splits each operation into two stages: Constructing the
|
||||
// object parses and validates the user input (throwing exceptions if there
|
||||
// are input errors). Later, build() generates the actual mutation, with a
|
||||
// specified timestamp. This split is needed because of the peculiar needs of
|
||||
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
|
||||
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
|
||||
// any writing happens (if one of the commands has an error, none of the
|
||||
// writes should be done). LWT makes it impossible for the parse step to
|
||||
// generate "mutation" objects, because the timestamp still isn't known.
|
||||
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
validate_value(it->value, "PutItem");
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
|
||||
@@ -2739,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
auto read_command = needs_read_before_write ?
|
||||
previous_item_read_command(proxy, schema(), _ck, selection) :
|
||||
nullptr;
|
||||
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
|
||||
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
|
||||
if (!is_applied) {
|
||||
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
|
||||
return;
|
||||
}
|
||||
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
|
||||
if (!used.contains(it->name.GetString())) {
|
||||
if (!used.contains(rjson::to_string(it->name))) {
|
||||
throw api_error::validation(
|
||||
format("{} has spurious '{}', not used in {}",
|
||||
field_name, it->name.GetString(), operation));
|
||||
field_name, rjson::to_string_view(it->name), operation));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
}
|
||||
|
||||
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
|
||||
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
|
||||
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
|
||||
try {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
@@ -3026,17 +3026,20 @@ struct primary_key_equal {
|
||||
};
|
||||
|
||||
// This is a cas_request subclass for applying given put_or_delete_items to
|
||||
// one partition using LWT as part as BatchWriteItems. This is a write-only
|
||||
// one partition using LWT as part as BatchWriteItem. This is a write-only
|
||||
// operation, not needing the previous value of the item (the mutation to be
|
||||
// done is known prior to starting the operation). Nevertheless, we want to
|
||||
// do this mutation via LWT to ensure that it is serialized with other LWT
|
||||
// mutations to the same partition.
|
||||
//
|
||||
// The std::vector<put_or_delete_item> must remain alive until the
|
||||
// storage_proxy::cas() future is resolved.
|
||||
class put_or_delete_item_cas_request : public service::cas_request {
|
||||
schema_ptr schema;
|
||||
std::vector<put_or_delete_item> _mutation_builders;
|
||||
const std::vector<put_or_delete_item>& _mutation_builders;
|
||||
public:
|
||||
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
|
||||
schema(std::move(s)), _mutation_builders(std::move(b)) { }
|
||||
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
|
||||
schema(std::move(s)), _mutation_builders(b) { }
|
||||
virtual ~put_or_delete_item_cas_request() = default;
|
||||
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
|
||||
std::optional<mutation> ret;
|
||||
@@ -3052,20 +3055,48 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
|
||||
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
|
||||
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
if (!cas_shard.this_shard()) {
|
||||
_stats.shard_bounce_for_lwt++;
|
||||
return container().invoke_on(cas_shard.shard(), _ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = mutation_builders,
|
||||
&dk,
|
||||
ks = schema->ks_name(),
|
||||
cf = schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(executor& self) mutable {
|
||||
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt), &self]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
auto timeout = executor::default_timeout();
|
||||
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
|
||||
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
|
||||
auto* op_ptr = op.get();
|
||||
auto cdc_opts = cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility =
|
||||
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
};
|
||||
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
|
||||
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout, true, std::move(cdc_opts)).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
|
||||
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
|
||||
// does not need to support conditional updates.
|
||||
}
|
||||
|
||||
@@ -3087,13 +3118,11 @@ struct schema_decorated_key_equal {
|
||||
|
||||
// FIXME: if we failed writing some of the mutations, need to return a list
|
||||
// of these failed mutations rather than fail the whole write (issue #5650).
|
||||
static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
smp_service_group ssg,
|
||||
future<> executor::do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit,
|
||||
stats& stats) {
|
||||
service_permit permit) {
|
||||
if (mutation_builders.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -3115,7 +3144,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
any_cdc_enabled |= b.first->cdc_options().enabled();
|
||||
}
|
||||
return proxy.mutate(std::move(mutations),
|
||||
return _proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
executor::default_timeout(),
|
||||
trace_state,
|
||||
@@ -3124,55 +3153,48 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
false,
|
||||
cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
});
|
||||
} else {
|
||||
// Do the write via LWT:
|
||||
// Multiple mutations may be destined for the same partition, adding
|
||||
// or deleting different items of one partition. Join them together
|
||||
// because we can do them in one cas() call.
|
||||
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
|
||||
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto& b : mutation_builders) {
|
||||
auto dk = dht::decorate_key(*b.first, b.second.pk());
|
||||
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
|
||||
using map_type = std::unordered_map<schema_decorated_key,
|
||||
std::vector<put_or_delete_item>,
|
||||
schema_decorated_key_hash,
|
||||
schema_decorated_key_equal>;
|
||||
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto&& b : std::move(mutation_builders)) {
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
|
||||
.schema = b.first,
|
||||
.dk = dht::decorate_key(*b.first, b.second.pk())
|
||||
});
|
||||
it->second.push_back(std::move(b.second));
|
||||
}
|
||||
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto* key_builders_ptr = key_builders.get();
|
||||
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
|
||||
_stats.write_using_lwt++;
|
||||
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
|
||||
if (desired_shard.this_shard()) {
|
||||
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
|
||||
} else {
|
||||
stats.shard_bounce_for_lwt++;
|
||||
return proxy.container().invoke_on(desired_shard.shard(), ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
mb = e.second,
|
||||
dk = e.first.dk,
|
||||
ks = e.first.schema->ks_name(),
|
||||
cf = e.first.schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(service::storage_proxy& proxy) mutable {
|
||||
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.data_dictionary().find_schema(ks, cf);
|
||||
auto s = e.first.schema;
|
||||
|
||||
// The desired_shard on the original shard remains alive for the duration
|
||||
// of cas_write on this shard and prevents any tablet operations.
|
||||
// However, we need a local instance of cas_shard on this shard
|
||||
// to pass it to sp::cas, so we just create a new one.
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
}).finally([desired_shard = std::move(desired_shard)]{});
|
||||
}
|
||||
});
|
||||
static const auto* injection_name = "alternator_executor_batch_write_wait";
|
||||
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
|
||||
const auto ks = handler.get("keyspace");
|
||||
const auto cf = handler.get("table");
|
||||
const auto shard = std::atoll(handler.get("shard")->data());
|
||||
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
|
||||
elogger.info("{}: hit", injection_name);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
elogger.info("{}: continue", injection_name);
|
||||
}
|
||||
}).then([&e, desired_shard = std::move(desired_shard),
|
||||
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
|
||||
{
|
||||
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
|
||||
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
|
||||
});
|
||||
}).finally([key_builders = std::move(key_builders)]{});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3319,7 +3341,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
|
||||
_stats.api_operations.batch_write_item_batch_total += total_items;
|
||||
_stats.api_operations.batch_write_item_histogram.add(total_items);
|
||||
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
|
||||
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
|
||||
// FIXME: Issue #5650: If we failed writing some of the updates,
|
||||
// need to return a list of these failed updates in UnprocessedItems
|
||||
// rather than fail the whole write (issue #5650).
|
||||
@@ -3364,7 +3386,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
}
|
||||
rjson::value newv = rjson::empty_object();
|
||||
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
|
||||
std::string attr = it->name.GetString();
|
||||
std::string attr = rjson::to_string(it->name);
|
||||
auto x = members.find(attr);
|
||||
if (x != members.end()) {
|
||||
if (x->second) {
|
||||
@@ -3584,7 +3606,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
|
||||
const rjson::value& attributes_to_get = req["AttributesToGet"];
|
||||
attrs_to_get ret;
|
||||
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
|
||||
attribute_path_map_add("AttributesToGet", ret, it->GetString());
|
||||
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
|
||||
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
|
||||
}
|
||||
if (ret.empty()) {
|
||||
@@ -4250,12 +4272,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
|
||||
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
// Note that it.key() is the name of the column, *it is the operation
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
const column_definition* cdef = _schema->get_column_definition(column_name);
|
||||
if (cdef && cdef->is_primary_key()) {
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
|
||||
}
|
||||
std::string action = (it->value)["Action"].GetString();
|
||||
std::string action = rjson::to_string((it->value)["Action"]);
|
||||
if (action == "DELETE") {
|
||||
// The DELETE operation can do two unrelated tasks. Without a
|
||||
// "Value" option, it is used to delete an attribute. With a
|
||||
@@ -5452,7 +5474,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
std::vector<query::clustering_range> ck_bounds;
|
||||
|
||||
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
|
||||
std::string key = it->name.GetString();
|
||||
sstring key = rjson::to_sstring(it->name);
|
||||
const rjson::value& condition = it->value;
|
||||
|
||||
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
|
||||
@@ -5460,13 +5482,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
|
||||
const column_definition& pk_cdef = schema->partition_key_columns().front();
|
||||
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
|
||||
if (sstring(key) == pk_cdef.name_as_text()) {
|
||||
if (key == pk_cdef.name_as_text()) {
|
||||
if (!partition_ranges.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
|
||||
}
|
||||
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
|
||||
if (ck_cdef && key == ck_cdef->name_as_text()) {
|
||||
if (!ck_bounds.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
@@ -5867,7 +5889,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
|
||||
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
|
||||
rjson::value* limit_json = rjson::find(request, "Limit");
|
||||
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
|
||||
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
|
||||
int limit = limit_json ? limit_json->GetInt() : 100;
|
||||
if (limit < 1 || limit > 100) {
|
||||
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
|
||||
|
||||
@@ -40,6 +40,7 @@ namespace cql3::selection {
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class cas_shard;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -57,6 +58,7 @@ class schema_builder;
|
||||
namespace alternator {
|
||||
|
||||
class rmw_operation;
|
||||
class put_or_delete_item;
|
||||
|
||||
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
|
||||
bool is_alternator_keyspace(const sstring& ks_name);
|
||||
@@ -219,6 +221,16 @@ private:
|
||||
|
||||
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
future<> do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit);
|
||||
|
||||
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
|
||||
public:
|
||||
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
|
||||
@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
|
||||
return {"", nullptr};
|
||||
}
|
||||
auto it = v.MemberBegin();
|
||||
const std::string it_key = it->name.GetString();
|
||||
const std::string it_key = rjson::to_string(it->name);
|
||||
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
|
||||
return {std::move(it_key), nullptr};
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
|
||||
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
|
||||
}
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
sstring attribute_name = rjson::to_sstring(*v);
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
|
||||
@@ -3051,7 +3051,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -9,6 +9,7 @@ target_sources(scylla_auth
|
||||
allow_all_authorizer.cc
|
||||
authenticated_user.cc
|
||||
authenticator.cc
|
||||
cache.cc
|
||||
certificate_authenticator.cc
|
||||
common.cc
|
||||
default_authorizer.cc
|
||||
|
||||
@@ -23,6 +23,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
|
||||
|
||||
class allow_all_authenticator final : public authenticator {
|
||||
public:
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
|
||||
180
auth/cache.cc
Normal file
180
auth/cache.cc
Normal file
@@ -0,0 +1,180 @@
|
||||
/*
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "auth/roles-metadata.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include <iterator>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/core/format.hh>
|
||||
|
||||
namespace auth {
|
||||
|
||||
logging::logger logger("auth-cache");
|
||||
|
||||
cache::cache(cql3::query_processor& qp) noexcept
|
||||
: _current_version(0)
|
||||
, _qp(qp) {
|
||||
}
|
||||
|
||||
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
|
||||
auto it = _roles.find(role);
|
||||
if (it == _roles.end()) {
|
||||
return {};
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
|
||||
auto rec = make_lw_shared<role_record>();
|
||||
rec->version = _current_version;
|
||||
|
||||
auto fetch = [this, &role](const sstring& q) {
|
||||
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
|
||||
internal_distributed_query_state(), {role},
|
||||
cql3::query_processor::cache_internal::yes);
|
||||
};
|
||||
// roles
|
||||
{
|
||||
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
|
||||
auto rs = co_await fetch(q);
|
||||
if (!rs->empty()) {
|
||||
auto& r = rs->one();
|
||||
rec->is_superuser = r.get_or<bool>("is_superuser", false);
|
||||
rec->can_login = r.get_or<bool>("can_login", false);
|
||||
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
|
||||
if (r.has("member_of")) {
|
||||
auto mo = r.get_set<sstring>("member_of");
|
||||
rec->member_of.insert(
|
||||
std::make_move_iterator(mo.begin()),
|
||||
std::make_move_iterator(mo.end()));
|
||||
}
|
||||
} else {
|
||||
// role got deleted
|
||||
co_return nullptr;
|
||||
}
|
||||
}
|
||||
// members
|
||||
{
|
||||
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
|
||||
auto rs = co_await fetch(q);
|
||||
for (const auto& r : *rs) {
|
||||
rec->members.insert(r.get_as<sstring>("member"));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
// attributes
|
||||
{
|
||||
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
|
||||
auto rs = co_await fetch(q);
|
||||
for (const auto& r : *rs) {
|
||||
rec->attributes[r.get_as<sstring>("name")] =
|
||||
r.get_as<sstring>("value");
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
// permissions
|
||||
{
|
||||
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
|
||||
auto rs = co_await fetch(q);
|
||||
for (const auto& r : *rs) {
|
||||
auto resource = r.get_as<sstring>("resource");
|
||||
auto perms_strings = r.get_set<sstring>("permissions");
|
||||
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
|
||||
auto pset = permissions::from_strings(perms_set);
|
||||
rec->permissions[std::move(resource)] = std::move(pset);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return rec;
|
||||
}
|
||||
|
||||
future<> cache::prune_all() noexcept {
|
||||
for (auto it = _roles.begin(); it != _roles.end(); ) {
|
||||
if (it->second->version != _current_version) {
|
||||
_roles.erase(it++);
|
||||
co_await coroutine::maybe_yield();
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> cache::load_all() {
|
||||
if (legacy_mode(_qp)) {
|
||||
co_return;
|
||||
}
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
++_current_version;
|
||||
|
||||
logger.info("Loading all roles");
|
||||
const uint32_t page_size = 128;
|
||||
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
|
||||
const auto name = r.get_as<sstring>("role");
|
||||
auto role = co_await fetch_role(name);
|
||||
if (role) {
|
||||
_roles[name] = role;
|
||||
}
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
|
||||
db::system_keyspace::NAME, meta::roles_table::name),
|
||||
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
|
||||
|
||||
co_await prune_all();
|
||||
for (const auto& [name, role] : _roles) {
|
||||
co_await distribute_role(name, role);
|
||||
}
|
||||
co_await container().invoke_on_others([this](cache& c) -> future<> {
|
||||
c._current_version = _current_version;
|
||||
co_await c.prune_all();
|
||||
});
|
||||
}
|
||||
|
||||
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
|
||||
if (legacy_mode(_qp)) {
|
||||
co_return;
|
||||
}
|
||||
for (const auto& name : roles) {
|
||||
logger.info("Loading role {}", name);
|
||||
auto role = co_await fetch_role(name);
|
||||
if (role) {
|
||||
_roles[name] = role;
|
||||
} else {
|
||||
_roles.erase(name);
|
||||
}
|
||||
co_await distribute_role(name, role);
|
||||
}
|
||||
}
|
||||
|
||||
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
|
||||
auto role_ptr = role.get();
|
||||
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
|
||||
if (!role_ptr) {
|
||||
c._roles.erase(name);
|
||||
return;
|
||||
}
|
||||
auto role_copy = make_lw_shared<role_record>(*role_ptr);
|
||||
c._roles[name] = std::move(role_copy);
|
||||
});
|
||||
}
|
||||
|
||||
bool cache::includes_table(const table_id& id) noexcept {
|
||||
return id == db::system_keyspace::roles()->id()
|
||||
|| id == db::system_keyspace::role_members()->id()
|
||||
|| id == db::system_keyspace::role_attributes()->id()
|
||||
|| id == db::system_keyspace::role_permissions()->id();
|
||||
}
|
||||
|
||||
} // namespace auth
|
||||
61
auth/cache.hh
Normal file
61
auth/cache.hh
Normal file
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
#include "auth/permission.hh"
|
||||
#include "auth/common.hh"
|
||||
|
||||
namespace cql3 { class query_processor; }
|
||||
|
||||
namespace auth {
|
||||
|
||||
class cache : public peering_sharded_service<cache> {
|
||||
public:
|
||||
using role_name_t = sstring;
|
||||
using version_tag_t = char;
|
||||
|
||||
struct role_record {
|
||||
bool can_login = false;
|
||||
bool is_superuser = false;
|
||||
std::unordered_set<role_name_t> member_of;
|
||||
std::unordered_set<role_name_t> members;
|
||||
sstring salted_hash;
|
||||
std::unordered_map<sstring, sstring> attributes;
|
||||
std::unordered_map<sstring, permission_set> permissions;
|
||||
version_tag_t version; // used for seamless cache reloads
|
||||
};
|
||||
|
||||
explicit cache(cql3::query_processor& qp) noexcept;
|
||||
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
|
||||
future<> load_all();
|
||||
future<> load_roles(std::unordered_set<role_name_t> roles);
|
||||
static bool includes_table(const table_id&) noexcept;
|
||||
|
||||
private:
|
||||
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
|
||||
roles_map _roles;
|
||||
version_tag_t _current_version;
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
|
||||
future<> prune_all() noexcept;
|
||||
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
|
||||
};
|
||||
|
||||
} // namespace auth
|
||||
@@ -8,6 +8,7 @@
|
||||
*/
|
||||
|
||||
#include "auth/certificate_authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
|
||||
#include <boost/regex.hpp>
|
||||
#include <fmt/ranges.h>
|
||||
@@ -34,13 +35,14 @@ static const class_registrator<auth::authenticator
|
||||
, cql3::query_processor&
|
||||
, ::service::raft_group0_client&
|
||||
, ::service::migration_manager&
|
||||
, auth::cache&
|
||||
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
|
||||
enum class auth::certificate_authenticator::query_source {
|
||||
subject, altname
|
||||
};
|
||||
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
|
||||
: _queries([&] {
|
||||
auto& conf = qp.db().get_config();
|
||||
auto queries = conf.auth_certificate_role_queries();
|
||||
@@ -75,9 +77,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
|
||||
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
|
||||
}
|
||||
continue;
|
||||
} catch (std::out_of_range&) {
|
||||
} catch (const std::out_of_range&) {
|
||||
// just fallthrough
|
||||
} catch (boost::regex_error&) {
|
||||
} catch (const boost::regex_error&) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,13 +26,15 @@ class raft_group0_client;
|
||||
|
||||
namespace auth {
|
||||
|
||||
class cache;
|
||||
|
||||
extern const std::string_view certificate_authenticator_name;
|
||||
|
||||
class certificate_authenticator : public authenticator {
|
||||
enum class query_source;
|
||||
std::vector<std::pair<query_source, boost::regex>> _queries;
|
||||
public:
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
~certificate_authenticator();
|
||||
|
||||
future<> start() override;
|
||||
|
||||
@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
|
||||
try {
|
||||
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
|
||||
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
|
||||
} catch (exceptions::already_exists_exception&) {}
|
||||
} catch (const exceptions::already_exists_exception&) {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,10 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;
|
||||
|
||||
} // namespace meta
|
||||
|
||||
constexpr std::string_view PERMISSIONS_CF = "role_permissions";
|
||||
constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
|
||||
constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
|
||||
|
||||
// This is a helper to check whether auth-v2 is on.
|
||||
bool legacy_mode(cql3::query_processor& qp);
|
||||
|
||||
|
||||
@@ -37,7 +37,6 @@ std::string_view default_authorizer::qualified_java_name() const {
|
||||
static constexpr std::string_view ROLE_NAME = "role";
|
||||
static constexpr std::string_view RESOURCE_NAME = "resource";
|
||||
static constexpr std::string_view PERMISSIONS_NAME = "permissions";
|
||||
static constexpr std::string_view PERMISSIONS_CF = "role_permissions";
|
||||
|
||||
static logging::logger alogger("default_authorizer");
|
||||
|
||||
@@ -257,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
|
||||
} else {
|
||||
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
|
||||
}
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
|
||||
}
|
||||
}
|
||||
@@ -294,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -83,17 +83,18 @@ static const class_registrator<
|
||||
ldap_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> registration(ldap_role_manager_full_name);
|
||||
::service::migration_manager&,
|
||||
cache&> registration(ldap_role_manager_full_name);
|
||||
|
||||
ldap_role_manager::ldap_role_manager(
|
||||
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
|
||||
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
|
||||
: _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
|
||||
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
|
||||
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
|
||||
, _bind_password(bind_password)
|
||||
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
|
||||
}
|
||||
|
||||
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
|
||||
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
|
||||
: ldap_role_manager(
|
||||
qp.db().get_config().ldap_url_template(),
|
||||
qp.db().get_config().ldap_attr_role(),
|
||||
@@ -101,7 +102,8 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
|
||||
qp.db().get_config().ldap_bind_passwd(),
|
||||
qp,
|
||||
rg0c,
|
||||
mm) {
|
||||
mm,
|
||||
cache) {
|
||||
}
|
||||
|
||||
std::string_view ldap_role_manager::qualified_java_name() const noexcept {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
#include "ent/ldap/ldap_connection.hh"
|
||||
#include "standard_role_manager.hh"
|
||||
#include "auth/cache.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -43,12 +44,13 @@ class ldap_role_manager : public role_manager {
|
||||
std::string_view bind_password, ///< LDAP bind credentials.
|
||||
cql3::query_processor& qp, ///< Passed to standard_role_manager.
|
||||
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
|
||||
::service::migration_manager& mm ///< Passed to standard_role_manager.
|
||||
::service::migration_manager& mm, ///< Passed to standard_role_manager.
|
||||
cache& cache ///< Passed to standard_role_manager.
|
||||
);
|
||||
|
||||
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
|
||||
/// class_registrator<role_manager>.
|
||||
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
|
||||
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
|
||||
|
||||
/// Thrown when query-template parsing fails.
|
||||
struct url_error : public std::runtime_error {
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <stdexcept>
|
||||
#include <string_view>
|
||||
#include "auth/cache.hh"
|
||||
#include "cql3/description.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
@@ -23,7 +24,8 @@ static const class_registrator<
|
||||
maintenance_socket_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
|
||||
::service::migration_manager&,
|
||||
cache&> registration(sstring{maintenance_socket_role_manager_name});
|
||||
|
||||
|
||||
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/resource.hh"
|
||||
#include "auth/role_manager.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
@@ -29,7 +30,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
|
||||
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
|
||||
class maintenance_socket_role_manager final : public role_manager {
|
||||
public:
|
||||
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
|
||||
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
|
||||
|
||||
virtual std::string_view qualified_java_name() const noexcept override;
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
@@ -63,10 +64,11 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
|
||||
password_authenticator::~password_authenticator() {
|
||||
}
|
||||
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(default_superuser(qp.db().get_config()))
|
||||
, _hashing_worker(hashing_worker)
|
||||
@@ -315,24 +317,33 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
const sstring password = credentials.at(PASSWORD_KEY);
|
||||
|
||||
try {
|
||||
const std::optional<sstring> salted_hash = co_await get_password_hash(username);
|
||||
if (!salted_hash) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
std::optional<sstring> salted_hash;
|
||||
if (legacy_mode(_qp)) {
|
||||
salted_hash = co_await get_password_hash(username);
|
||||
if (!salted_hash) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
} else {
|
||||
auto role = _cache.get(username);
|
||||
if (!role || role->salted_hash.empty()) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
salted_hash = role->salted_hash;
|
||||
}
|
||||
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
|
||||
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
|
||||
return passwords::check(password, *salted_hash);
|
||||
});
|
||||
if (!password_match) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
co_return username;
|
||||
} catch (std::system_error &) {
|
||||
} catch (const std::system_error &) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.what()));
|
||||
} catch (exceptions::authentication_exception& e) {
|
||||
} catch (const exceptions::authentication_exception& e) {
|
||||
std::throw_with_nested(e);
|
||||
} catch (exceptions::unavailable_exception& e) {
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/passwords.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
@@ -41,6 +42,7 @@ class password_authenticator : public authenticator {
|
||||
cql3::query_processor& _qp;
|
||||
::service::raft_group0_client& _group0_client;
|
||||
::service::migration_manager& _migration_manager;
|
||||
cache& _cache;
|
||||
future<> _stopped;
|
||||
abort_source _as;
|
||||
std::string _superuser; // default superuser name from the config (may or may not be present in roles table)
|
||||
@@ -53,7 +55,7 @@ public:
|
||||
static db::consistency_level consistency_for_user(std::string_view role_name);
|
||||
static std::string default_superuser(const db::config&);
|
||||
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
|
||||
~password_authenticator();
|
||||
|
||||
|
||||
@@ -35,9 +35,10 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
|
||||
: _socket_path(qp.db().get_config().saslauthd_socket_path())
|
||||
{}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
@@ -29,7 +30,7 @@ namespace auth {
|
||||
class saslauthd_authenticator : public authenticator {
|
||||
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
|
||||
public:
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
|
||||
|
||||
future<> start() override;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include <chrono>
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
@@ -157,6 +158,7 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
|
||||
|
||||
service::service(
|
||||
utils::loading_cache_config c,
|
||||
cache& cache,
|
||||
cql3::query_processor& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
::service::migration_notifier& mn,
|
||||
@@ -166,6 +168,7 @@ service::service(
|
||||
maintenance_socket_enabled used_by_maintenance_socket)
|
||||
: _loading_cache_config(std::move(c))
|
||||
, _permissions_cache(nullptr)
|
||||
, _cache(cache)
|
||||
, _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _mnotifier(mn)
|
||||
@@ -188,15 +191,17 @@ service::service(
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc,
|
||||
maintenance_socket_enabled used_by_maintenance_socket,
|
||||
cache& cache,
|
||||
utils::alien_worker& hashing_worker)
|
||||
: service(
|
||||
std::move(c),
|
||||
cache,
|
||||
qp,
|
||||
g0,
|
||||
mn,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
|
||||
used_by_maintenance_socket) {
|
||||
}
|
||||
|
||||
@@ -221,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
|
||||
try {
|
||||
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
|
||||
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
|
||||
} catch (::service::group0_concurrent_modification&) {
|
||||
} catch (const ::service::group0_concurrent_modification&) {
|
||||
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
|
||||
}
|
||||
}
|
||||
@@ -232,6 +237,9 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
auto auth_version = co_await sys_ks.get_auth_version();
|
||||
// version is set in query processor to be easily available in various places we call auth::legacy_mode check.
|
||||
_qp.auth_version = auth_version;
|
||||
if (this_shard_id() == 0) {
|
||||
co_await _cache.load_all();
|
||||
}
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// this legacy keyspace is only used by cqlsh
|
||||
// it's needed when executing `list roles` or `list users`
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "auth/permissions_cache.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/role_manager.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "cql3/description.hh"
|
||||
@@ -77,6 +78,7 @@ public:
|
||||
class service final : public seastar::peering_sharded_service<service> {
|
||||
utils::loading_cache_config _loading_cache_config;
|
||||
std::unique_ptr<permissions_cache> _permissions_cache;
|
||||
cache& _cache;
|
||||
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
@@ -107,6 +109,7 @@ class service final : public seastar::peering_sharded_service<service> {
|
||||
public:
|
||||
service(
|
||||
utils::loading_cache_config,
|
||||
cache& cache,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_notifier&,
|
||||
@@ -128,6 +131,7 @@ public:
|
||||
::service::migration_manager&,
|
||||
const service_config&,
|
||||
maintenance_socket_enabled,
|
||||
cache&,
|
||||
utils::alien_worker&);
|
||||
|
||||
future<> start(::service::migration_manager&, db::system_keyspace&);
|
||||
|
||||
@@ -41,21 +41,6 @@
|
||||
|
||||
namespace auth {
|
||||
|
||||
namespace meta {
|
||||
|
||||
namespace role_members_table {
|
||||
|
||||
constexpr std::string_view name{"role_members" , 12};
|
||||
|
||||
}
|
||||
|
||||
namespace role_attributes_table {
|
||||
|
||||
constexpr std::string_view name{"role_attributes", 15};
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static logging::logger log("standard_role_manager");
|
||||
|
||||
@@ -64,7 +49,8 @@ static const class_registrator<
|
||||
standard_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
|
||||
|
||||
struct record final {
|
||||
sstring name;
|
||||
@@ -121,10 +107,11 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
|
||||
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
|
||||
}
|
||||
|
||||
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
|
||||
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
|
||||
{}
|
||||
@@ -136,7 +123,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
|
||||
const resource_set& standard_role_manager::protected_resources() const {
|
||||
static const resource_set resources({
|
||||
make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
|
||||
make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
|
||||
make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});
|
||||
|
||||
return resources;
|
||||
}
|
||||
@@ -160,7 +147,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
|
||||
" PRIMARY KEY (role, member)"
|
||||
")",
|
||||
meta::legacy::AUTH_KS,
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
static const sstring create_role_attributes_query = seastar::format(
|
||||
"CREATE TABLE {}.{} ("
|
||||
" role text,"
|
||||
@@ -169,7 +156,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
|
||||
" PRIMARY KEY(role, name)"
|
||||
")",
|
||||
meta::legacy::AUTH_KS,
|
||||
meta::role_attributes_table::name);
|
||||
ROLE_ATTRIBUTES_CF);
|
||||
return when_all_succeed(
|
||||
create_legacy_metadata_table_if_missing(
|
||||
meta::roles_table::name,
|
||||
@@ -177,12 +164,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
|
||||
create_roles_query,
|
||||
_migration_manager),
|
||||
create_legacy_metadata_table_if_missing(
|
||||
meta::role_members_table::name,
|
||||
ROLE_MEMBERS_CF,
|
||||
_qp,
|
||||
create_role_members_query,
|
||||
_migration_manager),
|
||||
create_legacy_metadata_table_if_missing(
|
||||
meta::role_attributes_table::name,
|
||||
ROLE_ATTRIBUTES_CF,
|
||||
_qp,
|
||||
create_role_attributes_query,
|
||||
_migration_manager)).discard_result();
|
||||
@@ -205,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
|
||||
{_superuser},
|
||||
cql3::query_processor::cache_internal::no).discard_result();
|
||||
log.info("Created default superuser role '{}'.", _superuser);
|
||||
} catch(const exceptions::unavailable_exception& e) {
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
log.warn("Skipped default role setup: some nodes were not ready; will retry");
|
||||
throw e;
|
||||
}
|
||||
@@ -429,7 +416,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
|
||||
const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
|
||||
const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
const auto members = co_await _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
@@ -461,7 +448,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
|
||||
const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
|
||||
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
ROLE_ATTRIBUTES_CF);
|
||||
if (legacy_mode(_qp)) {
|
||||
co_await _qp.execute_internal(query, {sstring(role_name)},
|
||||
cql3::query_processor::cache_internal::yes).discard_result();
|
||||
@@ -517,7 +504,7 @@ standard_role_manager::legacy_modify_membership(
|
||||
case membership_change::add: {
|
||||
const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
co_return co_await _qp.execute_internal(
|
||||
insert_query,
|
||||
consistency_for_role(role_name),
|
||||
@@ -529,7 +516,7 @@ standard_role_manager::legacy_modify_membership(
|
||||
case membership_change::remove: {
|
||||
const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
co_return co_await _qp.execute_internal(
|
||||
delete_query,
|
||||
consistency_for_role(role_name),
|
||||
@@ -567,12 +554,12 @@ standard_role_manager::modify_membership(
|
||||
case membership_change::add:
|
||||
modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
break;
|
||||
case membership_change::remove:
|
||||
modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
break;
|
||||
default:
|
||||
on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
|
||||
@@ -666,7 +653,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
|
||||
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{}",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
ROLE_MEMBERS_CF);
|
||||
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
query,
|
||||
@@ -731,15 +718,21 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
|
||||
}
|
||||
|
||||
future<bool> standard_role_manager::can_login(std::string_view role_name) {
|
||||
return require_record(_qp, role_name).then([](record r) {
|
||||
return r.can_login;
|
||||
});
|
||||
if (legacy_mode(_qp)) {
|
||||
const auto r = co_await require_record(_qp, role_name);
|
||||
co_return r.can_login;
|
||||
}
|
||||
auto role = _cache.get(sstring(role_name));
|
||||
if (!role) {
|
||||
throw nonexistant_role(role_name);
|
||||
}
|
||||
co_return role->can_login;
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
ROLE_ATTRIBUTES_CF);
|
||||
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
|
||||
if (!result_set->empty()) {
|
||||
const cql3::untyped_result_set_row &row = result_set->one();
|
||||
@@ -770,7 +763,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
|
||||
}
|
||||
const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
ROLE_ATTRIBUTES_CF);
|
||||
if (legacy_mode(_qp)) {
|
||||
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
|
||||
} else {
|
||||
@@ -785,7 +778,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
|
||||
}
|
||||
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
ROLE_ATTRIBUTES_CF);
|
||||
if (legacy_mode(_qp)) {
|
||||
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
|
||||
} else {
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "auth/role_manager.hh"
|
||||
#include "auth/cache.hh"
|
||||
|
||||
#include <string_view>
|
||||
|
||||
@@ -36,13 +37,14 @@ class standard_role_manager final : public role_manager {
|
||||
cql3::query_processor& _qp;
|
||||
::service::raft_group0_client& _group0_client;
|
||||
::service::migration_manager& _migration_manager;
|
||||
cache& _cache;
|
||||
future<> _stopped;
|
||||
abort_source _as;
|
||||
std::string _superuser;
|
||||
shared_promise<> _superuser_created_promise;
|
||||
|
||||
public:
|
||||
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
|
||||
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
|
||||
virtual std::string_view qualified_java_name() const noexcept override;
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/default_authorizer.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
@@ -37,8 +38,8 @@ class transitional_authenticator : public authenticator {
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
|
||||
}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
@@ -80,7 +81,7 @@ public:
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -125,7 +126,7 @@ public:
|
||||
virtual bytes evaluate_response(bytes_view client_response) override {
|
||||
try {
|
||||
return _sasl->evaluate_response(client_response);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
_complete = true;
|
||||
return {};
|
||||
}
|
||||
@@ -140,7 +141,7 @@ public:
|
||||
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -240,6 +241,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
auth::cache&,
|
||||
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
|
||||
static const class_registrator<
|
||||
|
||||
@@ -445,6 +445,7 @@ ldap_tests = set([
|
||||
scylla_tests = set([
|
||||
'test/boost/combined_tests',
|
||||
'test/boost/UUID_test',
|
||||
'test/boost/url_parse_test',
|
||||
'test/boost/advanced_rpc_compressor_test',
|
||||
'test/boost/allocation_strategy_test',
|
||||
'test/boost/alternator_unit_test',
|
||||
@@ -1061,7 +1062,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'db/hints/resource_manager.cc',
|
||||
'db/hints/sync_point.cc',
|
||||
'db/large_data_handler.cc',
|
||||
'db/legacy_schema_migrator.cc',
|
||||
'db/marshal/type_parser.cc',
|
||||
'db/per_partition_rate_limit_options.cc',
|
||||
'db/rate_limiter.cc',
|
||||
@@ -1195,6 +1195,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'auth/allow_all_authorizer.cc',
|
||||
'auth/authenticated_user.cc',
|
||||
'auth/authenticator.cc',
|
||||
'auth/cache.cc',
|
||||
'auth/common.cc',
|
||||
'auth/default_authorizer.cc',
|
||||
'auth/resource.cc',
|
||||
@@ -1646,6 +1647,7 @@ deps['test/boost/bytes_ostream_test'] = [
|
||||
]
|
||||
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
|
||||
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
|
||||
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
|
||||
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
|
||||
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
|
||||
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']
|
||||
|
||||
12
cql3/Cql.g
12
cql3/Cql.g
@@ -575,6 +575,15 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
|
||||
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
|
||||
;
|
||||
|
||||
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
|
||||
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
|
||||
;
|
||||
|
||||
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
|
||||
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
|
||||
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
|
||||
;
|
||||
|
||||
/**
|
||||
* UPDATE <CF>
|
||||
* USING TIMESTAMP <long>
|
||||
@@ -666,7 +675,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
|
||||
auto attrs = std::make_unique<cql3::attributes::raw>();
|
||||
expression wclause = conjunction{};
|
||||
}
|
||||
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
|
||||
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
|
||||
{
|
||||
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
|
||||
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
|
||||
@@ -2370,6 +2379,7 @@ K_LIKE: L I K E;
|
||||
|
||||
K_TIMEOUT: T I M E O U T;
|
||||
K_PRUNE: P R U N E;
|
||||
K_CONCURRENCY: C O N C U R R E N C Y;
|
||||
|
||||
K_EXECUTE: E X E C U T E;
|
||||
|
||||
|
||||
@@ -20,19 +20,21 @@
|
||||
namespace cql3 {
|
||||
|
||||
std::unique_ptr<attributes> attributes::none() {
|
||||
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
|
||||
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
|
||||
}
|
||||
|
||||
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
|
||||
std::optional<cql3::expr::expression>&& time_to_live,
|
||||
std::optional<cql3::expr::expression>&& timeout,
|
||||
std::optional<sstring> service_level)
|
||||
std::optional<sstring> service_level,
|
||||
std::optional<cql3::expr::expression>&& concurrency)
|
||||
: _timestamp_unset_guard(timestamp)
|
||||
, _timestamp{std::move(timestamp)}
|
||||
, _time_to_live_unset_guard(time_to_live)
|
||||
, _time_to_live{std::move(time_to_live)}
|
||||
, _timeout{std::move(timeout)}
|
||||
, _service_level(std::move(service_level))
|
||||
, _concurrency{std::move(concurrency)}
|
||||
{ }
|
||||
|
||||
bool attributes::is_timestamp_set() const {
|
||||
@@ -51,6 +53,10 @@ bool attributes::is_service_level_set() const {
|
||||
return bool(_service_level);
|
||||
}
|
||||
|
||||
bool attributes::is_concurrency_set() const {
|
||||
return bool(_concurrency);
|
||||
}
|
||||
|
||||
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
|
||||
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
|
||||
return now;
|
||||
@@ -123,6 +129,27 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
|
||||
return sl_controller.get_service_level(sl_name).slo;
|
||||
}
|
||||
|
||||
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
|
||||
if (!_concurrency.has_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
|
||||
if (concurrency_raw.is_null()) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
|
||||
}
|
||||
int32_t concurrency;
|
||||
try {
|
||||
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
|
||||
} catch (marshal_exception& e) {
|
||||
throw exceptions::invalid_request_exception("Invalid concurrency value");
|
||||
}
|
||||
if (concurrency <= 0) {
|
||||
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
|
||||
}
|
||||
return concurrency;
|
||||
}
|
||||
|
||||
void attributes::fill_prepare_context(prepare_context& ctx) {
|
||||
if (_timestamp.has_value()) {
|
||||
expr::fill_prepare_context(*_timestamp, ctx);
|
||||
@@ -133,10 +160,13 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
|
||||
if (_timeout.has_value()) {
|
||||
expr::fill_prepare_context(*_timeout, ctx);
|
||||
}
|
||||
if (_concurrency.has_value()) {
|
||||
expr::fill_prepare_context(*_concurrency, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
|
||||
std::optional<expr::expression> ts, ttl, to;
|
||||
std::optional<expr::expression> ts, ttl, to, conc;
|
||||
|
||||
if (timestamp.has_value()) {
|
||||
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
|
||||
@@ -153,7 +183,12 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
|
||||
verify_no_aggregate_functions(*timeout, "USING clause");
|
||||
}
|
||||
|
||||
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
|
||||
if (concurrency.has_value()) {
|
||||
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
|
||||
verify_no_aggregate_functions(*concurrency, "USING clause");
|
||||
}
|
||||
|
||||
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
|
||||
@@ -168,4 +203,8 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
|
||||
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
|
||||
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -36,13 +36,15 @@ private:
|
||||
std::optional<cql3::expr::expression> _time_to_live;
|
||||
std::optional<cql3::expr::expression> _timeout;
|
||||
std::optional<sstring> _service_level;
|
||||
std::optional<cql3::expr::expression> _concurrency;
|
||||
public:
|
||||
static std::unique_ptr<attributes> none();
|
||||
private:
|
||||
attributes(std::optional<cql3::expr::expression>&& timestamp,
|
||||
std::optional<cql3::expr::expression>&& time_to_live,
|
||||
std::optional<cql3::expr::expression>&& timeout,
|
||||
std::optional<sstring> service_level);
|
||||
std::optional<sstring> service_level,
|
||||
std::optional<cql3::expr::expression>&& concurrency);
|
||||
public:
|
||||
bool is_timestamp_set() const;
|
||||
|
||||
@@ -52,6 +54,8 @@ public:
|
||||
|
||||
bool is_service_level_set() const;
|
||||
|
||||
bool is_concurrency_set() const;
|
||||
|
||||
int64_t get_timestamp(int64_t now, const query_options& options);
|
||||
|
||||
std::optional<int32_t> get_time_to_live(const query_options& options);
|
||||
@@ -60,6 +64,8 @@ public:
|
||||
|
||||
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
|
||||
|
||||
std::optional<int32_t> get_concurrency(const query_options& options) const;
|
||||
|
||||
void fill_prepare_context(prepare_context& ctx);
|
||||
|
||||
class raw final {
|
||||
@@ -68,6 +74,7 @@ public:
|
||||
std::optional<cql3::expr::expression> time_to_live;
|
||||
std::optional<cql3::expr::expression> timeout;
|
||||
std::optional<sstring> service_level;
|
||||
std::optional<cql3::expr::expression> concurrency;
|
||||
|
||||
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
|
||||
private:
|
||||
@@ -76,6 +83,8 @@ public:
|
||||
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
|
||||
|
||||
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
|
||||
|
||||
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -331,7 +331,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
if (!cl_for_paxos) [[unlikely]] {
|
||||
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
|
||||
}
|
||||
seastar::shared_ptr<cas_request> request;
|
||||
std::unique_ptr<cas_request> request;
|
||||
schema_ptr schema;
|
||||
|
||||
db::timeout_clock::time_point now = db::timeout_clock::now();
|
||||
@@ -354,9 +354,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
if (keys.empty()) {
|
||||
continue;
|
||||
}
|
||||
if (request.get() == nullptr) {
|
||||
if (!request) {
|
||||
schema = statement.s;
|
||||
request = seastar::make_shared<cas_request>(schema, std::move(keys));
|
||||
request = std::make_unique<cas_request>(schema, std::move(keys));
|
||||
} else if (keys.size() != 1 || keys.front().equal(request->key().front(), dht::ring_position_comparator(*schema)) == false) {
|
||||
throw exceptions::invalid_request_exception("BATCH with conditions cannot span multiple partitions");
|
||||
}
|
||||
@@ -366,7 +366,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
|
||||
request->add_row_update(statement, std::move(ranges), std::move(json_cache), statement_options);
|
||||
}
|
||||
if (request.get() == nullptr) {
|
||||
if (!request) {
|
||||
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
|
||||
}
|
||||
|
||||
@@ -377,9 +377,10 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
);
|
||||
}
|
||||
|
||||
return qp.proxy().cas(schema, std::move(cas_shard), request, request->read_command(qp), request->key(),
|
||||
auto* request_ptr = request.get();
|
||||
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
|
||||
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -279,11 +279,15 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
|
||||
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
|
||||
}
|
||||
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
// Regular secondary indexes require rf-rack-validity.
|
||||
// Custom indexes need to validate this property themselves, if they need it.
|
||||
if (!_properties || !_properties->custom_class) {
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
validate_for_local_index(*schema);
|
||||
|
||||
@@ -401,7 +401,8 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
|
||||
type.is_update() ? "update" : "deletion"));
|
||||
}
|
||||
|
||||
auto request = seastar::make_shared<cas_request>(s, std::move(keys));
|
||||
auto request = std::make_unique<cas_request>(s, std::move(keys));
|
||||
auto* request_ptr = request.get();
|
||||
// cas_request can be used for batches as well single statements; Here we have just a single
|
||||
// modification in the list of CAS commands, since we're handling single-statement execution.
|
||||
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
|
||||
@@ -427,9 +428,9 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
|
||||
tablet_info = erm->check_locality(token);
|
||||
}
|
||||
|
||||
return qp.proxy().cas(s, std::move(cas_shard), request, request->read_command(qp), request->key(),
|
||||
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
|
||||
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
|
||||
result->add_tablet_info(tablet_replicas, token_range);
|
||||
return result;
|
||||
|
||||
@@ -21,7 +21,7 @@ namespace cql3 {
|
||||
namespace statements {
|
||||
|
||||
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
|
||||
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
|
||||
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
|
||||
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
|
||||
view->all_columns()
|
||||
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
|
||||
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
|
||||
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
|
||||
|
||||
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
|
||||
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
|
||||
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
|
||||
|
||||
int32_t page_size = std::max(options.get_page_size(), 1000);
|
||||
auto now = gc_clock::now();
|
||||
@@ -62,7 +62,8 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
|
||||
auto timeout_duration = get_timeout(state.get_client_state(), options);
|
||||
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
|
||||
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
|
||||
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
|
||||
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
|
||||
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ target_sources(db
|
||||
schema_applier.cc
|
||||
schema_tables.cc
|
||||
cql_type_parser.cc
|
||||
legacy_schema_migrator.cc
|
||||
commitlog/commitlog.cc
|
||||
commitlog/commitlog_replayer.cc
|
||||
commitlog/commitlog_entry.cc
|
||||
|
||||
18
db/config.cc
18
db/config.cc
@@ -1172,6 +1172,17 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
|
||||
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
|
||||
/**
|
||||
* @Group Vector search settings
|
||||
* @GroupDescription Settings for configuring and tuning vector search functionality.
|
||||
*/
|
||||
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
|
||||
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
|
||||
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
|
||||
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
|
||||
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
|
||||
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
|
||||
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
|
||||
/**
|
||||
* @Group Security properties
|
||||
* @GroupDescription Server and client security settings.
|
||||
*/
|
||||
@@ -1459,13 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
|
||||
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
|
||||
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
|
||||
, vector_store_primary_uri(
|
||||
this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
|
||||
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
|
||||
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
|
||||
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
|
||||
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri. The available options are:\n"
|
||||
"* truststore: (Default: <not set. use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
|
||||
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
|
||||
|
||||
@@ -344,6 +344,9 @@ public:
|
||||
named_value<sstring> request_scheduler;
|
||||
named_value<sstring> request_scheduler_id;
|
||||
named_value<string_map> request_scheduler_options;
|
||||
named_value<sstring> vector_store_primary_uri;
|
||||
named_value<sstring> vector_store_secondary_uri;
|
||||
named_value<string_map> vector_store_encryption_options;
|
||||
named_value<sstring> authenticator;
|
||||
named_value<sstring> internode_authenticator;
|
||||
named_value<sstring> authorizer;
|
||||
@@ -471,10 +474,6 @@ public:
|
||||
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
|
||||
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
|
||||
|
||||
named_value<sstring> vector_store_primary_uri;
|
||||
named_value<sstring> vector_store_secondary_uri;
|
||||
named_value<string_map> vector_store_encryption_options;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
named_value<bool> sanitizer_report_backtrace;
|
||||
|
||||
@@ -1,602 +0,0 @@
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
// Since Scylla 2.0, we use system tables whose schemas were introduced in
|
||||
// Cassandra 3. If Scylla boots to find a data directory with system tables
|
||||
// with older schemas - produced by pre-2.0 Scylla or by pre-3.0 Cassandra,
|
||||
// we need to migrate these old tables to the new format.
|
||||
//
|
||||
// We provide here a function, db::legacy_schema_migrator::migrate(),
|
||||
// for a one-time migration from old to new system tables. The function
|
||||
// reads old system tables, write them back in the new format, and finally
|
||||
// delete the old system tables. Scylla's main should call this function and
|
||||
// wait for the returned future, before starting to serve the database.
|
||||
|
||||
#include <boost/iterator/filter_iterator.hpp>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
#include <map>
|
||||
#include <unordered_set>
|
||||
#include <chrono>
|
||||
|
||||
#include "replica/database.hh"
|
||||
#include "legacy_schema_migrator.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "schema_tables.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/util.hh"
|
||||
#include "cql3/statements/property_definitions.hh"
|
||||
|
||||
static seastar::logger mlogger("legacy_schema_migrator");
|
||||
|
||||
namespace db {
|
||||
namespace legacy_schema_migrator {
|
||||
|
||||
// local data carriers
|
||||
|
||||
class migrator {
|
||||
public:
|
||||
static const std::unordered_set<sstring> legacy_schema_tables;
|
||||
|
||||
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
|
||||
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
|
||||
}
|
||||
migrator(migrator&&) = default;
|
||||
|
||||
typedef db_clock::time_point time_point;
|
||||
|
||||
// TODO: we don't support triggers.
|
||||
// this is a placeholder.
|
||||
struct trigger {
|
||||
time_point timestamp;
|
||||
sstring name;
|
||||
std::unordered_map<sstring, sstring> options;
|
||||
};
|
||||
|
||||
struct table {
|
||||
time_point timestamp;
|
||||
schema_ptr metadata;
|
||||
std::vector<trigger> triggers;
|
||||
};
|
||||
|
||||
struct type {
|
||||
time_point timestamp;
|
||||
user_type metadata;
|
||||
};
|
||||
|
||||
struct function {
|
||||
time_point timestamp;
|
||||
sstring ks_name;
|
||||
sstring fn_name;
|
||||
std::vector<sstring> arg_names;
|
||||
std::vector<sstring> arg_types;
|
||||
sstring return_type;
|
||||
bool called_on_null_input;
|
||||
sstring language;
|
||||
sstring body;
|
||||
};
|
||||
|
||||
struct aggregate {
|
||||
time_point timestamp;
|
||||
sstring ks_name;
|
||||
sstring fn_name;
|
||||
std::vector<sstring> arg_names;
|
||||
std::vector<sstring> arg_types;
|
||||
sstring return_type;
|
||||
sstring final_func;
|
||||
sstring initcond;
|
||||
sstring state_func;
|
||||
sstring state_type;
|
||||
};
|
||||
|
||||
struct keyspace {
|
||||
time_point timestamp;
|
||||
sstring name;
|
||||
bool durable_writes;
|
||||
std::map<sstring, sstring> replication_params;
|
||||
|
||||
std::vector<table> tables;
|
||||
std::vector<type> types;
|
||||
std::vector<function> functions;
|
||||
std::vector<aggregate> aggregates;
|
||||
};
|
||||
|
||||
class unsupported_feature : public std::runtime_error {
|
||||
public:
|
||||
using runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
static sstring fmt_query(const char* fmt, const char* table) {
|
||||
return fmt::format(fmt::runtime(fmt), db::system_keyspace::NAME, table);
|
||||
}
|
||||
|
||||
typedef ::shared_ptr<cql3::untyped_result_set> result_set_type;
|
||||
typedef const cql3::untyped_result_set::row row_type;
|
||||
|
||||
future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) {
|
||||
auto fmt = "SELECT * FROM {}.{} WHERE keyspace_name = ? AND columnfamily_name = ?";
|
||||
auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES);
|
||||
auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS);
|
||||
auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS);
|
||||
|
||||
typedef std::tuple<future<result_set_type>, future<result_set_type>, future<result_set_type>, future<db::schema_tables::legacy::schema_mutations>> result_tuple;
|
||||
|
||||
return when_all(_qp.execute_internal(tq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
|
||||
_qp.execute_internal(cq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
|
||||
_qp.execute_internal(zq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
|
||||
db::schema_tables::legacy::read_table_mutations(_sp, dst.name, cf_name, db::system_keyspace::legacy::column_families()))
|
||||
.then([&dst, cf_name, timestamp](result_tuple&& t) {
|
||||
|
||||
result_set_type tables = std::get<0>(t).get();
|
||||
result_set_type columns = std::get<1>(t).get();
|
||||
result_set_type triggers = std::get<2>(t).get();
|
||||
db::schema_tables::legacy::schema_mutations sm = std::get<3>(t).get();
|
||||
|
||||
row_type& td = tables->one();
|
||||
|
||||
auto ks_name = td.get_as<sstring>("keyspace_name");
|
||||
auto cf_name = td.get_as<sstring>("columnfamily_name");
|
||||
auto id = table_id(td.get_or("cf_id", generate_legacy_id(ks_name, cf_name).uuid()));
|
||||
|
||||
schema_builder builder(dst.name, cf_name, id);
|
||||
|
||||
builder.with_version(sm.digest());
|
||||
|
||||
cf_type cf = sstring_to_cf_type(td.get_or("type", sstring("standard")));
|
||||
if (cf == cf_type::super) {
|
||||
fail(unimplemented::cause::SUPER);
|
||||
}
|
||||
|
||||
auto comparator = td.get_as<sstring>("comparator");
|
||||
bool is_compound = cell_comparator::check_compound(comparator);
|
||||
builder.set_is_compound(is_compound);
|
||||
cell_comparator::read_collections(builder, comparator);
|
||||
|
||||
bool filter_sparse = false;
|
||||
|
||||
data_type default_validator = {};
|
||||
if (td.has("default_validator")) {
|
||||
default_validator = db::schema_tables::parse_type(td.get_as<sstring>("default_validator"));
|
||||
if (default_validator->is_counter()) {
|
||||
builder.set_is_counter(true);
|
||||
}
|
||||
builder.set_default_validation_class(default_validator);
|
||||
}
|
||||
|
||||
/*
|
||||
* Determine whether or not the table is *really* dense
|
||||
* We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
|
||||
* but we can trust is_dense value of false.
|
||||
*/
|
||||
auto is_dense = td.get_opt<bool>("is_dense");
|
||||
if (!is_dense || *is_dense) {
|
||||
is_dense = [&] {
|
||||
/*
|
||||
* As said above, this method is only here because we need to deal with thrift upgrades.
|
||||
* Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
|
||||
* then we'll have saved the "is_dense" value and will be good to go.
|
||||
*
|
||||
* But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
|
||||
* to infer that information without relying on it in that case. And for the most part this is
|
||||
* easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
|
||||
* having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
|
||||
* PRIMARY KEY defined.
|
||||
*
|
||||
* So we need to recognize those special case CQL3 table with only a primary key. If we have some
|
||||
* clustering columns, we're fine as said above. So the only problem is that we cannot decide for
|
||||
* sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
|
||||
* has been created in CQL3 by say:
|
||||
* CREATE TABLE test (k int PRIMARY KEY)
|
||||
* in which case it should not be dense. However, we can limit our margin of error by assuming we are
|
||||
* in the latter case only if the comparator is exactly CompositeType(UTF8Type).
|
||||
*/
|
||||
std::optional<column_id> max_cl_idx;
|
||||
const cql3::untyped_result_set::row * regular = nullptr;
|
||||
for (auto& row : *columns) {
|
||||
auto kind_str = row.get_as<sstring>("type");
|
||||
if (kind_str == "compact_value") {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto kind = db::schema_tables::deserialize_kind(kind_str);
|
||||
|
||||
if (kind == column_kind::regular_column) {
|
||||
if (regular != nullptr) {
|
||||
return false;
|
||||
}
|
||||
regular = &row;
|
||||
continue;
|
||||
}
|
||||
if (kind == column_kind::clustering_key) {
|
||||
max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id()));
|
||||
}
|
||||
}
|
||||
|
||||
auto is_cql3_only_pk_comparator = [](const sstring& comparator) {
|
||||
if (!cell_comparator::check_compound(comparator)) {
|
||||
return false;
|
||||
}
|
||||
// CMH. We don't have composites, nor a parser for it. This is a simple way of c
|
||||
// checking the same.
|
||||
auto comma = comparator.find(',');
|
||||
if (comma != sstring::npos) {
|
||||
return false;
|
||||
}
|
||||
auto off = comparator.find('(');
|
||||
auto end = comparator.find(')');
|
||||
|
||||
return comparator.compare(off, end - off, utf8_type->name()) == 0;
|
||||
};
|
||||
|
||||
if (max_cl_idx) {
|
||||
auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1
|
||||
return *max_cl_idx == n;
|
||||
}
|
||||
|
||||
if (regular) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return !is_cql3_only_pk_comparator(comparator);
|
||||
|
||||
}();
|
||||
|
||||
// now, if switched to sparse, remove redundant compact_value column and the last clustering column,
|
||||
// directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
|
||||
|
||||
filter_sparse = !*is_dense;
|
||||
}
|
||||
builder.set_is_dense(*is_dense);
|
||||
|
||||
auto is_cql = !*is_dense && is_compound;
|
||||
auto is_static_compact = !*is_dense && !is_compound;
|
||||
|
||||
// org.apache.cassandra.schema.LegacySchemaMigrator#isEmptyCompactValueColumn
|
||||
auto is_empty_compact_value = [](const cql3::untyped_result_set::row& column_row) {
|
||||
auto kind_str = column_row.get_as<sstring>("type");
|
||||
// Cassandra only checks for "compact_value", but Scylla generates "regular" instead (#2586)
|
||||
return (kind_str == "compact_value" || kind_str == "regular")
|
||||
&& column_row.get_as<sstring>("column_name").empty();
|
||||
};
|
||||
|
||||
for (auto& row : *columns) {
|
||||
auto kind_str = row.get_as<sstring>("type");
|
||||
auto kind = db::schema_tables::deserialize_kind(kind_str);
|
||||
auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0));
|
||||
auto name = row.get_or<sstring>("column_name", sstring());
|
||||
auto validator = db::schema_tables::parse_type(row.get_as<sstring>("validator"));
|
||||
|
||||
if (is_empty_compact_value(row)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (filter_sparse) {
|
||||
if (kind_str == "compact_value") {
|
||||
continue;
|
||||
}
|
||||
if (kind == column_kind::clustering_key) {
|
||||
if (cf == cf_type::super && component_index != 0) {
|
||||
continue;
|
||||
}
|
||||
if (cf != cf_type::super && !is_compound) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<index_metadata_kind> index_kind;
|
||||
sstring index_name;
|
||||
index_options_map options;
|
||||
if (row.has("index_type")) {
|
||||
index_kind = schema_tables::deserialize_index_kind(row.get_as<sstring>("index_type"));
|
||||
}
|
||||
if (row.has("index_name")) {
|
||||
index_name = row.get_as<sstring>("index_name");
|
||||
}
|
||||
if (row.has("index_options")) {
|
||||
sstring index_options_str = row.get_as<sstring>("index_options");
|
||||
options = rjson::parse_to_map<index_options_map>(std::string_view(index_options_str));
|
||||
sstring type;
|
||||
auto i = options.find("index_keys");
|
||||
if (i != options.end()) {
|
||||
options.erase(i);
|
||||
type = "KEYS";
|
||||
}
|
||||
i = options.find("index_keys_and_values");
|
||||
if (i != options.end()) {
|
||||
options.erase(i);
|
||||
type = "KEYS_AND_VALUES";
|
||||
}
|
||||
if (type.empty()) {
|
||||
if (validator->is_collection() && validator->is_multi_cell()) {
|
||||
type = "FULL";
|
||||
} else {
|
||||
type = "VALUES";
|
||||
}
|
||||
}
|
||||
auto column = cql3::util::maybe_quote(name);
|
||||
options["target"] = validator->is_collection()
|
||||
? type + "(" + column + ")"
|
||||
: column;
|
||||
}
|
||||
if (index_kind) {
|
||||
// Origin assumes index_name is always set, so let's do the same
|
||||
builder.with_index(index_metadata(index_name, options, *index_kind, index_metadata::is_local_index::no));
|
||||
}
|
||||
|
||||
data_type column_name_type = [&] {
|
||||
if (is_static_compact && kind == column_kind::regular_column) {
|
||||
return db::schema_tables::parse_type(comparator);
|
||||
}
|
||||
return utf8_type;
|
||||
}();
|
||||
auto column_name = [&] {
|
||||
try {
|
||||
return column_name_type->from_string(name);
|
||||
} catch (marshal_exception&) {
|
||||
// #2597: Scylla < 2.0 writes names in serialized form, try to recover
|
||||
column_name_type->validate(to_bytes_view(name));
|
||||
return to_bytes(name);
|
||||
}
|
||||
}();
|
||||
builder.with_column_ordered(column_definition(std::move(column_name), std::move(validator), kind, component_index));
|
||||
}
|
||||
|
||||
if (is_static_compact) {
|
||||
builder.set_regular_column_name_type(db::schema_tables::parse_type(comparator));
|
||||
}
|
||||
|
||||
if (td.has("gc_grace_seconds")) {
|
||||
builder.set_gc_grace_seconds(td.get_as<int32_t>("gc_grace_seconds"));
|
||||
}
|
||||
if (td.has("min_compaction_threshold")) {
|
||||
builder.set_min_compaction_threshold(td.get_as<int32_t>("min_compaction_threshold"));
|
||||
}
|
||||
if (td.has("max_compaction_threshold")) {
|
||||
builder.set_max_compaction_threshold(td.get_as<int32_t>("max_compaction_threshold"));
|
||||
}
|
||||
if (td.has("comment")) {
|
||||
builder.set_comment(td.get_as<sstring>("comment"));
|
||||
}
|
||||
if (td.has("memtable_flush_period_in_ms")) {
|
||||
builder.set_memtable_flush_period(td.get_as<int32_t>("memtable_flush_period_in_ms"));
|
||||
}
|
||||
if (td.has("caching")) {
|
||||
builder.set_caching_options(caching_options::from_sstring(td.get_as<sstring>("caching")));
|
||||
}
|
||||
if (td.has("default_time_to_live")) {
|
||||
builder.set_default_time_to_live(gc_clock::duration(td.get_as<int32_t>("default_time_to_live")));
|
||||
}
|
||||
if (td.has("speculative_retry")) {
|
||||
builder.set_speculative_retry(td.get_as<sstring>("speculative_retry"));
|
||||
}
|
||||
if (td.has("compaction_strategy_class")) {
|
||||
auto strategy = td.get_as<sstring>("compaction_strategy_class");
|
||||
try {
|
||||
builder.set_compaction_strategy(compaction::compaction_strategy::type(strategy));
|
||||
} catch (const exceptions::configuration_exception& e) {
|
||||
// If compaction strategy class isn't supported, fallback to incremental.
|
||||
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
|
||||
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
|
||||
}
|
||||
}
|
||||
if (td.has("compaction_strategy_options")) {
|
||||
sstring strategy_options_str = td.get_as<sstring>("compaction_strategy_options");
|
||||
builder.set_compaction_strategy_options(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options_str)));
|
||||
}
|
||||
auto comp_param = td.get_as<sstring>("compression_parameters");
|
||||
compression_parameters cp(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(comp_param)));
|
||||
builder.set_compressor_params(cp);
|
||||
|
||||
if (td.has("min_index_interval")) {
|
||||
builder.set_min_index_interval(td.get_as<int32_t>("min_index_interval"));
|
||||
} else if (td.has("index_interval")) { // compatibility
|
||||
builder.set_min_index_interval(td.get_as<int32_t>("index_interval"));
|
||||
}
|
||||
if (td.has("max_index_interval")) {
|
||||
builder.set_max_index_interval(td.get_as<int32_t>("max_index_interval"));
|
||||
}
|
||||
if (td.has("bloom_filter_fp_chance")) {
|
||||
builder.set_bloom_filter_fp_chance(td.get_as<double>("bloom_filter_fp_chance"));
|
||||
} else {
|
||||
builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance());
|
||||
}
|
||||
if (td.has("dropped_columns")) {
|
||||
auto map = td.get_map<sstring, int64_t>("dropped_columns");
|
||||
for (auto&& e : map) {
|
||||
builder.without_column(e.first, api::timestamp_type(e.second));
|
||||
};
|
||||
}
|
||||
|
||||
// ignore version. we're transient
|
||||
if (!triggers->empty()) {
|
||||
throw unsupported_feature("triggers");
|
||||
}
|
||||
|
||||
dst.tables.emplace_back(table{timestamp, builder.build() });
|
||||
});
|
||||
}
|
||||
|
||||
future<> read_tables(keyspace& dst) {
|
||||
auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM {}.{} WHERE keyspace_name = ?",
|
||||
db::system_keyspace::legacy::COLUMNFAMILIES);
|
||||
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
|
||||
return parallel_for_each(*result, [this, &dst](row_type& row) {
|
||||
return read_table(dst, row.get_as<sstring>("columnfamily_name"), row.get_as<time_point>("timestamp"));
|
||||
}).finally([result] {});
|
||||
});
|
||||
}
|
||||
|
||||
future<time_point> read_type_timestamp(keyspace& dst, sstring type_name) {
|
||||
// TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
|
||||
// use the writeTime() CQL function, and must resort to a lower level.
|
||||
// Origin digs up the actual cells of target partition and gets timestamp from there.
|
||||
// We should do the same, but g-dam that's messy. Lets give back dung value for now.
|
||||
return make_ready_future<time_point>(dst.timestamp);
|
||||
}
|
||||
|
||||
future<> read_types(keyspace& dst) {
|
||||
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
|
||||
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
|
||||
return parallel_for_each(*result, [this, &dst](row_type& row) {
|
||||
auto name = row.get_blob_unfragmented("type_name");
|
||||
auto columns = row.get_list<bytes>("field_names");
|
||||
auto types = row.get_list<sstring>("field_types");
|
||||
std::vector<data_type> field_types;
|
||||
for (auto&& value : types) {
|
||||
field_types.emplace_back(db::schema_tables::parse_type(value));
|
||||
}
|
||||
auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types, false);
|
||||
return read_type_timestamp(dst, value_cast<sstring>(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) {
|
||||
dst.types.emplace_back(type{timestamp, ut});
|
||||
});
|
||||
}).finally([result] {});
|
||||
});
|
||||
}
|
||||
|
||||
future<> read_functions(keyspace& dst) {
|
||||
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS);
|
||||
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
|
||||
if (!result->empty()) {
|
||||
throw unsupported_feature("functions");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> read_aggregates(keyspace& dst) {
|
||||
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES);
|
||||
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
|
||||
if (!result->empty()) {
|
||||
throw unsupported_feature("aggregates");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<keyspace> read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) {
|
||||
auto map = rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options));
|
||||
map.emplace("class", std::move(strategy_class));
|
||||
auto ks = ::make_lw_shared<keyspace>(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) });
|
||||
|
||||
return read_tables(*ks).then([this, ks] {
|
||||
//Collection<Type> types = readTypes(keyspaceName);
|
||||
return read_types(*ks);
|
||||
}).then([this, ks] {
|
||||
return read_functions(*ks);
|
||||
}).then([this, ks] {
|
||||
return read_aggregates(*ks);
|
||||
}).then([ks] {
|
||||
return make_ready_future<keyspace>(std::move(*ks));
|
||||
});
|
||||
}
|
||||
|
||||
future<> read_all_keyspaces() {
|
||||
static auto ks_filter = [](row_type& row) {
|
||||
auto ks_name = row.get_as<sstring>("keyspace_name");
|
||||
return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME;
|
||||
};
|
||||
|
||||
auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM {}.{}",
|
||||
db::system_keyspace::legacy::KEYSPACES);
|
||||
|
||||
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this](result_set_type result) {
|
||||
auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end());
|
||||
auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end());
|
||||
return parallel_for_each(i, e, [this](row_type& row) {
|
||||
return read_keyspace(row.get_as<sstring>("keyspace_name")
|
||||
, row.get_as<bool>("durable_writes")
|
||||
, row.get_as<sstring>("strategy_class")
|
||||
, row.get_as<sstring>("strategy_options")
|
||||
, row.get_as<db_clock::time_point>("timestamp")
|
||||
).then([this](keyspace ks) {
|
||||
_keyspaces.emplace_back(std::move(ks));
|
||||
});
|
||||
}).finally([result] {});
|
||||
});
|
||||
}
|
||||
|
||||
future<> drop_legacy_tables() {
|
||||
mlogger.info("Dropping legacy schema tables");
|
||||
auto with_snapshot = !_keyspaces.empty();
|
||||
for (const sstring& cfname : legacy_schema_tables) {
|
||||
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
future<> store_keyspaces_in_new_schema_tables() {
|
||||
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
|
||||
_keyspaces.size(), db::schema_tables::v3::NAME);
|
||||
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
|
||||
for (auto& ks : _keyspaces) {
|
||||
auto ksm = ::make_lw_shared<keyspace_metadata>(ks.name
|
||||
, ks.replication_params["class"] // TODO, make ksm like c3?
|
||||
, cql3::statements::property_definitions::to_extended_map(ks.replication_params)
|
||||
, std::nullopt
|
||||
, std::nullopt
|
||||
, ks.durable_writes);
|
||||
|
||||
// we want separate time stamps for tables/types, so cannot bulk them into the ksm.
|
||||
for (auto&& m : db::schema_tables::make_create_keyspace_mutations(schema_features::full(), ksm, ks.timestamp.time_since_epoch().count(), false)) {
|
||||
mutations.emplace_back(std::move(m));
|
||||
}
|
||||
for (auto& t : ks.tables) {
|
||||
db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations);
|
||||
}
|
||||
for (auto& t : ks.types) {
|
||||
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
|
||||
}
|
||||
}
|
||||
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||
}
|
||||
|
||||
future<> flush_schemas() {
|
||||
auto& db = _qp.db().real_database().container();
|
||||
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::all_table_infos(schema_features::full()));
|
||||
}
|
||||
|
||||
future<> migrate() {
|
||||
return read_all_keyspaces().then([this]() {
|
||||
// write metadata to the new schema tables
|
||||
return store_keyspaces_in_new_schema_tables()
|
||||
.then(std::bind(&migrator::flush_schemas, this))
|
||||
.then(std::bind(&migrator::drop_legacy_tables, this))
|
||||
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
|
||||
});
|
||||
}
|
||||
|
||||
sharded<service::storage_proxy>& _sp;
|
||||
sharded<replica::database>& _db;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
cql3::query_processor& _qp;
|
||||
std::vector<keyspace> _keyspaces;
|
||||
};
|
||||
|
||||
const std::unordered_set<sstring> migrator::legacy_schema_tables = {
|
||||
db::system_keyspace::legacy::KEYSPACES,
|
||||
db::system_keyspace::legacy::COLUMNFAMILIES,
|
||||
db::system_keyspace::legacy::COLUMNS,
|
||||
db::system_keyspace::legacy::TRIGGERS,
|
||||
db::system_keyspace::legacy::USERTYPES,
|
||||
db::system_keyspace::legacy::FUNCTIONS,
|
||||
db::system_keyspace::legacy::AGGREGATES,
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
db::legacy_schema_migrator::migrate(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp) {
|
||||
return do_with(migrator(sp, db, sys_ks, qp), std::bind(&migrator::migrate, std::placeholders::_1));
|
||||
}
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace replica {
|
||||
class database;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
class system_keyspace;
|
||||
|
||||
namespace legacy_schema_migrator {
|
||||
|
||||
future<> migrate(sharded<service::storage_proxy>&, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor&);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -542,6 +542,7 @@ public:
|
||||
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
|
||||
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
|
||||
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
|
||||
// Note that this is different from the meaning of rows_entry::range_tombstone(), which includes the row itself.
|
||||
tombstone range_tombstone() const { return _range_tombstone; }
|
||||
|
||||
// Can be called when cursor is pointing at a row.
|
||||
|
||||
@@ -1287,6 +1287,15 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
|
||||
, _partitions(dht::raw_token_less_comparator{})
|
||||
, _underlying(src())
|
||||
, _snapshot_source(std::move(src))
|
||||
, _update_section(abstract_formatter([this] (fmt::context& ctx) {
|
||||
fmt::format_to(ctx.out(), "cache.update {}.{}", _schema->ks_name(), _schema->cf_name());
|
||||
}))
|
||||
, _populate_section(abstract_formatter([this] (fmt::context& ctx) {
|
||||
fmt::format_to(ctx.out(), "cache.populate {}.{}", _schema->ks_name(), _schema->cf_name());
|
||||
}))
|
||||
, _read_section(abstract_formatter([this] (fmt::context& ctx) {
|
||||
fmt::format_to(ctx.out(), "cache.read {}.{}", _schema->ks_name(), _schema->cf_name());
|
||||
}))
|
||||
{
|
||||
try {
|
||||
with_allocator(_tracker.allocator(), [this, cont] {
|
||||
|
||||
@@ -404,10 +404,7 @@ const std::unordered_set<table_id>& schema_tables_holding_schema_mutations() {
|
||||
computed_columns(),
|
||||
dropped_columns(),
|
||||
indexes(),
|
||||
scylla_tables(),
|
||||
db::system_keyspace::legacy::column_families(),
|
||||
db::system_keyspace::legacy::columns(),
|
||||
db::system_keyspace::legacy::triggers()}) {
|
||||
scylla_tables()}) {
|
||||
SCYLLA_ASSERT(s->clustering_key_size() > 0);
|
||||
auto&& first_column_name = s->clustering_column_at(0).name_as_text();
|
||||
SCYLLA_ASSERT(first_column_name == "table_name"
|
||||
@@ -2840,26 +2837,6 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
|
||||
}
|
||||
|
||||
|
||||
namespace legacy {
|
||||
|
||||
table_schema_version schema_mutations::digest() const {
|
||||
md5_hasher h;
|
||||
const db::schema_features no_features;
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
|
||||
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
|
||||
}
|
||||
|
||||
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
|
||||
sstring keyspace_name, sstring table_name, schema_ptr s)
|
||||
{
|
||||
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
|
||||
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
|
||||
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
|
||||
}
|
||||
|
||||
} // namespace legacy
|
||||
|
||||
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
|
||||
|
||||
@@ -155,24 +155,6 @@ schema_ptr scylla_table_schema_history();
|
||||
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
|
||||
}
|
||||
|
||||
namespace legacy {
|
||||
|
||||
class schema_mutations {
|
||||
mutation _columnfamilies;
|
||||
mutation _columns;
|
||||
public:
|
||||
schema_mutations(mutation columnfamilies, mutation columns)
|
||||
: _columnfamilies(std::move(columnfamilies))
|
||||
, _columns(std::move(columns))
|
||||
{ }
|
||||
table_schema_version digest() const;
|
||||
};
|
||||
|
||||
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
|
||||
sstring keyspace_name, sstring table_name, schema_ptr s);
|
||||
|
||||
}
|
||||
|
||||
struct qualified_name {
|
||||
sstring keyspace_name;
|
||||
sstring table_name;
|
||||
|
||||
@@ -847,8 +847,6 @@ schema_ptr system_keyspace::corrupt_data() {
|
||||
return corrupt_data;
|
||||
}
|
||||
|
||||
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
|
||||
|
||||
/*static*/ schema_ptr system_keyspace::scylla_local() {
|
||||
static thread_local auto scylla_local = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
|
||||
@@ -1360,289 +1358,6 @@ schema_ptr system_keyspace::role_permissions() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::hints() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
|
||||
// partition key
|
||||
{{"target_id", uuid_type}},
|
||||
// clustering key
|
||||
{{"hint_id", timeuuid_type}, {"message_version", int32_type}},
|
||||
// regular columns
|
||||
{{"mutation", bytes_type}},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* hints awaiting delivery"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
|
||||
builder.set_compaction_strategy_options({{"enabled", "false"}});
|
||||
builder.with(schema_builder::compact_storage::yes);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::batchlog() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
|
||||
// partition key
|
||||
{{"id", uuid_type}},
|
||||
// clustering key
|
||||
{},
|
||||
// regular columns
|
||||
{{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* batchlog entries"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
|
||||
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::keyspaces() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, KEYSPACES), NAME, KEYSPACES,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{},
|
||||
// regular columns
|
||||
{
|
||||
{"durable_writes", boolean_type},
|
||||
{"strategy_class", utf8_type},
|
||||
{"strategy_options", utf8_type}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* keyspace definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::yes);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::column_families() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, COLUMNFAMILIES), NAME, COLUMNFAMILIES,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"columnfamily_name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"bloom_filter_fp_chance", double_type},
|
||||
{"caching", utf8_type},
|
||||
{"cf_id", uuid_type},
|
||||
{"comment", utf8_type},
|
||||
{"compaction_strategy_class", utf8_type},
|
||||
{"compaction_strategy_options", utf8_type},
|
||||
{"comparator", utf8_type},
|
||||
{"compression_parameters", utf8_type},
|
||||
{"default_time_to_live", int32_type},
|
||||
{"default_validator", utf8_type},
|
||||
{"dropped_columns", map_type_impl::get_instance(utf8_type, long_type, true)},
|
||||
{"gc_grace_seconds", int32_type},
|
||||
{"is_dense", boolean_type},
|
||||
{"key_validator", utf8_type},
|
||||
{"max_compaction_threshold", int32_type},
|
||||
{"max_index_interval", int32_type},
|
||||
{"memtable_flush_period_in_ms", int32_type},
|
||||
{"min_compaction_threshold", int32_type},
|
||||
{"min_index_interval", int32_type},
|
||||
{"speculative_retry", utf8_type},
|
||||
{"subcomparator", utf8_type},
|
||||
{"type", utf8_type},
|
||||
// The following 4 columns are only present up until 2.1.8 tables
|
||||
{"key_aliases", utf8_type},
|
||||
{"value_alias", utf8_type},
|
||||
{"column_aliases", utf8_type},
|
||||
{"index_interval", int32_type},},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* table definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::columns() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, COLUMNS), NAME, COLUMNS,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"columnfamily_name", utf8_type}, {"column_name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"component_index", int32_type},
|
||||
{"index_name", utf8_type},
|
||||
{"index_options", utf8_type},
|
||||
{"index_type", utf8_type},
|
||||
{"type", utf8_type},
|
||||
{"validator", utf8_type},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"column definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::triggers() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, TRIGGERS), NAME, TRIGGERS,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"columnfamily_name", utf8_type}, {"trigger_name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"trigger_options", map_type_impl::get_instance(utf8_type, utf8_type, true)},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"trigger definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::usertypes() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, USERTYPES), NAME, USERTYPES,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"type_name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"field_names", list_type_impl::get_instance(utf8_type, true)},
|
||||
{"field_types", list_type_impl::get_instance(utf8_type, true)},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"user defined type definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::functions() {
|
||||
/**
|
||||
* Note: we have our own "legacy" version of this table (in schema_tables),
|
||||
* but it is (afaik) not used, and differs slightly from the origin one.
|
||||
* This is based on the origin schema, since we're more likely to encounter
|
||||
* installations of that to migrate, rather than our own (if we dont use the table).
|
||||
*/
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, FUNCTIONS), NAME, FUNCTIONS,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"function_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
|
||||
// regular columns
|
||||
{
|
||||
{"argument_names", list_type_impl::get_instance(utf8_type, true)},
|
||||
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
|
||||
{"body", utf8_type},
|
||||
{"language", utf8_type},
|
||||
{"return_type", utf8_type},
|
||||
{"called_on_null_input", boolean_type},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* user defined type definitions"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::aggregates() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, AGGREGATES), NAME, AGGREGATES,
|
||||
// partition key
|
||||
{{"keyspace_name", utf8_type}},
|
||||
// clustering key
|
||||
{{"aggregate_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
|
||||
// regular columns
|
||||
{
|
||||
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
|
||||
{"final_func", utf8_type},
|
||||
{"initcond", bytes_type},
|
||||
{"return_type", utf8_type},
|
||||
{"state_func", utf8_type},
|
||||
{"state_type", utf8_type},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"*DEPRECATED* user defined aggregate definition"
|
||||
);
|
||||
builder.set_gc_grace_seconds(schema_gc_grace);
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::dicts() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, DICTS);
|
||||
@@ -2615,13 +2330,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
|
||||
r.insert(r.end(), {sstables_registry()});
|
||||
}
|
||||
// legacy schema
|
||||
r.insert(r.end(), {
|
||||
// TODO: once we migrate hints/batchlog and add converter
|
||||
// legacy::hints(), legacy::batchlog(),
|
||||
legacy::keyspaces(), legacy::column_families(),
|
||||
legacy::columns(), legacy::triggers(), legacy::usertypes(),
|
||||
legacy::functions(), legacy::aggregates(), });
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
@@ -241,28 +241,6 @@ public:
|
||||
static schema_ptr cdc_local();
|
||||
};
|
||||
|
||||
struct legacy {
|
||||
static constexpr auto HINTS = "hints";
|
||||
static constexpr auto BATCHLOG = "batchlog";
|
||||
static constexpr auto KEYSPACES = "schema_keyspaces";
|
||||
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
|
||||
static constexpr auto COLUMNS = "schema_columns";
|
||||
static constexpr auto TRIGGERS = "schema_triggers";
|
||||
static constexpr auto USERTYPES = "schema_usertypes";
|
||||
static constexpr auto FUNCTIONS = "schema_functions";
|
||||
static constexpr auto AGGREGATES = "schema_aggregates";
|
||||
|
||||
static schema_ptr keyspaces();
|
||||
static schema_ptr column_families();
|
||||
static schema_ptr columns();
|
||||
static schema_ptr triggers();
|
||||
static schema_ptr usertypes();
|
||||
static schema_ptr functions();
|
||||
static schema_ptr aggregates();
|
||||
static schema_ptr hints();
|
||||
static schema_ptr batchlog();
|
||||
};
|
||||
|
||||
// Partition estimates for a given range of tokens.
|
||||
struct range_estimates {
|
||||
schema_ptr schema;
|
||||
|
||||
@@ -9,6 +9,8 @@
|
||||
#include "query/query-result-reader.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
@@ -25,8 +27,14 @@ class delete_ghost_rows_visitor {
|
||||
replica::table& _view_table;
|
||||
schema_ptr _base_schema;
|
||||
std::optional<partition_key> _view_pk;
|
||||
db::timeout_semaphore _concurrency_semaphore;
|
||||
seastar::gate _gate;
|
||||
std::exception_ptr& _ex;
|
||||
|
||||
public:
|
||||
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
|
||||
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
|
||||
delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
|
||||
~delete_ghost_rows_visitor() noexcept;
|
||||
|
||||
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
|
||||
}
|
||||
@@ -45,6 +53,9 @@ public:
|
||||
uint32_t accept_partition_end(const query::result_row_view& static_row) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private:
|
||||
future<> do_accept_new_row(partition_key pk, clustering_key ck);
|
||||
};
|
||||
|
||||
} //namespace db::view
|
||||
|
||||
438
db/view/view.cc
438
db/view/view.cc
@@ -1744,6 +1744,115 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
|
||||
&& std::ranges::contains(shards, this_shard_id());
|
||||
}
|
||||
|
||||
static endpoints_to_update get_view_natural_endpoint_vnodes(
|
||||
locator::host_id me,
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
locator::endpoint_dc_rack my_location,
|
||||
const locator::network_topology_strategy* network_topology,
|
||||
replica::cf_stats& cf_stats) {
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
auto& my_datacenter = my_location.dc;
|
||||
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (!network_topology || node.get().dc() == my_datacenter) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
}
|
||||
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
|
||||
}
|
||||
|
||||
static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
replica::cf_stats& cf_stats) {
|
||||
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
|
||||
for (auto&& base_node : base_nodes) {
|
||||
if (base_dc_racks.contains(base_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
|
||||
base_node.get().dc(), base_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
base_dc_racks.insert(base_node.get().dc_rack());
|
||||
}
|
||||
|
||||
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
|
||||
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
|
||||
view_node.get().dc(), view_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
// Track unpaired replicas in both sets
|
||||
if (base_dc_racks.contains(view_node.get().dc_rack())) {
|
||||
paired_view_dc_racks.insert(view_node.get().dc_rack());
|
||||
} else {
|
||||
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
|
||||
}
|
||||
}
|
||||
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// There are view replicas that can't be paired with any base replica
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
|
||||
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
unpaired_view_dc_rack_replicas | std::views::values);
|
||||
}
|
||||
return extra_replica;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// Calculate the node ("natural endpoint") to which this node should send
|
||||
// a view update.
|
||||
//
|
||||
@@ -1756,29 +1865,19 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
|
||||
// of this function is to find, assuming that this node is one of the base
|
||||
// replicas for a given partition, the paired view replica.
|
||||
//
|
||||
// In the past, we used an optimization called "self-pairing" that if a single
|
||||
// node was both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node would send the update to itself. This self-
|
||||
// pairing optimization could cause the pairing to change after view ranges
|
||||
// are moved between nodes, so currently we only use it if
|
||||
// use_legacy_self_pairing is set to true. When using tablets - where range
|
||||
// movements are common - it is strongly recommended to set it to false.
|
||||
// When using vnodes, we have an optimization called "self-pairing" - if a single
|
||||
// node is both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node sends the update to itself and this node is removed
|
||||
// from the lists of nodes paired by index. This self-pairing optimization can
|
||||
// cause the pairing to change after view ranges are moved between nodes.
|
||||
//
|
||||
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
|
||||
// we pair only nodes in the same datacenter.
|
||||
//
|
||||
// When use_legacy_self_pairing is enabled, if one of the base replicas
|
||||
// also happens to be a view replica, it is paired with itself
|
||||
// (with the other nodes paired by order in the list
|
||||
// after taking this node out).
|
||||
//
|
||||
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
|
||||
// and the replication factor in the node's datacenter is a multiple of the number
|
||||
// of racks in the datacenter, then pairing is rack-aware. In this case,
|
||||
// all racks have the same number of replicas, and those are never migrated
|
||||
// outside their racks. Therefore, the base replicas are naturally paired with the
|
||||
// view replicas that are in the same rack, based on the ordinal position.
|
||||
// Note that typically, there is a single replica per rack and pairing is trivial.
|
||||
// If the table uses tablets, then pairing is rack-aware. In this case, in each
|
||||
// rack where we have a base replica there is also one replica of each view tablet.
|
||||
// Therefore, the base replicas are naturally paired with the view replicas that
|
||||
// are in the same rack.
|
||||
//
|
||||
// If the assumption that the given base token belongs to this replica
|
||||
// does not hold, we return an empty optional.
|
||||
@@ -1806,19 +1905,12 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_rack_aware_view_pairing,
|
||||
bool use_tablets,
|
||||
replica::cf_stats& cf_stats) {
|
||||
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& my_location = topology.get_location(me);
|
||||
auto& my_datacenter = my_location.dc;
|
||||
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
|
||||
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
|
||||
bool simple_rack_aware_pairing = false;
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector orig_base_endpoints, orig_view_endpoints;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
|
||||
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
|
||||
if (auto* np = topology.find_node(ep)) {
|
||||
@@ -1829,6 +1921,7 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
|
||||
// We need to use get_replicas() for pairing to be stable in case base or view tablet
|
||||
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
|
||||
return resolve(topology, ep, false);
|
||||
}) | std::ranges::to<node_vector>();
|
||||
@@ -1852,231 +1945,43 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
|
||||
auto leaving_base = it->get().host_id();
|
||||
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
|
||||
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
view_token, use_tablets, cf_stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::function<bool(const locator::node&)> is_candidate;
|
||||
if (network_topology) {
|
||||
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
|
||||
} else {
|
||||
is_candidate = [&] (const locator::node&) { return true; };
|
||||
}
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (is_candidate(node)) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
if (!use_tablets) {
|
||||
return get_view_natural_endpoint_vnodes(
|
||||
me,
|
||||
base_nodes,
|
||||
view_nodes,
|
||||
my_location,
|
||||
network_topology,
|
||||
cf_stats);
|
||||
}
|
||||
|
||||
if (use_legacy_self_pairing) {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (is_candidate(view_node)) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
process_candidate(view_endpoints, view_node);
|
||||
std::optional<locator::host_id> paired_replica;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (view_node.get().dc_rack() == my_location) {
|
||||
paired_replica = view_node.get().host_id();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Try optimizing for simple rack-aware pairing
|
||||
// If the numbers of base and view replica differ, that means an RF change is taking place
|
||||
// and we can't use simple rack-aware pairing.
|
||||
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
|
||||
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
|
||||
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
|
||||
// Simple rack-aware pairing is possible when the datacenter replication factor
|
||||
// is a multiple of the number of racks in the datacenter.
|
||||
if (dc_rf % racks.size() == 0) {
|
||||
simple_rack_aware_pairing = true;
|
||||
size_t rack_rf = dc_rf / racks.size();
|
||||
// If any rack doesn't have enough nodes to satisfy the per-rack rf
|
||||
// simple rack-aware pairing is disabled.
|
||||
for (const auto& [rack, nodes] : racks) {
|
||||
if (nodes.size() < rack_rf) {
|
||||
simple_rack_aware_pairing = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (dc_rf != base_endpoints.size()) {
|
||||
// If the datacenter replication factor is not equal to the number of base replicas,
|
||||
// we're in progress of a RF change and we can't use simple rack-aware pairing.
|
||||
simple_rack_aware_pairing = false;
|
||||
}
|
||||
if (simple_rack_aware_pairing) {
|
||||
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
}
|
||||
if (paired_replica && base_nodes.size() == view_nodes.size()) {
|
||||
// We don't need to find any extra replicas, so we can return early
|
||||
return {.natural_endpoint = paired_replica};
|
||||
}
|
||||
|
||||
orig_base_endpoints = base_endpoints;
|
||||
orig_view_endpoints = view_endpoints;
|
||||
|
||||
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
|
||||
// Use best-match, for the minimum number of base and view replicas in each rack,
|
||||
// and ordinal match for the rest.
|
||||
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
|
||||
if (rack_aware_pairing && !simple_rack_aware_pairing) {
|
||||
struct indexed_replica {
|
||||
size_t idx;
|
||||
std::reference_wrapper<const locator::node> node;
|
||||
};
|
||||
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
|
||||
|
||||
// First, index all replicas by rack
|
||||
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
|
||||
size_t idx = 0;
|
||||
for (const auto& r: replicas) {
|
||||
racks[r.get().rack()].emplace_back(idx++, r);
|
||||
}
|
||||
};
|
||||
index_replica_set(base_racks, base_endpoints);
|
||||
index_replica_set(view_racks, view_endpoints);
|
||||
|
||||
// Try optimistically pairing `me` first
|
||||
const auto& my_base_replicas = base_racks[my_location.rack];
|
||||
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
|
||||
if (base_it == my_base_replicas.end()) {
|
||||
return {};
|
||||
}
|
||||
const auto& my_view_replicas = view_racks[my_location.rack];
|
||||
size_t idx = base_it - my_base_replicas.begin();
|
||||
if (idx < my_view_replicas.size()) {
|
||||
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
|
||||
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
|
||||
} else {
|
||||
// If the number of view replicas is larger than the number of base replicas,
|
||||
// we need to find the unpaired view replica, so we can't return yet.
|
||||
paired_replica = my_view_replicas[idx].node;
|
||||
}
|
||||
}
|
||||
|
||||
// Collect all unpaired base and view replicas,
|
||||
// where the number of replicas in the base rack is different than the respective view rack
|
||||
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
|
||||
for (const auto& [rack, base_replicas] : base_racks) {
|
||||
const auto& view_replicas = view_racks[rack];
|
||||
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
|
||||
unpaired_base_replicas.emplace_back(base_replicas[i]);
|
||||
}
|
||||
}
|
||||
for (const auto& [rack, view_replicas] : view_racks) {
|
||||
const auto& base_replicas = base_racks[rack];
|
||||
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
|
||||
unpaired_view_replicas.emplace_back(view_replicas[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by the original ordinality, and copy the sorted results
|
||||
// back into {base,view}_endpoints, for backward compatible processing below.
|
||||
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
base_endpoints.clear();
|
||||
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
|
||||
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
view_endpoints.clear();
|
||||
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (!paired_replica && base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
|
||||
if (!paired_replica && idx >= view_endpoints.size()) {
|
||||
// There are fewer view replicas than base replicas
|
||||
// FIXME: This might still happen when reducing replication factor with tablets,
|
||||
// see https://github.com/scylladb/scylladb/issues/21492
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
|
||||
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
} else if (base_endpoints.size() < view_endpoints.size()) {
|
||||
// There are fewer base replicas than view replicas.
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_endpoints.back();
|
||||
if (base_endpoints.size() < view_endpoints.size() - 1) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
}
|
||||
}
|
||||
|
||||
if (!paired_replica) {
|
||||
paired_replica = view_endpoints[idx];
|
||||
// We couldn't find any view replica in our rack
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
|
||||
me,
|
||||
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
}
|
||||
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
|
||||
// This can happen when the view replica with no pairing is in another DC.
|
||||
// We need to send an update to it if there are no base replicas in that DC yet,
|
||||
// as it won't receive updates otherwise.
|
||||
std::unordered_set<sstring> dcs_with_base_replicas;
|
||||
for (const auto& base_node : base_nodes) {
|
||||
dcs_with_base_replicas.insert(base_node.get().dc());
|
||||
}
|
||||
for (const auto& view_node : view_nodes) {
|
||||
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// https://github.com/scylladb/scylladb/issues/19439
|
||||
// With tablets, a node being replaced might transition to "left" state
|
||||
// but still be kept as a replica.
|
||||
// As of writing this hints are not prepared to handle nodes that are left
|
||||
// but are still replicas. Therefore, there is no other sensible option
|
||||
// right now but to give up attempt to send the update or write a hint
|
||||
// to the paired, permanently down replica.
|
||||
// We use the same workaround for the extra replica.
|
||||
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
|
||||
if (!replica) {
|
||||
return std::nullopt;
|
||||
}
|
||||
const auto& node = replica->get();
|
||||
if (!node.left()) {
|
||||
return node.host_id();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
|
||||
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
|
||||
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
|
||||
return {.natural_endpoint = paired_replica,
|
||||
.endpoint_with_no_pairing = no_pairing_replica};
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
@@ -2136,12 +2041,6 @@ future<> view_update_generator::mutate_MV(
|
||||
{
|
||||
auto& ks = _db.find_keyspace(base->ks_name());
|
||||
auto& replication = ks.get_replication_strategy();
|
||||
// We set legacy self-pairing for old vnode-based tables (for backward
|
||||
// compatibility), and unset it for tablets - where range movements
|
||||
// are more frequent and backward compatibility is less important.
|
||||
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
|
||||
// on a view, like we have the synchronous_updates_flag.
|
||||
bool use_legacy_self_pairing = !ks.uses_tablets();
|
||||
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
|
||||
auto get_erm = [&] (table_id id) {
|
||||
auto it = erms.find(id);
|
||||
@@ -2154,10 +2053,6 @@ future<> view_update_generator::mutate_MV(
|
||||
for (const auto& mut : view_updates) {
|
||||
(void)get_erm(mut.s->id());
|
||||
}
|
||||
// Enable rack-aware view updates pairing for tablets
|
||||
// when the cluster feature is enabled so that all replicas agree
|
||||
// on the pairing algorithm.
|
||||
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
|
||||
auto me = base_ermp->get_topology().my_host_id();
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
||||
@@ -2165,7 +2060,7 @@ future<> view_update_generator::mutate_MV(
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto view_ermp = erms.at(mut.s->id());
|
||||
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
|
||||
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
ks.uses_tablets(), cf_stats);
|
||||
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
|
||||
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
|
||||
if (no_pairing_endpoint) {
|
||||
@@ -3597,7 +3492,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
|
||||
})
|
||||
{ }
|
||||
|
||||
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
|
||||
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
|
||||
: _proxy(proxy)
|
||||
, _state(state)
|
||||
, _timeout_duration(timeout_duration)
|
||||
@@ -3605,8 +3500,20 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
|
||||
, _view_table(_proxy.get_db().local().find_column_family(view))
|
||||
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
|
||||
, _view_pk()
|
||||
, _concurrency_semaphore(concurrency)
|
||||
, _ex(ex)
|
||||
{}
|
||||
|
||||
|
||||
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
|
||||
try {
|
||||
_gate.close().get();
|
||||
} catch (...) {
|
||||
// Closing the gate should never throw, but if it does anyway, capture the exception.
|
||||
_ex = std::current_exception();
|
||||
}
|
||||
}
|
||||
|
||||
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
SCYLLA_ASSERT(thread::running_in_thread());
|
||||
_view_pk = key;
|
||||
@@ -3614,7 +3521,18 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u
|
||||
|
||||
// Assumes running in seastar::thread
|
||||
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
auto view_exploded_pk = _view_pk->explode();
|
||||
auto units = get_units(_concurrency_semaphore, 1).get();
|
||||
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
|
||||
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
|
||||
if (f.failed()) {
|
||||
_ex = f.get_exception();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
|
||||
auto view_exploded_pk = pk.explode();
|
||||
auto view_exploded_ck = ck.explode();
|
||||
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
|
||||
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
|
||||
@@ -3649,17 +3567,17 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
|
||||
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
auto timeout = db::timeout_clock::now() + _timeout_duration;
|
||||
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
|
||||
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
|
||||
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
|
||||
query::result& result = *base_qr.query_result;
|
||||
auto delete_ghost_row = [&]() {
|
||||
mutation m(_view, *_view_pk);
|
||||
auto delete_ghost_row = [&]() -> future<> {
|
||||
mutation m(_view, pk);
|
||||
auto& row = m.partition().clustered_row(*_view, ck);
|
||||
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
timeout = db::timeout_clock::now() + _timeout_duration;
|
||||
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
|
||||
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
|
||||
};
|
||||
if (result.row_count().value_or(0) == 0) {
|
||||
delete_ghost_row();
|
||||
co_await delete_ghost_row();
|
||||
} else if (!view_key_cols_not_in_base_key.empty()) {
|
||||
if (result.row_count().value_or(0) != 1) {
|
||||
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
|
||||
@@ -3669,7 +3587,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
|
||||
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
|
||||
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
|
||||
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
|
||||
delete_ghost_row();
|
||||
co_await delete_ghost_row();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,8 +305,7 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_basic_rack_aware_view_pairing,
|
||||
bool use_tablets,
|
||||
replica::cf_stats& cf_stats);
|
||||
|
||||
/// Verify that the provided keyspace is eligible for storing materialized views.
|
||||
|
||||
2
dist/common/sysconfig/scylla-node-exporter
vendored
2
dist/common/sysconfig/scylla-node-exporter
vendored
@@ -1 +1 @@
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
|
||||
1
dist/debian/debian/scylla-server.install
vendored
1
dist/debian/debian/scylla-server.install
vendored
@@ -2,7 +2,6 @@ etc/default/scylla-server
|
||||
etc/default/scylla-housekeeping
|
||||
etc/scylla.d/*.conf
|
||||
etc/bash_completion.d/nodetool-completion
|
||||
opt/scylladb/share/p11-kit/modules/*
|
||||
opt/scylladb/share/doc/scylla/*
|
||||
opt/scylladb/share/doc/scylla/licenses/
|
||||
usr/lib/systemd/system/*.timer
|
||||
|
||||
1
dist/redhat/scylla.spec
vendored
1
dist/redhat/scylla.spec
vendored
@@ -122,7 +122,6 @@ ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
%config(noreplace) %{_sysconfdir}/sysconfig/scylla-housekeeping
|
||||
%attr(0755,root,root) %dir %{_sysconfdir}/scylla.d
|
||||
%config(noreplace) %{_sysconfdir}/scylla.d/*.conf
|
||||
/opt/scylladb/share/p11-kit/modules/*
|
||||
/opt/scylladb/share/doc/scylla/*
|
||||
%{_unitdir}/scylla-fstrim.service
|
||||
%{_unitdir}/scylla-housekeeping-daily.service
|
||||
|
||||
@@ -1,6 +1,18 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
# Move the diver information to another project
|
||||
|
||||
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
|
||||
/stable/using-scylla/drivers/dynamo-drivers/index.html: https://docs.scylladb.com/stable/drivers/dynamo-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/index.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-python-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-java-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-go-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-gocqlx-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-cpp-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
/stable/using-scylla/drivers/cql-drivers/scylla-rust-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
|
||||
|
||||
# Redirect 2025.1 upgrade guides that are not on master but were indexed by Google (404 reported)
|
||||
|
||||
/master/upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/upgrade-guide-from-2024.x-to-2025.1.html: https://docs.scylladb.com/manual/stable/upgrade/index.html
|
||||
|
||||
@@ -106,6 +106,15 @@ which is recommended in order to make the operation less heavyweight
|
||||
and allow for running multiple parallel pruning statements for non-overlapping
|
||||
token ranges.
|
||||
|
||||
By default, the PRUNE MATERIALIZED VIEW statement is relatively slow, only
|
||||
performing one base read or write at a time. This can be changed with the
|
||||
USING CONCURRENCY clause. If the clause is used, the concurrency of reads
|
||||
and writes from the base table will be allowed to increase up to the specified
|
||||
value. For example, to run the PRUNE with 100 parallel reads/writes, you can use:
|
||||
```cql
|
||||
PRUNE MATERIALIZED VIEW my_view WHERE v = 19 USING CONCURRENCY 100;
|
||||
```
|
||||
|
||||
## Synchronous materialized views
|
||||
|
||||
Usually, when a table with materialized views is updated, the update to the
|
||||
|
||||
@@ -28,7 +28,8 @@ Incremental Repair is only supported for tables that use the tablets architectur
|
||||
Incremental Repair Modes
|
||||
------------------------
|
||||
|
||||
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
|
||||
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
|
||||
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
|
||||
|
||||
The available modes are:
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`ScyllaDB Drivers</using-scylla/drivers/index>`
|
||||
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
|
||||
* `Get Started Lesson on ScyllaDB University <https://university.scylladb.com/courses/scylla-essentials-overview/lessons/quick-wins-install-and-run-scylla/>`_
|
||||
* :doc:`CQL Reference </cql/index>`
|
||||
* :doc:`cqlsh - the CQL shell </cql/cqlsh/>`
|
||||
|
||||
@@ -35,7 +35,7 @@ Documentation Highlights
|
||||
* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
|
||||
* :doc:`Upgrade ScyllaDB </upgrade/index>`
|
||||
* :doc:`CQL Reference </cql/index>`
|
||||
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
|
||||
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
|
||||
* :doc:`Features </features/index>`
|
||||
|
||||
ScyllaDB Support
|
||||
|
||||
@@ -172,7 +172,7 @@ For example:
|
||||
* `ScyllaDB Java Driver <https://github.com/scylladb/java-driver/tree/3.7.1-scylla/manual/compression>`_
|
||||
* `Go Driver <https://godoc.org/github.com/gocql/gocql#Compressor>`_
|
||||
|
||||
Refer to the :doc:`Drivers Page </using-scylla/drivers/index>` for more drivers.
|
||||
Refer to `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ for more drivers.
|
||||
|
||||
.. _internode-compression:
|
||||
|
||||
|
||||
@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
|
||||
|
||||
nodetool cluster repair --tablet-tokens 1,10474535988
|
||||
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
|
||||
|
||||
For example:
|
||||
|
||||
::
|
||||
|
||||
nodetool cluster repair --incremental-mode regular
|
||||
nodetool cluster repair --incremental-mode disabled
|
||||
|
||||
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.
|
||||
|
||||
|
||||
@@ -110,7 +110,6 @@ To display the log classes (output changes with each version so your display may
|
||||
keys
|
||||
keyspace_utils
|
||||
large_data
|
||||
legacy_schema_migrator
|
||||
lister
|
||||
load_balancer
|
||||
load_broadcaster
|
||||
|
||||
@@ -206,7 +206,7 @@ This is 19% of the latency compared to no batching.
|
||||
Driver Guidelines
|
||||
-----------------
|
||||
|
||||
Use the :doc:`ScyllaDB drivers </using-scylla/drivers/index>` that are available for Java, Python, Go, and C/C++.
|
||||
Use the `ScyllaDB drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ that are available for Java, Python, Go, and C/C++.
|
||||
They provide much better performance than third-party drivers because they are shard aware &emdash; they can route requests to the right CPU core (shard).
|
||||
When the driver starts, it gets the topology of the cluster and therefore it knows exactly which CPU core should get a request.
|
||||
Our latest shard-aware drivers also improve the efficiency of our Change Data Capture (CDC) feature.
|
||||
|
||||
@@ -121,7 +121,7 @@ Driver Compression
|
||||
|
||||
This refers to compressing traffic between the client and ScyllaDB.
|
||||
Verify your client driver is using compressed traffic when connected to ScyllaDB.
|
||||
As compression is driver settings dependent, please check your client driver manual or :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`.
|
||||
As compression is driver settings dependent, please check your client driver manual. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
|
||||
|
||||
|
||||
Connectivity
|
||||
@@ -130,7 +130,7 @@ Connectivity
|
||||
Drivers Settings
|
||||
================
|
||||
|
||||
* Use shard aware drivers wherever possible. :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` (not third-party drivers) are shard aware.
|
||||
* Use shard aware drivers wherever possible. `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ (not third-party drivers) are shard aware.
|
||||
* Configure connection pool - open more connections (>3 per shard) and/Or more clients. See `this blog <https://www.scylladb.com/2019/11/20/maximizing-performance-via-concurrency-while-minimizing-timeouts-in-distributed-databases/>`_.
|
||||
|
||||
Management
|
||||
|
||||
8
docs/poetry.lock
generated
8
docs/poetry.lock
generated
@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
|
||||
|
||||
[[package]]
|
||||
name = "sphinx-scylladb-theme"
|
||||
version = "1.8.9"
|
||||
version = "1.8.10"
|
||||
description = "A Sphinx Theme for ScyllaDB documentation projects"
|
||||
optional = false
|
||||
python-versions = "<4.0,>=3.10"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
|
||||
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
|
||||
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
|
||||
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -1603,4 +1603,4 @@ files = [
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.10"
|
||||
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"
|
||||
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"
|
||||
|
||||
@@ -9,7 +9,7 @@ package-mode = false
|
||||
python = "^3.10"
|
||||
pygments = "^2.18.0"
|
||||
redirects_cli ="^0.1.3"
|
||||
sphinx-scylladb-theme = "^1.8.9"
|
||||
sphinx-scylladb-theme = "^1.8.10"
|
||||
sphinx-sitemap = "^2.6.0"
|
||||
sphinx-autobuild = "^2024.4.19"
|
||||
Sphinx = "^7.3.7"
|
||||
|
||||
@@ -25,8 +25,8 @@ Actions
|
||||
|
||||
If your cluster is having timeouts during overload, check first if you are not making the overload situation worse through retries, and pay attention to the following:
|
||||
|
||||
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
|
||||
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults.
|
||||
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
|
||||
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults.
|
||||
* Make sure the server neither runs speculative retry nor runs it based on percentiles (as those can fluctuate aggressively). Server-side speculative retries are a per-table setting that can be changed with the ALTER TABLE command. See the :ref:`documentation <speculative-retry-options>` for details.
|
||||
|
||||
|
||||
|
||||
@@ -9,9 +9,19 @@ To ensure a successful upgrade, follow
|
||||
the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
|
||||
ScyllaDB. This means that:
|
||||
|
||||
* You should perform the upgrades consecutively - to each successive X.Y
|
||||
version, **without skipping any major or minor version**, unless there is
|
||||
a documented upgrade procedure to bypass a version.
|
||||
* You should follow the upgrade policy:
|
||||
|
||||
* Starting with version **2025.4**, upgrades can skip minor versions as long
|
||||
as they remain within the same major version (for example, upgrading directly
|
||||
from 2025.1 → 2025.4 is supported).
|
||||
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
|
||||
each successive X.Y version must be installed in order, **without skipping
|
||||
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
|
||||
is not supported).
|
||||
* You cannot skip major versions. Upgrades must move from one major version to
|
||||
the next using the documented major-version upgrade path.
|
||||
* You should upgrade to a supported version of ScyllaDB.
|
||||
See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
|
||||
* Before you upgrade to the next version, the whole cluster (each node) must
|
||||
be upgraded to the previous version.
|
||||
* You cannot perform an upgrade by replacing the nodes in the cluster with new
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 28 KiB |
@@ -1,141 +0,0 @@
|
||||
=====================
|
||||
ScyllaDB CQL Drivers
|
||||
=====================
|
||||
|
||||
.. toctree::
|
||||
:titlesonly:
|
||||
:hidden:
|
||||
|
||||
scylla-python-driver
|
||||
scylla-java-driver
|
||||
scylla-go-driver
|
||||
scylla-gocqlx-driver
|
||||
scylla-cpp-driver
|
||||
scylla-rust-driver
|
||||
|
||||
ScyllaDB Drivers
|
||||
-----------------
|
||||
|
||||
The following ScyllaDB drivers are available:
|
||||
|
||||
* :doc:`Python Driver</using-scylla/drivers/cql-drivers/scylla-python-driver>`
|
||||
* :doc:`Java Driver </using-scylla/drivers/cql-drivers/scylla-java-driver>`
|
||||
* :doc:`Go Driver </using-scylla/drivers/cql-drivers/scylla-go-driver>`
|
||||
* :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
|
||||
* :doc:`C++ Driver </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
|
||||
* `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
|
||||
* :doc:`Rust Driver </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
|
||||
* `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
|
||||
|
||||
We recommend using ScyllaDB drivers. All ScyllaDB drivers are shard-aware and provide additional
|
||||
benefits over third-party drivers.
|
||||
|
||||
ScyllaDB supports the CQL binary protocol version 3, so any Apache Cassandra/CQL driver that implements
|
||||
the same version works with ScyllaDB.
|
||||
|
||||
CDC Integration with ScyllaDB Drivers
|
||||
-------------------------------------------
|
||||
|
||||
The following table specifies which ScyllaDB drivers include a library for
|
||||
:doc:`CDC </features/cdc/cdc-intro>`.
|
||||
|
||||
.. list-table::
|
||||
:widths: 40 60
|
||||
:header-rows: 1
|
||||
|
||||
* - ScyllaDB Driver
|
||||
- CDC Connector
|
||||
* - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
|
||||
- |x|
|
||||
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
|
||||
- |v|
|
||||
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
|
||||
- |v|
|
||||
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
|
||||
- |x|
|
||||
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
|
||||
- |x|
|
||||
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
|
||||
- |x|
|
||||
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
|
||||
- |v|
|
||||
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
|
||||
- |x|
|
||||
|
||||
Support for Tablets
|
||||
-------------------------
|
||||
|
||||
The following table specifies which ScyllaDB drivers support
|
||||
:doc:`tablets </architecture/tablets>` and since which version.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 35 35
|
||||
:header-rows: 1
|
||||
|
||||
* - ScyllaDB Driver
|
||||
- Support for Tablets
|
||||
- Since Version
|
||||
* - :doc:`Python</using-scylla/drivers/cql-drivers/scylla-python-driver>`
|
||||
- |v|
|
||||
- 3.26.5
|
||||
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
|
||||
- |v|
|
||||
- 4.18.0 (Java Driver 4.x)
|
||||
|
||||
3.11.5.2 (Java Driver 3.x)
|
||||
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
|
||||
- |v|
|
||||
- 1.13.0
|
||||
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
|
||||
- |x|
|
||||
- N/A
|
||||
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
|
||||
- |x|
|
||||
- N/A
|
||||
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
|
||||
- |v|
|
||||
- All versions
|
||||
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
|
||||
- |v|
|
||||
- 0.13.0
|
||||
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
|
||||
- |v|
|
||||
- All versions
|
||||
|
||||
Driver Support Policy
|
||||
-------------------------------
|
||||
|
||||
We support the **two most recent minor releases** of our drivers.
|
||||
|
||||
* We test and validate the latest two minor versions.
|
||||
* We typically patch only the latest minor release.
|
||||
|
||||
We recommend staying up to date with the latest supported versions to receive
|
||||
updates and fixes.
|
||||
|
||||
At a minimum, upgrade your driver when upgrading to a new ScyllaDB version
|
||||
to ensure compatibility between the driver and the database.
|
||||
|
||||
Third-party Drivers
|
||||
----------------------
|
||||
|
||||
You can find the third-party driver documentation on the GitHub pages for each driver:
|
||||
|
||||
* `DataStax Java Driver <https://github.com/datastax/java-driver/>`_
|
||||
* `DataStax Python Driver <https://github.com/datastax/python-driver/>`_
|
||||
* `DataStax C# Driver <https://github.com/datastax/csharp-driver/>`_
|
||||
* `DataStax Ruby Driver <https://github.com/datastax/ruby-driver/>`_
|
||||
* `DataStax Node.js Driver <https://github.com/datastax/nodejs-driver/>`_
|
||||
* `DataStax C++ Driver <https://github.com/datastax/cpp-driver/>`_
|
||||
* `DataStax PHP Driver (Supported versions: 7.1) <https://github.com/datastax/php-driver>`_
|
||||
* `He4rt PHP Driver (Supported versions: 8.1 and 8.2) <https://github.com/he4rt/scylladb-php-driver/>`_
|
||||
* `Scala Phantom Project <https://github.com/outworkers/phantom>`_
|
||||
* `Xandra Elixir Driver <https://github.com/lexhide/xandra>`_
|
||||
* `Exandra Elixir Driver <https://github.com/vinniefranco/exandra>`_
|
||||
|
||||
Learn about ScyllaDB Drivers on ScyllaDB University
|
||||
----------------------------------------------------
|
||||
|
||||
The free `Using ScyllaDB Drivers course <https://university.scylladb.com/courses/using-scylla-drivers/>`_
|
||||
on ScyllaDB University covers the use of drivers in multiple languages to interact with a ScyllaDB
|
||||
cluster. The languages covered include Java, CPP, Rust, Golang, Python, Node.JS, Scala, and others.
|
||||
@@ -1,16 +0,0 @@
|
||||
===================
|
||||
ScyllaDB C++ Driver
|
||||
===================
|
||||
|
||||
The ScyllaDB C++ driver is a modern, feature-rich and **shard-aware** C/C++ client library for ScyllaDB using exclusively Cassandra’s binary protocol and Cassandra Query Language v3.
|
||||
This driver is forked from Datastax cpp-driver.
|
||||
|
||||
Read the `documentation <https://cpp-driver.docs.scylladb.com>`_ to get started or visit the Github project `ScyllaDB C++ driver <https://github.com/scylladb/cpp-driver>`_.
|
||||
|
||||
|
||||
More Information
|
||||
----------------
|
||||
|
||||
* `C++ Driver Documentation <https://cpp-driver.docs.scylladb.com>`_
|
||||
* `C/C++ Driver course at ScyllaDB University <https://university.scylladb.com/courses/using-scylla-drivers/lessons/cpp-driver-part-1/>`_
|
||||
* `Blog: A Shard-Aware ScyllaDB C/C++ Driver <https://www.scylladb.com/2021/03/18/a-shard-aware-scylla-c-c-driver/>`_
|
||||
@@ -1,28 +0,0 @@
|
||||
==================
|
||||
ScyllaDB Go Driver
|
||||
==================
|
||||
|
||||
The `ScyllaDB Go driver <https://github.com/scylladb/gocql>`_ is shard aware and contains extensions for a tokenAwareHostPolicy supported by ScyllaDB 2.3 and onwards.
|
||||
It is is a fork of the `GoCQL Driver <https://github.com/gocql/gocql>`_ but has been enhanced with capabilities that take advantage of ScyllaDB's unique architecture.
|
||||
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
|
||||
As a result, latency is significantly reduced because there is no need to pass data between the shards.
|
||||
|
||||
The protocol extension spec is `available here <https://github.com/scylladb/scylla/blob/master/docs/dev/protocol-extensions.md>`_.
|
||||
The ScyllaDB Go Driver is a drop-in replacement for gocql.
|
||||
As such, no code changes are needed to use this driver.
|
||||
All you need to do is rebuild using the ``replace`` directive in your ``mod`` file.
|
||||
|
||||
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/gocql>`_.
|
||||
|
||||
|
||||
Using CDC with Go
|
||||
-----------------
|
||||
|
||||
When writing applications, you can now use our `Go Library <https://github.com/scylladb/scylla-cdc-go>`_ to simplify writing applications that read from ScyllaDB CDC.
|
||||
|
||||
More information
|
||||
----------------
|
||||
|
||||
* `ScyllaDB Gocql Driver project page on GitHub <https://github.com/scylladb/gocql>`_ - contains the source code as well as a readme and documentation files.
|
||||
* `ScyllaDB University: Golang and ScyllaDB <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-1/>`_
|
||||
A three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Gocql driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Go application.
|
||||
@@ -1,16 +0,0 @@
|
||||
=========================
|
||||
ScyllaDB Gocql Extension
|
||||
=========================
|
||||
|
||||
The ScyllaDB Gocqlx is an extension to gocql that provides usability features.
|
||||
With gocqlx, you can bind the query parameters from maps and structs, use named query parameters (``:identifier``), and scan the query results into structs and slices.
|
||||
The driver includes a fluent and flexible CQL query builder and a database migrations module.
|
||||
|
||||
|
||||
|
||||
More information
|
||||
----------------
|
||||
|
||||
* `ScyllaDB Gocqlx Driver project page on GitHub <https://github.com/scylladb/gocqlx>`_ - contains the source code as well as a readme and documentation files.
|
||||
* `ScyllaDB University: Golang and ScyllaDB Part 3 – GoCQLX <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-3-gocqlx/>`_ - part three of the Golang three-part course which focuses on how to create a sample Go application that executes a few basic CQL statements with a ScyllaDB cluster using the GoCQLX package
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
=====================
|
||||
ScyllaDB Java Driver
|
||||
=====================
|
||||
|
||||
ScyllaDB Java Driver is forked from `DataStax Java Driver <https://github.com/datastax/java-driver>`_ with enhanced capabilities, taking advantage of ScyllaDB's unique architecture.
|
||||
|
||||
The ScyllaDB Java driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
|
||||
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
|
||||
As a result, latency is significantly reduced because there is no need to pass data between the shards.
|
||||
|
||||
Use the ScyllaDB Java driver for better compatibility and support for ScyllaDB with Java-based applications.
|
||||
|
||||
Read the `documentation <https://java-driver.docs.scylladb.com/>`_ to get started or visit the `Github project <https://github.com/scylladb/java-driver>`_.
|
||||
|
||||
The driver architecture is based on layers. At the bottom lies the driver core.
|
||||
This core handles everything related to the connections to a ScyllaDB cluster (for example, connection pool, discovering new nodes, etc.) and exposes a simple, relatively low-level API on top of which higher-level layers can be built.
|
||||
|
||||
The ScyllaDB Java Driver is a drop-in replacement for the DataStax Java Driver.
|
||||
As such, no code changes are needed to use this driver.
|
||||
|
||||
Using CDC with Java
|
||||
-------------------
|
||||
|
||||
When writing applications, you can now use our `Java Library <https://github.com/scylladb/scylla-cdc-java>`_ to simplify writing applications that read from ScyllaDB CDC.
|
||||
|
||||
More information
|
||||
----------------
|
||||
* `ScyllaDB Java Driver Docs <https://java-driver.docs.scylladb.com/>`_
|
||||
* `ScyllaDB Java Driver project page on GitHub <https://github.com/scylladb/java-driver/>`_ - Source Code
|
||||
* `ScyllaDB University: Coding with Java <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-java-part-1/>`_ - a three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Java driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Java application.
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
======================
|
||||
ScyllaDB Python Driver
|
||||
======================
|
||||
|
||||
The ScyllaDB Python driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
|
||||
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
|
||||
As a result, latency is significantly reduced because there is no need to pass data between the shards.
|
||||
|
||||
Read the `documentation <https://python-driver.docs.scylladb.com/>`_ to get started or visit the Github project `ScyllaDB Python driver <https://github.com/scylladb/python-driver/>`_.
|
||||
|
||||
As the ScyllaDB Python Driver is a drop-in replacement for DataStax Python Driver, no code changes are needed to use the driver.
|
||||
Use the ScyllaDB Python driver for better compatibility and support for ScyllaDB with Python-based applications.
|
||||
|
||||
|
||||
More information
|
||||
----------------
|
||||
|
||||
* `ScyllaDB Python Driver Documentation <https://python-driver.docs.scylladb.com/>`_
|
||||
* `ScyllaDB Python Driver on GitHub <https://github.com/scylladb/python-driver/>`_
|
||||
* `ScyllaDB University: Coding with Python <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-python/>`_
|
||||
@@ -1,24 +0,0 @@
|
||||
=====================
|
||||
ScyllaDB Rust Driver
|
||||
=====================
|
||||
|
||||
The ScyllaDB Rust driver is a client-side, shard-aware driver written in pure Rust with a fully async API using Tokio.
|
||||
Optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.
|
||||
|
||||
|
||||
.. image:: ./images/monster-rust.png
|
||||
:width: 150pt
|
||||
|
||||
|
||||
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/scylla-rust-driver>`_.
|
||||
|
||||
Read the `Documentation <https://rust-driver.docs.scylladb.com>`_.
|
||||
|
||||
Using CDC with Rust
|
||||
----------------------
|
||||
|
||||
When writing applications, you can use ScyllaDB's `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_
|
||||
to simplify writing applications that read from ScyllaDB's CDC.
|
||||
|
||||
Use `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_ to read
|
||||
:doc:`ScyllaDB's CDC </features/cdc/index>` update streams.
|
||||
@@ -1,9 +0,0 @@
|
||||
========================
|
||||
AWS DynamoDB Drivers
|
||||
========================
|
||||
|
||||
|
||||
|
||||
|
||||
ScyllaDB AWS DynamoDB Compatible API can be used with any AWS DynamoDB Driver.
|
||||
For a list of AWS AWS DynamoDB drivers see `here <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.html>`_
|
||||
@@ -1,21 +0,0 @@
|
||||
================
|
||||
ScyllaDB Drivers
|
||||
================
|
||||
|
||||
.. toctree::
|
||||
:titlesonly:
|
||||
:hidden:
|
||||
|
||||
ScyllaDB CQL Drivers <cql-drivers/index>
|
||||
ScyllaDB DynamoDB Drivers <dynamo-drivers/index>
|
||||
|
||||
|
||||
|
||||
You can use ScyllaDB with:
|
||||
|
||||
* :doc:`Apache Cassandra CQL Compatible Drivers <cql-drivers/index>`
|
||||
* :doc:`Amazon DynamoDB Compatible API Drivers <dynamo-drivers/index>`
|
||||
|
||||
Additional drivers coming soon!
|
||||
|
||||
If you are looking for a ScyllaDB Integration Solution or a Connector, refer to :doc:`ScyllaDB Integrations </using-scylla/integrations/index>`.
|
||||
@@ -9,7 +9,7 @@ ScyllaDB for Developers
|
||||
Tutorials and Example Projects <https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html>
|
||||
Learn to Use ScyllaDB <https://docs.scylladb.com/stable/get-started/learn-resources/index.html>
|
||||
ScyllaDB Alternator <alternator/index>
|
||||
ScyllaDB Drivers <drivers/index>
|
||||
ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>
|
||||
|
||||
|
||||
.. panel-box::
|
||||
@@ -26,7 +26,7 @@ ScyllaDB for Developers
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` - ScyllaDB and third-party drivers for CQL and DynamoDB
|
||||
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ - ScyllaDB and third-party drivers for CQL and DynamoDB
|
||||
* :doc:`ScyllaDB Alternator </using-scylla/alternator/index>` - The Open Source DynamoDB-compatible API
|
||||
* :doc:`CQL Reference </cql/index>` - Reference for the Apache Cassandra Query Language (CQL) and its ScyllaDB extensions
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ ScyllaDB Integrations and Connectors
|
||||
:class: my-panel
|
||||
|
||||
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`).
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
|
||||
Any application which uses a CQL driver will work with ScyllaDB.
|
||||
|
||||
The list below contains links to integration projects using ScyllaDB with third-party projects.
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
Integrate ScyllaDB with Databricks
|
||||
==================================
|
||||
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
|
||||
|
||||
Resource list
|
||||
-------------
|
||||
|
||||
@@ -3,7 +3,7 @@ Integrate ScyllaDB with Elasticsearch
|
||||
=====================================
|
||||
|
||||
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
|
||||
|
||||
The list below contains integration projects using ScyllaDB with Elasticsearch. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@ The Jaeger Query service offers a web-based UI and API for users to explore, vis
|
||||
Jaeger also supports integration with other observability tools like Prometheus and Grafana,
|
||||
making it a popular choice for monitoring modern distributed applications.
|
||||
|
||||
Jaeger Server `can also be run <https://github.com/jaegertracing/jaeger/tree/main/plugin/storage/scylladb>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
|
||||
Jaeger Server `can also be run <https://www.jaegertracing.io/docs/2.11/storage/cassandra/#compatible-backends>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
|
||||
As a drop-in replacement for Cassandra, ScyllaDB implements the same protocol and provides a high-performance,
|
||||
low-latency alternative. This compatibility allows Jaeger users to easily switch to ScyllaDB without making significant changes to their setup.
|
||||
|
||||
Using ScyllaDB as the storage backend for Jaeger Server can offer additional benefits,
|
||||
such as improved performance, scalability, and resource efficiency.
|
||||
This makes Jaeger even more effective for monitoring and troubleshooting distributed applications,
|
||||
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
|
||||
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
|
||||
|
||||
@@ -3,7 +3,7 @@ Integrate ScyllaDB with Spark
|
||||
=============================
|
||||
|
||||
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
|
||||
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
|
||||
|
||||
The list below contains integration projects using ScyllaDB with Spark. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/http.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/base64.hh"
|
||||
#include "utils/loading_cache.hh"
|
||||
@@ -267,7 +268,6 @@ std::tuple<std::string, std::string> azure_host::impl::parse_key(std::string_vie
|
||||
|
||||
std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std::string_view vault) {
|
||||
static const boost::regex vault_name_re(R"([a-zA-Z0-9-]+)");
|
||||
static const boost::regex vault_endpoint_re(R"((https?)://([^/:]+)(?::(\d+))?)");
|
||||
|
||||
boost::smatch match;
|
||||
std::string tmp{vault};
|
||||
@@ -277,16 +277,12 @@ std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std
|
||||
return {"https", fmt::format(AKV_HOST_TEMPLATE, vault), 443};
|
||||
}
|
||||
|
||||
if (boost::regex_match(tmp, match, vault_endpoint_re)) {
|
||||
std::string scheme = match[1];
|
||||
std::string host = match[2];
|
||||
std::string port_str = match[3];
|
||||
|
||||
unsigned port = (port_str.empty()) ? (scheme == "https" ? 443 : 80) : std::stoi(port_str);
|
||||
return {scheme, host, port};
|
||||
try {
|
||||
auto info = utils::http::parse_simple_url(tmp);
|
||||
return {info.scheme, info.host, info.port};
|
||||
} catch (...) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault)));
|
||||
}
|
||||
|
||||
throw std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault));
|
||||
}
|
||||
|
||||
future<shared_ptr<tls::certificate_credentials>> azure_host::impl::make_creds() {
|
||||
|
||||
@@ -816,6 +816,7 @@ public:
|
||||
future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
|
||||
switch (type) {
|
||||
case sstables::component_type::Scylla:
|
||||
case sstables::component_type::TemporaryScylla:
|
||||
case sstables::component_type::TemporaryTOC:
|
||||
case sstables::component_type::TOC:
|
||||
co_return sink;
|
||||
@@ -844,6 +845,7 @@ public:
|
||||
sstables::component_type type,
|
||||
data_source src) override {
|
||||
switch (type) {
|
||||
case sstables::component_type::TemporaryScylla:
|
||||
case sstables::component_type::Scylla:
|
||||
case sstables::component_type::TemporaryTOC:
|
||||
case sstables::component_type::TOC:
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "encryption_exceptions.hh"
|
||||
#include "symmetric_key.hh"
|
||||
#include "utils.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/UUID.hh"
|
||||
@@ -163,6 +164,8 @@ private:
|
||||
shared_ptr<seastar::tls::certificate_credentials> _creds;
|
||||
std::unordered_map<bytes, shared_ptr<symmetric_key>> _cache;
|
||||
bool _initialized = false;
|
||||
|
||||
abort_source _as;
|
||||
};
|
||||
|
||||
template<typename T, typename C>
|
||||
@@ -251,24 +254,50 @@ future<rjson::value> encryption::gcp_host::impl::gcp_auth_post_with_retry(std::s
|
||||
|
||||
auto& creds = i->second;
|
||||
|
||||
int retries = 0;
|
||||
static constexpr auto max_retries = 10;
|
||||
|
||||
for (;;) {
|
||||
try {
|
||||
co_await creds.refresh(KMS_SCOPE, _certs);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(permission_error("Error refreshing credentials"));
|
||||
exponential_backoff_retry exr(10ms, 10000ms);
|
||||
bool do_backoff = false;
|
||||
bool did_auth_retry = false;
|
||||
|
||||
for (int retry = 0; ; ++retry) {
|
||||
if (std::exchange(do_backoff, false)) {
|
||||
co_await exr.retry(_as);
|
||||
}
|
||||
|
||||
bool refreshing = true;
|
||||
|
||||
try {
|
||||
co_await creds.refresh(KMS_SCOPE, _certs);
|
||||
refreshing = false;
|
||||
|
||||
auto res = co_await send_request(uri, _certs, body, httpd::operation_type::POST, key_values({
|
||||
{ utils::gcp::AUTHORIZATION, utils::gcp::format_bearer(creds.token) },
|
||||
}));
|
||||
}), &_as);
|
||||
co_return res;
|
||||
} catch (httpd::unexpected_status_error& e) {
|
||||
gcp_log.debug("{}: Got unexpected response: {}", uri, e.status());
|
||||
if (e.status() == http::reply::status_type::unauthorized && retries++ < 3) {
|
||||
// refresh access token and retry.
|
||||
switch (e.status()) {
|
||||
default:
|
||||
if (http::reply::classify_status(e.status()) != http::reply::status_class::server_error) {
|
||||
break;
|
||||
}
|
||||
[[fallthrough]];
|
||||
case httpclient::reply_status::request_timeout:
|
||||
if (retry < max_retries) {
|
||||
// service unavailable etc -> backoff + retry
|
||||
do_backoff = true;
|
||||
did_auth_retry = false; // reset this, since we might cause expiration due to backoff (not really, but...)
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (refreshing) {
|
||||
std::throw_with_nested(permission_error("Error refreshing credentials"));
|
||||
}
|
||||
if (e.status() == http::reply::status_type::unauthorized && retry < max_retries && !did_auth_retry) {
|
||||
// refresh access token and retry. no backoff
|
||||
did_auth_retry = true;
|
||||
continue;
|
||||
}
|
||||
if (e.status() == http::reply::status_type::unauthorized) {
|
||||
@@ -322,6 +351,7 @@ future<> encryption::gcp_host::impl::init() {
|
||||
}
|
||||
|
||||
future<> encryption::gcp_host::impl::stop() {
|
||||
_as.request_abort();
|
||||
co_await _attr_cache.stop();
|
||||
co_await _id_cache.stop();
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "utils/http.hh"
|
||||
#include "marshal_exception.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
@@ -322,17 +323,26 @@ future<> kmip_host::impl::connection::connect() {
|
||||
f = f.then([this, cred] {
|
||||
return cred->set_x509_trust_file(_options.truststore, seastar::tls::x509_crt_format::PEM);
|
||||
});
|
||||
} else {
|
||||
f = f.then([cred] {
|
||||
return cred->set_system_trust();
|
||||
});
|
||||
}
|
||||
return f.then([this, cred] {
|
||||
// TODO, find if we should do hostname verification
|
||||
// TODO: connect all failovers already?
|
||||
|
||||
auto i = _host.find_last_of(':');
|
||||
auto name = _host.substr(0, i);
|
||||
auto port = i != sstring::npos ? std::stoul(_host.substr(i + 1)) : kmip_port;
|
||||
// Use the URL parser to handle ipv6 etc proper.
|
||||
// Turn host arg into a URL.
|
||||
auto info = utils::http::parse_simple_url("kmip://" + _host);
|
||||
auto name = info.host;
|
||||
auto port = info.port != 80 ? info.port : kmip_port;
|
||||
|
||||
return seastar::net::dns::resolve_name(name).then([this, cred, port](seastar::net::inet_address addr) {
|
||||
return seastar::tls::connect(cred, seastar::ipv4_addr{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
|
||||
return seastar::net::dns::resolve_name(name).then([this, cred, port, name](seastar::net::inet_address addr) {
|
||||
kmip_log.debug("Try connect {}:{}", addr, port);
|
||||
// TODO: should we verify non-numeric hosts here? (opts.server_name)
|
||||
// Adding this might break existing users with half-baked certs.
|
||||
return seastar::tls::connect(cred, seastar::socket_address{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
|
||||
kmip_log.debug("Successfully connected {}", _host);
|
||||
// #998 Set keepalive to try avoiding connection going stale in between commands.
|
||||
s.set_keepalive_parameters(net::tcp_keepalive_params{60s, 60s, 10});
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/http.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "utils/rjson.hh"
|
||||
@@ -151,15 +152,10 @@ public:
|
||||
{
|
||||
// check if we have an explicit endpoint set.
|
||||
if (!_options.endpoint.empty()) {
|
||||
static std::regex simple_url(R"foo((https?):\/\/(?:([\w\.]+)|\[([\w:]+)\]):?(\d+)?\/?)foo");
|
||||
std::transform(_options.endpoint.begin(), _options.endpoint.end(), _options.endpoint.begin(), ::tolower);
|
||||
std::smatch m;
|
||||
if (!std::regex_match(_options.endpoint, m, simple_url)) {
|
||||
throw std::invalid_argument(fmt::format("Could not parse URL: {}", _options.endpoint));
|
||||
}
|
||||
_options.https = m[1].str() == "https";
|
||||
_options.host = m[2].length() > 0 ? m[2].str() : m[3].str();
|
||||
_options.port = m[4].length() > 0 ? std::stoi(m[4].str()) : 0;
|
||||
auto info = utils::http::parse_simple_url(_options.endpoint);
|
||||
_options.https = info.is_https();
|
||||
_options.host = info.host;
|
||||
_options.port = info.port;
|
||||
}
|
||||
if (_options.endpoint.empty() && _options.host.empty() && _options.aws_region.empty() && !_options.aws_use_ec2_region) {
|
||||
throw std::invalid_argument("No AWS region or endpoint specified");
|
||||
|
||||
@@ -129,6 +129,6 @@ struct direct_fd_ping_reply {
|
||||
std::variant<std::monostate, service::wrong_destination, service::group_liveness_info> result;
|
||||
};
|
||||
|
||||
verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
|
||||
verb [[with_client_info, with_timeout, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -38,6 +38,7 @@ debian_base_packages=(
|
||||
python3-aiohttp
|
||||
python3-pyparsing
|
||||
python3-colorama
|
||||
python3-dev
|
||||
python3-tabulate
|
||||
python3-pytest
|
||||
python3-pytest-asyncio
|
||||
@@ -55,6 +56,7 @@ debian_base_packages=(
|
||||
librapidxml-dev
|
||||
libcrypto++-dev
|
||||
libxxhash-dev
|
||||
zlib1g-dev
|
||||
slapd
|
||||
ldap-utils
|
||||
libcpp-jwt-dev
|
||||
@@ -64,6 +66,7 @@ debian_base_packages=(
|
||||
git-lfs
|
||||
e2fsprogs
|
||||
fuse3
|
||||
libev-dev # for python driver
|
||||
)
|
||||
|
||||
fedora_packages=(
|
||||
@@ -89,6 +92,7 @@ fedora_packages=(
|
||||
patchelf
|
||||
python3
|
||||
python3-aiohttp
|
||||
python3-devel
|
||||
python3-pip
|
||||
python3-file-magic
|
||||
python3-colorama
|
||||
@@ -117,6 +121,7 @@ fedora_packages=(
|
||||
makeself
|
||||
libzstd-static libzstd-devel
|
||||
lz4-static lz4-devel
|
||||
zlib-ng-compat-devel
|
||||
rpm-build
|
||||
devscripts
|
||||
debhelper
|
||||
@@ -152,6 +157,8 @@ fedora_packages=(
|
||||
https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
|
||||
elfutils
|
||||
jq
|
||||
|
||||
libev-devel # for python driver
|
||||
)
|
||||
|
||||
fedora_python3_packages=(
|
||||
|
||||
13
install.sh
13
install.sh
@@ -157,6 +157,7 @@ adjust_bin() {
|
||||
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
|
||||
export LD_LIBRARY_PATH="$prefix/libreloc"
|
||||
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
|
||||
${p11_trust_paths:+export SCYLLA_P11_TRUST_PATHS="$p11_trust_paths"}
|
||||
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
|
||||
EOF
|
||||
chmod 755 "$root/$prefix/bin/$bin"
|
||||
@@ -330,7 +331,6 @@ if ! $nonroot; then
|
||||
rsysconfdir=$(realpath -m "$root/$sysconfdir")
|
||||
rusr=$(realpath -m "$root/usr")
|
||||
rsystemd=$(realpath -m "$rusr/lib/systemd/system")
|
||||
rshare="$rprefix/share"
|
||||
rdoc="$rprefix/share/doc"
|
||||
rdata=$(realpath -m "$root/var/lib/scylla")
|
||||
rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")
|
||||
@@ -338,7 +338,6 @@ else
|
||||
retc="$rprefix/etc"
|
||||
rsysconfdir="$rprefix/$sysconfdir"
|
||||
rsystemd="$HOME/.config/systemd/user"
|
||||
rshare="$rprefix/share"
|
||||
rdoc="$rprefix/share/doc"
|
||||
rdata="$rprefix"
|
||||
fi
|
||||
@@ -522,16 +521,6 @@ PRODUCT="$product"
|
||||
EOS
|
||||
chmod 644 "$rprefix"/scripts/scylla_product.py
|
||||
|
||||
install -d -m755 "$rshare"/p11-kit/modules
|
||||
cat << EOS > "$rshare"/p11-kit/modules/p11-kit-trust.module
|
||||
module: $prefix/libreloc/pkcs11/p11-kit-trust.so
|
||||
priority: 1
|
||||
trust-policy: yes
|
||||
x-trust-lookup: pkcs11:library-description=PKCS%2311%20Kit%20Trust%20Module
|
||||
disable-in: p11-kit-proxy
|
||||
x-init-reserved: paths=$p11_trust_paths
|
||||
EOS
|
||||
|
||||
if ! $nonroot && ! $without_systemd; then
|
||||
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
|
||||
install -m644 dist/common/systemd/scylla-server.service.d/dependencies.conf -Dt "$retc"/systemd/system/scylla-server.service.d
|
||||
|
||||
@@ -200,7 +200,10 @@ enum class tablet_repair_incremental_mode : uint8_t {
|
||||
disabled,
|
||||
};
|
||||
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
|
||||
// FIXME: Incremental repair is disabled by default due to
|
||||
// https://github.com/scylladb/scylladb/issues/26041 and
|
||||
// https://github.com/scylladb/scylladb/issues/27414
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
|
||||
|
||||
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
|
||||
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
|
||||
|
||||
61
main.cc
61
main.cc
@@ -10,6 +10,8 @@
|
||||
#include <functional>
|
||||
#include <fmt/ranges.h>
|
||||
|
||||
#include <gnutls/pkcs11.h>
|
||||
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "db/view/view_building_worker.hh"
|
||||
@@ -37,7 +39,6 @@
|
||||
#include "api/api_init.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "db/legacy_schema_migrator.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
@@ -118,15 +119,11 @@
|
||||
#include "message/dictionary_service.hh"
|
||||
#include "sstable_dict_autotrainer.hh"
|
||||
#include "utils/disk_space_monitor.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "tools/utils.hh"
|
||||
|
||||
|
||||
#define P11_KIT_FUTURE_UNSTABLE_API
|
||||
extern "C" {
|
||||
#include <p11-kit/p11-kit.h>
|
||||
}
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
#include <seastar/core/metrics_api.hh>
|
||||
#include <seastar/core/relabel_config.hh>
|
||||
@@ -708,14 +705,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
print_starting_message(ac, av, parsed_opts);
|
||||
}
|
||||
|
||||
// We have to override p11-kit config path before p11-kit initialization.
|
||||
// And the initialization will invoke on seastar initialization, so it has to
|
||||
// be before app.run()
|
||||
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe"));
|
||||
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
|
||||
auto p11_modules_str = p11_modules.string<char>();
|
||||
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
|
||||
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
sharded<locator::effective_replication_map_factory> erm_factory;
|
||||
sharded<service::migration_notifier> mm_notifier;
|
||||
@@ -727,6 +716,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
|
||||
service::load_meter load_meter;
|
||||
sharded<service::storage_proxy> proxy;
|
||||
sharded<auth::cache> auth_cache;
|
||||
sharded<service::storage_service> ss;
|
||||
sharded<service::migration_manager> mm;
|
||||
sharded<tasks::task_manager> task_manager;
|
||||
@@ -789,7 +779,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
|
||||
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
|
||||
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
|
||||
&repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
|
||||
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
|
||||
&hashing_worker, &vector_store_client] {
|
||||
try {
|
||||
if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {
|
||||
@@ -1650,7 +1640,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
fd.start(
|
||||
std::ref(fd_pinger), std::ref(fd_clock),
|
||||
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
|
||||
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count()).get();
|
||||
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count(), dbcfg.gossip_scheduling_group).get();
|
||||
|
||||
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
|
||||
fd.stop().get();
|
||||
@@ -1802,6 +1792,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
api::unset_server_stream_manager(ctx).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting auth cache");
|
||||
auth_cache.start(std::ref(qp)).get();
|
||||
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [&] {
|
||||
auth_cache.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing storage service");
|
||||
debug::the_storage_service = &ss;
|
||||
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
|
||||
@@ -1810,6 +1806,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
std::ref(messaging), std::ref(repair),
|
||||
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
|
||||
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(view_building_worker), std::ref(qp), std::ref(sl_controller),
|
||||
std::ref(auth_cache),
|
||||
std::ref(tsm), std::ref(vbsm), std::ref(task_manager), std::ref(gossip_address_map),
|
||||
compression_dict_updated_callback,
|
||||
only_on_shard0(&*disk_space_monitor_shard0)
|
||||
@@ -1825,11 +1822,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
ss.stop().get();
|
||||
});
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing query processor remote part");
|
||||
// TODO: do this together with proxy.start_remote(...)
|
||||
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
|
||||
@@ -1858,8 +1850,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
group0_client.init().get();
|
||||
|
||||
checkpoint(stop_signal, "initializing system schema");
|
||||
// schema migration, if needed, is also done on shard 0
|
||||
db::legacy_schema_migrator::migrate(proxy, db, sys_ks, qp.local()).get();
|
||||
db::schema_tables::save_system_schema(qp.local()).get();
|
||||
db::schema_tables::recalculate_schema_version(sys_ks, proxy, feature_service.local()).get();
|
||||
|
||||
@@ -2070,7 +2060,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
|
||||
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
|
||||
|
||||
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(hashing_worker)).get();
|
||||
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();
|
||||
|
||||
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
|
||||
|
||||
@@ -2184,6 +2174,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// This will also disable migration manager schema pulls if needed.
|
||||
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
});
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return messaging.invoke_on_all([&] (auto& ms) {
|
||||
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
|
||||
@@ -2341,7 +2336,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
auth_config.authenticator_java_name = qualified_authenticator_name;
|
||||
auth_config.role_manager_java_name = qualified_role_manager_name;
|
||||
|
||||
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(hashing_worker)).get();
|
||||
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();
|
||||
|
||||
std::any stop_auth_service;
|
||||
// Has to be called after node joined the cluster (join_cluster())
|
||||
@@ -2687,13 +2682,15 @@ int main(int ac, char** av) {
|
||||
// #3583 - need to potentially ensure this for tools as well, since at least
|
||||
// sstable* might need crypto libraries.
|
||||
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe")); // could just be argv[0] I guess...
|
||||
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
|
||||
// Note: must be in scope for application lifetime. p11_kit_override_system_files does _not_
|
||||
// copy input strings.
|
||||
auto p11_modules_str = p11_modules.string<char>();
|
||||
// #3392 only do this if we are actually packaged and the path exists.
|
||||
if (fs::exists(p11_modules)) {
|
||||
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
|
||||
auto p11_trust_paths_from_env = std::getenv("SCYLLA_P11_TRUST_PATHS");
|
||||
auto trust_module_path = scylla_path.parent_path().parent_path().append("libreloc/pkcs11/p11-kit-trust.so");
|
||||
if (fs::exists(trust_module_path) && p11_trust_paths_from_env) {
|
||||
gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL, nullptr);
|
||||
auto trust_config = fmt::format("p11-kit:paths={} trusted=yes", p11_trust_paths_from_env);
|
||||
auto ret = gnutls_pkcs11_add_provider(trust_module_path.string().c_str(), trust_config.c_str());
|
||||
if (ret != GNUTLS_E_SUCCESS) {
|
||||
startlog.warn("Could not initialize p11-kit trust module: {}\n", gnutls_strerror(ret));
|
||||
}
|
||||
}
|
||||
|
||||
return main_func(ac, av);
|
||||
|
||||
@@ -686,6 +686,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::RAFT_MODIFY_CONFIG:
|
||||
case messaging_verb::RAFT_PULL_SNAPSHOT:
|
||||
case messaging_verb::NOTIFY_BANNED:
|
||||
case messaging_verb::DIRECT_FD_PING:
|
||||
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
|
||||
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
|
||||
// only verbs which are 'rare' and 'cheap'.
|
||||
@@ -747,7 +748,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::PAXOS_ACCEPT:
|
||||
case messaging_verb::PAXOS_LEARN:
|
||||
case messaging_verb::PAXOS_PRUNE:
|
||||
case messaging_verb::DIRECT_FD_PING:
|
||||
return 2;
|
||||
case messaging_verb::MUTATION_DONE:
|
||||
case messaging_verb::MUTATION_FAILED:
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user