Compare commits
196 Commits
copilot/cl
...
migration-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
edda66886e | ||
|
|
3e09d3cc97 | ||
|
|
3ec4f67407 | ||
|
|
a41d7a9240 | ||
|
|
30d6f3b8e0 | ||
|
|
09fde82a33 | ||
|
|
4a307d931a | ||
|
|
97b1340a68 | ||
|
|
fe9237fdc9 | ||
|
|
c4c2f87be7 | ||
|
|
c45244b235 | ||
|
|
f5ed3e9fea | ||
|
|
35c9a00275 | ||
|
|
c493a66668 | ||
|
|
4e984139b2 | ||
|
|
c681b3363d | ||
|
|
927aebef37 | ||
|
|
d5009882c6 | ||
|
|
1f7a65904e | ||
|
|
29d090845a | ||
|
|
0bc95bcf87 | ||
|
|
57e7a4fa4f | ||
|
|
c89957b725 | ||
|
|
0aa881f190 | ||
|
|
6b3ce5fdcc | ||
|
|
cb6ee05391 | ||
|
|
67045b5f17 | ||
|
|
e11450ccca | ||
|
|
e0cc6ca7e6 | ||
|
|
7d2e6c0170 | ||
|
|
d6557764b9 | ||
|
|
dc9093303d | ||
|
|
91df129e21 | ||
|
|
5e90fbb9d2 | ||
|
|
49a3e0914d | ||
|
|
d9fc3b1c11 | ||
|
|
0f014e2916 | ||
|
|
0d82e56078 | ||
|
|
24040efc54 | ||
|
|
9e0f5410ae | ||
|
|
e503340efc | ||
|
|
4281d18c2e | ||
|
|
5c1e525618 | ||
|
|
1f0f694c9e | ||
|
|
843da2bbb8 | ||
|
|
925d86fefc | ||
|
|
59a876cebb | ||
|
|
1f170d2566 | ||
|
|
a5d611866e | ||
|
|
493ebe2add | ||
|
|
ccf90cfde8 | ||
|
|
989566e8a3 | ||
|
|
b94fd11939 | ||
|
|
801bf82a34 | ||
|
|
7d111f2396 | ||
|
|
4413142f25 | ||
|
|
4902186ede | ||
|
|
4eee5bc273 | ||
|
|
a8350b274e | ||
|
|
dc4896b69b | ||
|
|
6dd74be684 | ||
|
|
53f93eb830 | ||
|
|
cab3e1eea5 | ||
|
|
9015bed794 | ||
|
|
998ee5b7fb | ||
|
|
6b0d757f28 | ||
|
|
18b5a49b0c | ||
|
|
cf4133c62d | ||
|
|
ea8a661119 | ||
|
|
8962093d90 | ||
|
|
ecb6fb00f0 | ||
|
|
09d3b6c98b | ||
|
|
d16f9c821d | ||
|
|
4a7e20953a | ||
|
|
7a3f51b304 | ||
|
|
d6226500f6 | ||
|
|
376c70be75 | ||
|
|
41dcf12463 | ||
|
|
b268eda67e | ||
|
|
c5945b1ef4 | ||
|
|
262a8cef0b | ||
|
|
f27ef79d0d | ||
|
|
e62cb29b7d | ||
|
|
91e2b027ce | ||
|
|
58d3052ad4 | ||
|
|
84e9b94503 | ||
|
|
f54a4010c0 | ||
|
|
a53f989d2f | ||
|
|
c7dda5500c | ||
|
|
fc2aecea69 | ||
|
|
2be5ee9f9d | ||
|
|
3fe596d556 | ||
|
|
f0dbf6135d | ||
|
|
7dc371f312 | ||
|
|
761ace4f05 | ||
|
|
e7ec87382e | ||
|
|
8ecd4d73ac | ||
|
|
5e369c0439 | ||
|
|
874322f95e | ||
|
|
e07fe2536e | ||
|
|
6084f250ae | ||
|
|
345458c2d8 | ||
|
|
97fe3f2a2c | ||
|
|
3ca6b59f80 | ||
|
|
70b3cd0540 | ||
|
|
36347c3ce9 | ||
|
|
4cde34f6f2 | ||
|
|
7977c97694 | ||
|
|
be8a30230b | ||
|
|
2e4d0e42f0 | ||
|
|
8953a143e5 | ||
|
|
d2c266eb47 | ||
|
|
99e8a92aef | ||
|
|
807da53583 | ||
|
|
e01041d3ee | ||
|
|
ce57ef94bd | ||
|
|
2ccb8ff666 | ||
|
|
b52a3f3a43 | ||
|
|
324b829263 | ||
|
|
bca17290f4 | ||
|
|
e347f6d0d4 | ||
|
|
24b037e8e3 | ||
|
|
b8e91ee6ae | ||
|
|
dd0fc35c63 | ||
|
|
3e270a49f7 | ||
|
|
823d1b9c03 | ||
|
|
6f5f42305a | ||
|
|
829bd9b598 | ||
|
|
b7bc48e7b7 | ||
|
|
d86d5b33aa | ||
|
|
19efd7f6f9 | ||
|
|
65cba0c3e7 | ||
|
|
c8811387e1 | ||
|
|
7d637b14e8 | ||
|
|
1136a3f398 | ||
|
|
3e0362ec67 | ||
|
|
3e138a2685 | ||
|
|
3b0df29ceb | ||
|
|
85140cdf7e | ||
|
|
5c93e12373 | ||
|
|
478b8f09df | ||
|
|
e082e32cc7 | ||
|
|
baea12c9cb | ||
|
|
1b784e98f3 | ||
|
|
2d954f4b19 | ||
|
|
d9e1a6006f | ||
|
|
bbd293d440 | ||
|
|
576ebcdd30 | ||
|
|
629d6d98fa | ||
|
|
7446eb7e8d | ||
|
|
091ed4d54b | ||
|
|
a009644c7d | ||
|
|
e38ee160fc | ||
|
|
1c2e47e059 | ||
|
|
a37b1ce832 | ||
|
|
77bd00bf9f | ||
|
|
a24c3fc229 | ||
|
|
d3ee82ea51 | ||
|
|
34d28475d9 | ||
|
|
eb76858369 | ||
|
|
6eca7e4ff6 | ||
|
|
8aca7b0eb9 | ||
|
|
1e09a34686 | ||
|
|
0aebc17c4c | ||
|
|
ad2381923f | ||
|
|
504290902c | ||
|
|
3ed8701301 | ||
|
|
580dfd63e5 | ||
|
|
eb7be9010d | ||
|
|
383f9e6e56 | ||
|
|
67d3454d2b | ||
|
|
a54be82536 | ||
|
|
2a38794b8e | ||
|
|
304e908e3b | ||
|
|
a84d1361db | ||
|
|
3fb7719277 | ||
|
|
2fd6ca4c46 | ||
|
|
7fa1f87355 | ||
|
|
1e37781d86 | ||
|
|
d5ec66bc0c | ||
|
|
5b4aa4b6a6 | ||
|
|
76b2d0f961 | ||
|
|
4ec7a064a9 | ||
|
|
da17e8b18b | ||
|
|
8dc20e6aaf | ||
|
|
7a04dd2d22 | ||
|
|
d910e6ea63 | ||
|
|
e5dee2aab8 | ||
|
|
2b7aa3211d | ||
|
|
663831ebd7 | ||
|
|
c4c5ed5aba | ||
|
|
5e6935f276 | ||
|
|
6936704677 | ||
|
|
c8098e07c9 | ||
|
|
c6d1c63ddb | ||
|
|
83c1103917 |
@@ -17,6 +17,7 @@
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/tombstone.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "utils/log.hh"
|
||||
@@ -1875,23 +1876,34 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
utils::chunked_vector<mutation> schema_mutations;
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
|
||||
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
if (rs->uses_tablets()) {
|
||||
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
|
||||
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
|
||||
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
|
||||
if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
// Creating an index in tablets mode requires the keyspace to be RF-rack-valid.
|
||||
// GSI and LSI indexes are based on materialized views which require RF-rack-validity to avoid consistency issues.
|
||||
if (!view_builders.empty() || _proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, _proxy.local_db().get_token_metadata_ptr(), *rs);
|
||||
} catch (const std::invalid_argument& ex) {
|
||||
if (!view_builders.empty()) {
|
||||
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes and LocalSecondaryIndexes on a table "
|
||||
"using tablets require the number of racks in the cluster to be either 1 or 3"));
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Cannot create table '{}' with tablets: the configuration "
|
||||
"option 'rf_rack_valid_keyspaces' is enabled, which enforces that tables using tablets can only be created in clusters "
|
||||
"that have either 1 or 3 racks", table_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
|
||||
@@ -2114,9 +2126,12 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation(fmt::format(
|
||||
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
|
||||
}
|
||||
if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
|
||||
!p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, p.local().local_db().get_token_metadata_ptr(),
|
||||
p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy());
|
||||
} catch (const std::invalid_argument& ex) {
|
||||
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes on a table "
|
||||
"using tablets require the number of racks in the cluster to be either 1 or 3"));
|
||||
}
|
||||
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
|
||||
@@ -3051,7 +3051,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -2016,12 +2016,14 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("cf"), ",");
|
||||
auto sfopt = req->get_query_param("sf");
|
||||
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
};
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
try {
|
||||
if (column_families.empty()) {
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
@@ -2029,7 +2031,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
|
||||
}
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
@@ -2064,7 +2066,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
db::snapshot_options opts = {.skip_flush = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
}
|
||||
|
||||
compaction::compaction_stats stats;
|
||||
|
||||
@@ -146,7 +146,8 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
db::snapshot_options opts = {.skip_flush = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
|
||||
@@ -53,10 +53,10 @@ static std::string json_escape(std::string_view str) {
|
||||
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::syslog_send_helper(const sstring& msg) {
|
||||
future<> audit_syslog_storage_helper::syslog_send_helper(temporary_buffer<char> msg) {
|
||||
try {
|
||||
auto lock = co_await get_units(_semaphore, 1, std::chrono::hours(1));
|
||||
co_await _sender.send(_syslog_address, net::packet{msg.data(), msg.size()});
|
||||
co_await _sender.send(_syslog_address, std::span(&msg, 1));
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
auto error_msg = seastar::format(
|
||||
@@ -90,7 +90,7 @@ future<> audit_syslog_storage_helper::start(const db::config& cfg) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await syslog_send_helper("Initializing syslog audit backend.");
|
||||
co_await syslog_send_helper(temporary_buffer<char>::copy_of("Initializing syslog audit backend."));
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::stop() {
|
||||
@@ -120,7 +120,7 @@ future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
|
||||
audit_info->table(),
|
||||
username);
|
||||
|
||||
co_await syslog_send_helper(msg);
|
||||
co_await syslog_send_helper(std::move(msg).release());
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::write_login(const sstring& username,
|
||||
@@ -139,7 +139,7 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
|
||||
client_ip,
|
||||
username);
|
||||
|
||||
co_await syslog_send_helper(msg.c_str());
|
||||
co_await syslog_send_helper(std::move(msg).release());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ class audit_syslog_storage_helper : public storage_helper {
|
||||
net::datagram_channel _sender;
|
||||
seastar::semaphore _semaphore;
|
||||
|
||||
future<> syslog_send_helper(const sstring& msg);
|
||||
future<> syslog_send_helper(seastar::temporary_buffer<char> msg);
|
||||
public:
|
||||
explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&);
|
||||
virtual ~audit_syslog_storage_helper();
|
||||
|
||||
@@ -876,22 +876,6 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
continue; // some tables might not have been created if they were not used
|
||||
}
|
||||
|
||||
// use longer than usual timeout as we scan the whole table
|
||||
// but not infinite or very long as we want to fail reasonably fast
|
||||
const auto t = 5min;
|
||||
const timeout_config tc{t, t, t, t, t, t, t};
|
||||
::service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
::service::query_state qs(cs, empty_service_permit());
|
||||
|
||||
auto rows = co_await qp.execute_internal(
|
||||
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
|
||||
db::consistency_level::ALL,
|
||||
qs,
|
||||
{},
|
||||
cql3::query_processor::cache_internal::no);
|
||||
if (rows->empty()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<sstring> col_names;
|
||||
for (const auto& col : schema->all_columns()) {
|
||||
col_names.push_back(col.name_as_cql_string());
|
||||
@@ -900,30 +884,51 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
for (size_t i = 1; i < col_names.size(); ++i) {
|
||||
val_binders_str += ", ?";
|
||||
}
|
||||
for (const auto& row : *rows) {
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col : schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(
|
||||
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
|
||||
std::vector<mutation> collected;
|
||||
// use longer than usual timeout as we scan the whole table
|
||||
// but not infinite or very long as we want to fail reasonably fast
|
||||
const auto t = 5min;
|
||||
const timeout_config tc{t, t, t, t, t, t, t};
|
||||
::service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
::service::query_state qs(cs, empty_service_permit());
|
||||
|
||||
co_await qp.query_internal(
|
||||
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
|
||||
db::consistency_level::ALL,
|
||||
{},
|
||||
1000,
|
||||
[&qp, &cf_name, &col_names, &val_binders_str, &schema, ts, &collected] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col : schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(
|
||||
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
}
|
||||
}
|
||||
auto muts = co_await qp.get_mutations_internal(
|
||||
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
|
||||
db::system_keyspace::NAME,
|
||||
cf_name,
|
||||
fmt::join(col_names, ", "),
|
||||
val_binders_str),
|
||||
internal_distributed_query_state(),
|
||||
ts,
|
||||
std::move(values));
|
||||
if (muts.size() != 1) {
|
||||
on_internal_error(log,
|
||||
format("expecting single insert mutation, got {}", muts.size()));
|
||||
}
|
||||
co_yield std::move(muts[0]);
|
||||
auto muts = co_await qp.get_mutations_internal(
|
||||
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
|
||||
db::system_keyspace::NAME,
|
||||
cf_name,
|
||||
fmt::join(col_names, ", "),
|
||||
val_binders_str),
|
||||
internal_distributed_query_state(),
|
||||
ts,
|
||||
std::move(values));
|
||||
if (muts.size() != 1) {
|
||||
on_internal_error(log,
|
||||
format("expecting single insert mutation, got {}", muts.size()));
|
||||
}
|
||||
|
||||
collected.push_back(std::move(muts[0]));
|
||||
co_return stop_iteration::no;
|
||||
},
|
||||
std::move(qs));
|
||||
|
||||
for (auto& m : collected) {
|
||||
co_yield std::move(m);
|
||||
}
|
||||
}
|
||||
co_yield co_await sys_ks.make_auth_version_mutation(ts,
|
||||
|
||||
18
configure.py
18
configure.py
@@ -725,7 +725,9 @@ raft_tests = set([
|
||||
vector_search_tests = set([
|
||||
'test/vector_search/vector_store_client_test',
|
||||
'test/vector_search/load_balancer_test',
|
||||
'test/vector_search/client_test'
|
||||
'test/vector_search/client_test',
|
||||
'test/vector_search/filter_test',
|
||||
'test/vector_search/rescoring_test'
|
||||
])
|
||||
|
||||
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
|
||||
@@ -1034,6 +1036,9 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'cql3/functions/aggregate_fcts.cc',
|
||||
'cql3/functions/castas_fcts.cc',
|
||||
'cql3/functions/error_injection_fcts.cc',
|
||||
'cql3/statements/strong_consistency/modification_statement.cc',
|
||||
'cql3/statements/strong_consistency/select_statement.cc',
|
||||
'cql3/statements/strong_consistency/statement_helpers.cc',
|
||||
'cql3/functions/vector_similarity_fcts.cc',
|
||||
'cql3/statements/cf_prop_defs.cc',
|
||||
'cql3/statements/cf_statement.cc',
|
||||
@@ -1059,8 +1064,8 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'cql3/statements/raw/parsed_statement.cc',
|
||||
'cql3/statements/property_definitions.cc',
|
||||
'cql3/statements/update_statement.cc',
|
||||
'cql3/statements/strongly_consistent_modification_statement.cc',
|
||||
'cql3/statements/strongly_consistent_select_statement.cc',
|
||||
'cql3/statements/broadcast_modification_statement.cc',
|
||||
'cql3/statements/broadcast_select_statement.cc',
|
||||
'cql3/statements/delete_statement.cc',
|
||||
'cql3/statements/prune_materialized_view_statement.cc',
|
||||
'cql3/statements/batch_statement.cc',
|
||||
@@ -1351,6 +1356,9 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'lang/wasm.cc',
|
||||
'lang/wasm_alien_thread_runner.cc',
|
||||
'lang/wasm_instance_cache.cc',
|
||||
'service/strong_consistency/groups_manager.cc',
|
||||
'service/strong_consistency/coordinator.cc',
|
||||
'service/strong_consistency/state_machine.cc',
|
||||
'service/raft/group0_state_id_handler.cc',
|
||||
'service/raft/group0_state_machine.cc',
|
||||
'service/raft/group0_state_machine_merger.cc',
|
||||
@@ -1380,6 +1388,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'vector_search/dns.cc',
|
||||
'vector_search/client.cc',
|
||||
'vector_search/clients.cc',
|
||||
'vector_search/filter.cc',
|
||||
'vector_search/truststore.cc'
|
||||
] + [Antlr3Grammar('cql3/Cql.g')] \
|
||||
+ scylla_raft_core
|
||||
@@ -1489,6 +1498,7 @@ idls = ['idl/gossip_digest.idl.hh',
|
||||
'idl/hinted_handoff.idl.hh',
|
||||
'idl/storage_proxy.idl.hh',
|
||||
'idl/sstables.idl.hh',
|
||||
'idl/strong_consistency/state_machine.idl.hh',
|
||||
'idl/group0_state_machine.idl.hh',
|
||||
'idl/mapreduce_request.idl.hh',
|
||||
'idl/replica_exception.idl.hh',
|
||||
@@ -1784,6 +1794,8 @@ deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
|
||||
deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/filter_test'] = ['test/vector_search/filter_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/rescoring_test'] = ['test/vector_search/rescoring_test.cc'] + scylla_tests_dependencies
|
||||
|
||||
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
|
||||
|
||||
|
||||
@@ -47,6 +47,9 @@ target_sources(cql3
|
||||
functions/aggregate_fcts.cc
|
||||
functions/castas_fcts.cc
|
||||
functions/error_injection_fcts.cc
|
||||
statements/strong_consistency/select_statement.cc
|
||||
statements/strong_consistency/modification_statement.cc
|
||||
statements/strong_consistency/statement_helpers.cc
|
||||
functions/vector_similarity_fcts.cc
|
||||
statements/cf_prop_defs.cc
|
||||
statements/cf_statement.cc
|
||||
@@ -72,8 +75,8 @@ target_sources(cql3
|
||||
statements/raw/parsed_statement.cc
|
||||
statements/property_definitions.cc
|
||||
statements/update_statement.cc
|
||||
statements/strongly_consistent_modification_statement.cc
|
||||
statements/strongly_consistent_select_statement.cc
|
||||
statements/broadcast_modification_statement.cc
|
||||
statements/broadcast_select_statement.cc
|
||||
statements/delete_statement.cc
|
||||
statements/prune_materialized_view_statement.cc
|
||||
statements/batch_statement.cc
|
||||
|
||||
@@ -48,8 +48,10 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono
|
||||
|
||||
struct query_processor::remote {
|
||||
remote(service::migration_manager& mm, service::mapreduce_service& fwd,
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client)
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client,
|
||||
service::strong_consistency::coordinator& _sc_coordinator)
|
||||
: mm(mm), mapreducer(fwd), ss(ss), group0_client(group0_client)
|
||||
, sc_coordinator(_sc_coordinator)
|
||||
, gate("query_processor::remote")
|
||||
{}
|
||||
|
||||
@@ -57,6 +59,7 @@ struct query_processor::remote {
|
||||
service::mapreduce_service& mapreducer;
|
||||
service::storage_service& ss;
|
||||
service::raft_group0_client& group0_client;
|
||||
service::strong_consistency::coordinator& sc_coordinator;
|
||||
|
||||
seastar::named_gate gate;
|
||||
};
|
||||
@@ -514,9 +517,16 @@ query_processor::~query_processor() {
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
||||
query_processor::acquire_strongly_consistent_coordinator() {
|
||||
auto [remote_, holder] = remote();
|
||||
return {remote_.get().sc_coordinator, std::move(holder)};
|
||||
}
|
||||
|
||||
void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer,
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client) {
|
||||
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client);
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client,
|
||||
service::strong_consistency::coordinator& sc_coordinator) {
|
||||
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client, sc_coordinator);
|
||||
}
|
||||
|
||||
future<> query_processor::stop_remote() {
|
||||
@@ -860,6 +870,7 @@ struct internal_query_state {
|
||||
sstring query_string;
|
||||
std::unique_ptr<query_options> opts;
|
||||
statements::prepared_statement::checked_weak_ptr p;
|
||||
std::optional<service::query_state> qs;
|
||||
bool more_results = true;
|
||||
};
|
||||
|
||||
@@ -867,10 +878,14 @@ internal_query_state query_processor::create_paged_state(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const data_value_list& values,
|
||||
int32_t page_size) {
|
||||
int32_t page_size,
|
||||
std::optional<service::query_state> qs) {
|
||||
auto p = prepare_internal(query_string);
|
||||
auto opts = make_internal_options(p, values, cl, page_size);
|
||||
return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), true};
|
||||
if (!qs) {
|
||||
qs.emplace(query_state_for_internal_call());
|
||||
}
|
||||
return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), std::move(qs), true};
|
||||
}
|
||||
|
||||
bool query_processor::has_more_results(cql3::internal_query_state& state) const {
|
||||
@@ -893,9 +908,8 @@ future<> query_processor::for_each_cql_result(
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_paged_internal(internal_query_state& state) {
|
||||
state.p->statement->validate(*this, service::client_state::for_internal_calls());
|
||||
auto qs = query_state_for_internal_call();
|
||||
::shared_ptr<cql_transport::messages::result_message> msg =
|
||||
co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
|
||||
co_await state.p->statement->execute(*this, *state.qs, *state.opts, std::nullopt);
|
||||
|
||||
class visitor : public result_message::visitor_base {
|
||||
internal_query_state& _state;
|
||||
@@ -1202,8 +1216,9 @@ future<> query_processor::query_internal(
|
||||
db::consistency_level cl,
|
||||
const data_value_list& values,
|
||||
int32_t page_size,
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f) {
|
||||
auto query_state = create_paged_state(query_string, cl, values, page_size);
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
|
||||
std::optional<service::query_state> qs) {
|
||||
auto query_state = create_paged_state(query_string, cl, values, page_size, std::move(qs));
|
||||
co_return co_await for_each_cql_result(query_state, std::move(f));
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,10 @@ class query_state;
|
||||
class mapreduce_service;
|
||||
class raft_group0_client;
|
||||
|
||||
namespace strong_consistency {
|
||||
class coordinator;
|
||||
}
|
||||
|
||||
namespace broadcast_tables {
|
||||
struct query;
|
||||
}
|
||||
@@ -155,7 +159,8 @@ public:
|
||||
~query_processor();
|
||||
|
||||
void start_remote(service::migration_manager&, service::mapreduce_service&,
|
||||
service::storage_service& ss, service::raft_group0_client&);
|
||||
service::storage_service& ss, service::raft_group0_client&,
|
||||
service::strong_consistency::coordinator&);
|
||||
future<> stop_remote();
|
||||
|
||||
data_dictionary::database db() {
|
||||
@@ -174,6 +179,9 @@ public:
|
||||
return _proxy;
|
||||
}
|
||||
|
||||
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
||||
acquire_strongly_consistent_coordinator();
|
||||
|
||||
cql_stats& get_cql_stats() {
|
||||
return _cql_stats;
|
||||
}
|
||||
@@ -322,6 +330,7 @@ public:
|
||||
* page_size - maximum page size
|
||||
* f - a function to be run on each row of the query result,
|
||||
* if the function returns stop_iteration::yes the iteration will stop
|
||||
* qs - optional query state (default: std::nullopt)
|
||||
*
|
||||
* \note This function is optimized for convenience, not performance.
|
||||
*/
|
||||
@@ -330,7 +339,8 @@ public:
|
||||
db::consistency_level cl,
|
||||
const data_value_list& values,
|
||||
int32_t page_size,
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
|
||||
std::optional<service::query_state> qs = std::nullopt);
|
||||
|
||||
/*
|
||||
* \brief iterate over all cql results using paging
|
||||
@@ -499,7 +509,8 @@ private:
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
const data_value_list& values,
|
||||
int32_t page_size);
|
||||
int32_t page_size,
|
||||
std::optional<service::query_state> qs = std::nullopt);
|
||||
|
||||
/*!
|
||||
* \brief run a query using paging
|
||||
|
||||
@@ -46,6 +46,13 @@ void metadata::add_non_serialized_column(lw_shared_ptr<column_specification> nam
|
||||
_column_info->_names.emplace_back(std::move(name));
|
||||
}
|
||||
|
||||
void metadata::hide_last_column() {
|
||||
if (_column_info->_column_count == 0) {
|
||||
utils::on_internal_error("Trying to hide a column when there are no columns visible.");
|
||||
}
|
||||
_column_info->_column_count--;
|
||||
}
|
||||
|
||||
void metadata::set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state) {
|
||||
_flags.set<flag::HAS_MORE_PAGES>();
|
||||
_paging_state = std::move(paging_state);
|
||||
|
||||
@@ -73,6 +73,7 @@ public:
|
||||
uint32_t value_count() const;
|
||||
|
||||
void add_non_serialized_column(lw_shared_ptr<column_specification> name);
|
||||
void hide_last_column();
|
||||
|
||||
public:
|
||||
void set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state);
|
||||
|
||||
@@ -225,10 +225,9 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
// The second hyphen is not really true because currently topological changes can
|
||||
// disturb it (see scylladb/scylladb#23345), but we ignore that.
|
||||
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const std::invalid_argument& e) {
|
||||
if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) {
|
||||
// There's no guarantee what the type of the exception will be, so we need to
|
||||
// wrap it manually here in a type that can be passed to the user.
|
||||
// wrap the exception manually here in a type that can be passed to the user.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
} else {
|
||||
// Even when RF-rack-validity is not enforced for the keyspace, we'd
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
*/
|
||||
|
||||
|
||||
#include "cql3/statements/strongly_consistent_modification_statement.hh"
|
||||
#include "cql3/statements/broadcast_modification_statement.hh"
|
||||
|
||||
#include <optional>
|
||||
|
||||
@@ -28,11 +28,11 @@
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
static logging::logger logger("strongly_consistent_modification_statement");
|
||||
static logging::logger logger("broadcast_modification_statement");
|
||||
|
||||
namespace statements {
|
||||
|
||||
strongly_consistent_modification_statement::strongly_consistent_modification_statement(
|
||||
broadcast_modification_statement::broadcast_modification_statement(
|
||||
uint32_t bound_terms,
|
||||
schema_ptr schema,
|
||||
broadcast_tables::prepared_update query)
|
||||
@@ -43,7 +43,7 @@ strongly_consistent_modification_statement::strongly_consistent_modification_sta
|
||||
{ }
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
broadcast_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
@@ -63,7 +63,7 @@ evaluate_prepared(
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
broadcast_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
|
||||
}
|
||||
@@ -103,11 +103,11 @@ strongly_consistent_modification_statement::execute_without_checking_exception_m
|
||||
), result);
|
||||
}
|
||||
|
||||
uint32_t strongly_consistent_modification_statement::get_bound_terms() const {
|
||||
uint32_t broadcast_modification_statement::get_bound_terms() const {
|
||||
return _bound_terms;
|
||||
}
|
||||
|
||||
future<> strongly_consistent_modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
|
||||
future<> broadcast_modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
|
||||
auto f = state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::MODIFY);
|
||||
if (_query.value_condition.has_value()) {
|
||||
f = f.then([this, &state] {
|
||||
@@ -117,7 +117,7 @@ future<> strongly_consistent_modification_statement::check_access(query_processo
|
||||
return f;
|
||||
}
|
||||
|
||||
bool strongly_consistent_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
bool broadcast_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return _schema->ks_name() == ks_name && (!cf_name || _schema->cf_name() == *cf_name);
|
||||
}
|
||||
|
||||
@@ -27,13 +27,13 @@ struct prepared_update {
|
||||
|
||||
}
|
||||
|
||||
class strongly_consistent_modification_statement : public cql_statement_opt_metadata {
|
||||
class broadcast_modification_statement : public cql_statement_opt_metadata {
|
||||
const uint32_t _bound_terms;
|
||||
const schema_ptr _schema;
|
||||
const broadcast_tables::prepared_update _query;
|
||||
|
||||
public:
|
||||
strongly_consistent_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query);
|
||||
broadcast_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query);
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
@@ -9,7 +9,7 @@
|
||||
*/
|
||||
|
||||
|
||||
#include "cql3/statements/strongly_consistent_select_statement.hh"
|
||||
#include "cql3/statements/broadcast_select_statement.hh"
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
@@ -24,7 +24,7 @@ namespace cql3 {
|
||||
|
||||
namespace statements {
|
||||
|
||||
static logging::logger logger("strongly_consistent_select_statement");
|
||||
static logging::logger logger("broadcast_select_statement");
|
||||
|
||||
static
|
||||
expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) {
|
||||
@@ -58,7 +58,7 @@ bool is_selecting_only_value(const cql3::selection::selection& selection) {
|
||||
selection.get_columns()[0]->name() == "value";
|
||||
}
|
||||
|
||||
strongly_consistent_select_statement::strongly_consistent_select_statement(schema_ptr schema, uint32_t bound_terms,
|
||||
broadcast_select_statement::broadcast_select_statement(schema_ptr schema, uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
@@ -73,7 +73,7 @@ strongly_consistent_select_statement::strongly_consistent_select_statement(schem
|
||||
_query{prepare_query()}
|
||||
{ }
|
||||
|
||||
broadcast_tables::prepared_select strongly_consistent_select_statement::prepare_query() const {
|
||||
broadcast_tables::prepared_select broadcast_select_statement::prepare_query() const {
|
||||
if (!is_selecting_only_value(*_selection)) {
|
||||
throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed");
|
||||
}
|
||||
@@ -94,7 +94,7 @@ evaluate_prepared(
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
broadcast_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
|
||||
}
|
||||
@@ -25,12 +25,12 @@ struct prepared_select {
|
||||
|
||||
}
|
||||
|
||||
class strongly_consistent_select_statement : public select_statement {
|
||||
class broadcast_select_statement : public select_statement {
|
||||
const broadcast_tables::prepared_select _query;
|
||||
|
||||
broadcast_tables::prepared_select prepare_query() const;
|
||||
public:
|
||||
strongly_consistent_select_statement(schema_ptr schema,
|
||||
broadcast_select_statement(schema_ptr schema,
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
@@ -123,10 +123,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
|
||||
// We hold a group0_guard, so it's correct to check this here.
|
||||
// The topology or schema cannot change while we're performing this query.
|
||||
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const std::invalid_argument& e) {
|
||||
if (replica::database::enforce_rf_rack_validity_for_keyspace(cfg, *ksm)) {
|
||||
// There's no guarantee what the type of the exception will be, so we need to
|
||||
// wrap it manually here in a type that can be passed to the user.
|
||||
// wrap the exception in a type that can be passed to the user.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
} else {
|
||||
// Even when RF-rack-validity is not enforced for the keyspace, we'd
|
||||
|
||||
@@ -31,8 +31,6 @@
|
||||
#include "db/config.hh"
|
||||
#include "compaction/time_window_compaction_strategy.hh"
|
||||
|
||||
bool is_internal_keyspace(std::string_view name);
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
namespace statements {
|
||||
@@ -124,10 +122,6 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
|
||||
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
|
||||
#endif
|
||||
|
||||
if (!_properties->get_compression_options() && !is_internal_keyspace(keyspace())) {
|
||||
builder.set_compressor_params(db.get_config().sstable_compression_user_table_options());
|
||||
}
|
||||
|
||||
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
|
||||
}
|
||||
|
||||
|
||||
@@ -98,6 +98,7 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
const sstring& strategy_class,
|
||||
const locator::token_metadata& tm,
|
||||
bool rf_rack_valid_keyspaces,
|
||||
bool enforce_rack_list,
|
||||
locator::replication_strategy_config_options options,
|
||||
const locator::replication_strategy_config_options& old_options,
|
||||
bool rack_list_enabled,
|
||||
@@ -107,7 +108,7 @@ static locator::replication_strategy_config_options prepare_options(
|
||||
auto is_nts = locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) == "org.apache.cassandra.locator.NetworkTopologyStrategy";
|
||||
auto is_alter = !old_options.empty();
|
||||
const auto& all_dcs = tm.get_datacenter_racks_token_owners();
|
||||
auto auto_expand_racks = uses_tablets && rf_rack_valid_keyspaces && rack_list_enabled;
|
||||
auto auto_expand_racks = uses_tablets && rack_list_enabled && (rf_rack_valid_keyspaces || enforce_rack_list);
|
||||
|
||||
logger.debug("prepare_options: {}: is_nts={} auto_expand_racks={} rack_list_enabled={} old_options={} new_options={} all_dcs={}",
|
||||
strategy_class, is_nts, auto_expand_racks, rack_list_enabled, old_options, options, all_dcs);
|
||||
@@ -417,7 +418,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
|
||||
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
|
||||
bool uses_tablets = initial_tablets.has_value();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
|
||||
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), cfg.enforce_rack_list(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
@@ -434,7 +435,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
auto sc = get_replication_strategy_class();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
if (sc) {
|
||||
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
|
||||
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), cfg.enforce_rack_list(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
|
||||
} else {
|
||||
sc = old->strategy_name();
|
||||
options = old_options;
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "cql3/statements/strongly_consistent_modification_statement.hh"
|
||||
#include "cql3/statements/broadcast_modification_statement.hh"
|
||||
#include "cql3/statements/raw/modification_statement.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/expr/expr-utils.hh"
|
||||
@@ -29,6 +29,8 @@
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
#include "cql3/statements/strong_consistency/modification_statement.hh"
|
||||
#include "cql3/statements/strong_consistency/statement_helpers.hh"
|
||||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
@@ -546,7 +548,7 @@ modification_statement::process_where_clause(data_dictionary::database db, expr:
|
||||
}
|
||||
}
|
||||
|
||||
::shared_ptr<strongly_consistent_modification_statement>
|
||||
::shared_ptr<broadcast_modification_statement>
|
||||
modification_statement::prepare_for_broadcast_tables() const {
|
||||
// FIXME: implement for every type of `modification_statement`.
|
||||
throw service::broadcast_tables::unsupported_operation_error{};
|
||||
@@ -554,24 +556,27 @@ modification_statement::prepare_for_broadcast_tables() const {
|
||||
|
||||
namespace raw {
|
||||
|
||||
// Prepares the raw modification statement and picks the object to execute.
// The statement is first built with prepare(db, ctx, stats). When
// is_broadcast_table_statement() reports that keyspace()/column_family() name a
// broadcast table, the result of prepare_for_broadcast_tables() is returned
// instead; otherwise the regular prepared statement is returned unchanged.
::shared_ptr<cql_statement_opt_metadata>
|
||||
modification_statement::prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) {
|
||||
::shared_ptr<cql3::statements::modification_statement> statement = prepare(db, ctx, stats);
|
||||
|
||||
if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
|
||||
return statement->prepare_for_broadcast_tables();
|
||||
} else {
|
||||
return statement;
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<prepared_statement>
|
||||
modification_statement::prepare(data_dictionary::database db, cql_stats& stats) {
|
||||
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
|
||||
auto meta = get_prepare_context();
|
||||
auto statement = prepare_statement(db, meta, stats);
|
||||
|
||||
auto statement = std::invoke([&] -> shared_ptr<cql_statement> {
|
||||
auto result = prepare(db, meta, stats);
|
||||
|
||||
if (strong_consistency::is_strongly_consistent(db, schema->ks_name())) {
|
||||
return ::make_shared<strong_consistency::modification_statement>(std::move(result));
|
||||
}
|
||||
|
||||
if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
|
||||
return result->prepare_for_broadcast_tables();
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
auto partition_key_bind_indices = meta.get_partition_key_bind_indexes(*schema);
|
||||
return std::make_unique<prepared_statement>(audit_info(), std::move(statement), meta, std::move(partition_key_bind_indices));
|
||||
return std::make_unique<prepared_statement>(audit_info(), std::move(statement), meta,
|
||||
std::move(partition_key_bind_indices));
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::modification_statement>
|
||||
|
||||
@@ -30,7 +30,7 @@ class operation;
|
||||
|
||||
namespace statements {
|
||||
|
||||
class strongly_consistent_modification_statement;
|
||||
class broadcast_modification_statement;
|
||||
|
||||
namespace raw { class modification_statement; }
|
||||
|
||||
@@ -113,15 +113,15 @@ public:
|
||||
|
||||
virtual void add_update_for_key(mutation& m, const query::clustering_range& range, const update_parameters& params, const json_cache_opt& json_cache) const = 0;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
virtual const sstring& keyspace() const;
|
||||
const sstring& keyspace() const;
|
||||
|
||||
virtual const sstring& column_family() const;
|
||||
const sstring& column_family() const;
|
||||
|
||||
virtual bool is_counter() const;
|
||||
bool is_counter() const;
|
||||
|
||||
virtual bool is_view() const;
|
||||
bool is_view() const;
|
||||
|
||||
int64_t get_timestamp(int64_t now, const query_options& options) const;
|
||||
|
||||
@@ -129,12 +129,12 @@ public:
|
||||
|
||||
std::optional<gc_clock::duration> get_time_to_live(const query_options& options) const;
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
// Validate before execute, using client state and current schema
|
||||
void validate(query_processor&, const service::client_state& state) const override;
|
||||
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
void add_operation(::shared_ptr<operation> op);
|
||||
|
||||
@@ -256,7 +256,9 @@ public:
|
||||
|
||||
virtual json_cache_opt maybe_prepare_json_cache(const query_options& options) const;
|
||||
|
||||
virtual ::shared_ptr<strongly_consistent_modification_statement> prepare_for_broadcast_tables() const;
|
||||
virtual ::shared_ptr<broadcast_modification_statement> prepare_for_broadcast_tables() const;
|
||||
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
|
||||
protected:
|
||||
/**
|
||||
@@ -264,9 +266,7 @@ protected:
|
||||
* processed to check that they are compatible.
|
||||
* @throws InvalidRequestException
|
||||
*/
|
||||
virtual void validate_where_clause_for_conditions() const;
|
||||
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
void validate_where_clause_for_conditions() const;
|
||||
|
||||
friend class raw::modification_statement;
|
||||
};
|
||||
|
||||
@@ -40,7 +40,6 @@ protected:
|
||||
|
||||
public:
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
::shared_ptr<cql_statement_opt_metadata> prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats);
|
||||
::shared_ptr<cql3::statements::modification_statement> prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const;
|
||||
void add_raw(sstring&& raw) { _raw_cql = std::move(raw); }
|
||||
const sstring& get_raw_cql() const { return _raw_cql; }
|
||||
|
||||
@@ -131,8 +131,6 @@ private:
|
||||
|
||||
void verify_ordering_is_valid(const prepared_orderings_type&, const schema&, const restrictions::statement_restrictions& restrictions) const;
|
||||
|
||||
prepared_ann_ordering_type prepare_ann_ordering(const schema& schema, prepare_context& ctx, data_dictionary::database db) const;
|
||||
|
||||
// Checks whether this ordering reverses all results.
|
||||
// We only allow leaving select results unchanged or reversing them.
|
||||
bool is_ordering_reversed(const prepared_orderings_type&) const;
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include "cql3/statements/strong_consistency/select_statement.hh"
|
||||
#include "cql3/statements/strong_consistency/statement_helpers.hh"
|
||||
#include "cql3/statements/select_statement.hh"
|
||||
#include "cql3/expr/expression.hh"
|
||||
#include "cql3/expr/evaluate.hh"
|
||||
@@ -16,7 +18,7 @@
|
||||
#include "cql3/statements/raw/select_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/statements/prune_materialized_view_statement.hh"
|
||||
#include "cql3/statements/strongly_consistent_select_statement.hh"
|
||||
#include "cql3/statements/broadcast_select_statement.hh"
|
||||
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
@@ -25,12 +27,14 @@
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
#include "service/qos/qos_common.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/functions/as_json_function.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/util.hh"
|
||||
#include "cql3/restrictions/statement_restrictions.hh"
|
||||
#include "index/secondary_index.hh"
|
||||
#include "types/vector.hh"
|
||||
#include "vector_search/filter.hh"
|
||||
#include "validation.hh"
|
||||
#include "exceptions/unrecognized_entity_exception.hh"
|
||||
#include <optional>
|
||||
@@ -368,8 +372,9 @@ uint64_t select_statement::get_inner_loop_limit(uint64_t limit, bool is_aggregat
|
||||
}
|
||||
|
||||
bool select_statement::needs_post_query_ordering() const {
|
||||
// We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
|
||||
return _restrictions->key_is_in_relation() && !_parameters->orderings().empty();
|
||||
// We need post-query ordering for queries with IN on the partition key and an ORDER BY
|
||||
// and ANN index queries with rescoring.
|
||||
return static_cast<bool>(_ordering_comparator);
|
||||
}
|
||||
|
||||
struct select_statement_executor {
|
||||
@@ -1958,14 +1963,46 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|
||||
}));
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::select_statement> vector_indexed_table_select_statement::prepare(data_dictionary::database db, schema_ptr schema,
|
||||
uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, std::unique_ptr<attributes> attrs) {
|
||||
// Bundle describing an ANN (vector) ordering found while preparing a SELECT;
// get_ann_ordering_info() produces it.
struct ann_ordering_info {
|
||||
// The index selected to serve the ANN ordering.
secondary_index::index _index;
|
||||
// Pair built from the ordered column's definition and the prepared ANN vector expression.
raw::select_statement::prepared_ann_ordering_type _prepared_ann_ordering;
|
||||
// Result of vector_index::is_rescoring_enabled() for the index options.
bool is_rescoring_enabled;
|
||||
};
|
||||
|
||||
static std::optional<ann_ordering_info> get_ann_ordering_info(
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
lw_shared_ptr<const raw::select_statement::parameters> parameters,
|
||||
prepare_context& ctx) {
|
||||
|
||||
if (parameters->orderings().empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto [column_id, ordering] = parameters->orderings().front();
|
||||
const auto& ann_vector = std::get_if<raw::select_statement::ann_vector>(&ordering);
|
||||
if (!ann_vector) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
::shared_ptr<column_identifier> column = column_id->prepare_column_identifier(*schema);
|
||||
const column_definition* def = schema->get_column_definition(column->name());
|
||||
if (!def) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
fmt::format("Undefined column name {}", column->text()));
|
||||
}
|
||||
|
||||
if (!def->type->is_vector() || static_cast<const vector_type_impl*>(def->type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering is only supported on float vector indexes");
|
||||
}
|
||||
|
||||
auto e = expr::prepare_expression(*ann_vector, db, schema->ks_name(), nullptr, def->column_specification);
|
||||
expr::fill_prepare_context(e, ctx);
|
||||
|
||||
raw::select_statement::prepared_ann_ordering_type prepared_ann_ordering = std::make_pair(std::move(def), std::move(e));
|
||||
|
||||
auto cf = db.find_column_family(schema);
|
||||
auto& sim = cf.get_index_manager();
|
||||
auto [index_opt, _] = restrictions->find_idx(sim);
|
||||
|
||||
auto indexes = sim.list_indexes();
|
||||
auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) {
|
||||
@@ -1977,27 +2014,90 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|
||||
if (it == indexes.end()) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
|
||||
}
|
||||
|
||||
index_opt = *it;
|
||||
|
||||
if (!index_opt) {
|
||||
throw std::runtime_error("No index found.");
|
||||
return ann_ordering_info{
|
||||
*it,
|
||||
std::move(prepared_ann_ordering),
|
||||
secondary_index::vector_index::is_rescoring_enabled(it->metadata().options())
|
||||
};
|
||||
}
|
||||
|
||||
// Appends a similarity-function call as the last entry of prepared_selectors
// and returns the position of that new entry.
//
// The function name comes from the index options
// (vector_index::get_cql_similarity_function_name). Its two arguments are the
// ordered column (first element of _prepared_ann_ordering) and the prepared ANN
// vector expression (second element). The concrete function object is resolved
// through functions::instance().get() using the schema's keyspace and table.
//
// prepared_selectors is modified in place; the new selector has no alias.
// The returned index is prepared_selectors.size() - 1 after the push_back,
// converted to uint32_t by the return type.
static uint32_t add_similarity_function_to_selectors(
|
||||
std::vector<selection::prepared_selector>& prepared_selectors,
|
||||
const ann_ordering_info& ann_ordering_info,
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema) {
|
||||
auto similarity_function_name = secondary_index::vector_index::get_cql_similarity_function_name(ann_ordering_info._index.metadata().options());
|
||||
// Create the function name
|
||||
auto func_name = functions::function_name::native_function(sstring(similarity_function_name));
|
||||
|
||||
// Create the function arguments
|
||||
std::vector<expr::expression> args;
|
||||
args.push_back(expr::column_value(ann_ordering_info._prepared_ann_ordering.first));
|
||||
args.push_back(ann_ordering_info._prepared_ann_ordering.second);
|
||||
|
||||
// Get the function object
|
||||
std::vector<shared_ptr<assignment_testable>> provided_args;
|
||||
provided_args.push_back(expr::as_assignment_testable(args[0], expr::type_of(args[0])));
|
||||
provided_args.push_back(expr::as_assignment_testable(args[1], expr::type_of(args[1])));
|
||||
|
||||
auto func = cql3::functions::instance().get(db, schema->ks_name(), func_name, provided_args, schema->ks_name(), schema->cf_name(), nullptr);
|
||||
|
||||
// Create the function call expression
|
||||
expr::function_call similarity_func_call{
|
||||
.func = func,
|
||||
.args = std::move(args),
|
||||
};
|
||||
|
||||
// Add the similarity function as a prepared selector (last)
|
||||
prepared_selectors.push_back(selection::prepared_selector{
|
||||
.expr = std::move(similarity_func_call),
|
||||
.alias = nullptr,
|
||||
});
|
||||
return prepared_selectors.size() - 1;
|
||||
}
|
||||
|
||||
// Builds a row comparator that orders result rows by the similarity value
// stored in column similarity_column_index.
//
// The type of the selector at that index must be float; any other kind is
// reported through seastar::on_internal_error().
//
// Comparator semantics (returns true when r1 should come before r2):
//  - a missing cell is treated as NaN;
//  - when both values are finite, the larger value comes first (f1 > f2);
//  - otherwise a finite value comes before a non-finite one, and two
//    non-finite values compare as equivalent (false in both directions).
static select_statement::ordering_comparator_type get_similarity_ordering_comparator(std::vector<selection::prepared_selector>& prepared_selectors, uint32_t similarity_column_index) {
|
||||
auto type = expr::type_of(prepared_selectors[similarity_column_index].expr);
|
||||
if (type->get_kind() != abstract_type::kind::float_kind) {
|
||||
seastar::on_internal_error(logger, "Similarity function must return float type.");
|
||||
}
|
||||
return [similarity_column_index, type] (const raw::select_statement::result_row_type& r1, const raw::select_statement::result_row_type& r2) {
|
||||
auto& c1 = r1[similarity_column_index];
|
||||
auto& c2 = r2[similarity_column_index];
|
||||
auto f1 = c1 ? value_cast<float>(type->deserialize(*c1)) : std::numeric_limits<float>::quiet_NaN();
|
||||
auto f2 = c2 ? value_cast<float>(type->deserialize(*c2)) : std::numeric_limits<float>::quiet_NaN();
|
||||
// Non-finite values (NaN, +/-inf) are kept out of the numeric comparison.
if (std::isfinite(f1) && std::isfinite(f2)) {
|
||||
return f1 > f2;
|
||||
}
|
||||
return std::isfinite(f1);
|
||||
};
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::select_statement> vector_indexed_table_select_statement::prepare(data_dictionary::database db, schema_ptr schema,
|
||||
uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<attributes> attrs) {
|
||||
|
||||
auto prepared_filter = vector_search::prepare_filter(*restrictions, parameters->allow_filtering());
|
||||
|
||||
return ::make_shared<cql3::statements::vector_indexed_table_select_statement>(schema, bound_terms, parameters, std::move(selection), std::move(restrictions),
|
||||
std::move(group_by_cell_indices), is_reversed, std::move(ordering_comparator), std::move(prepared_ann_ordering), std::move(limit),
|
||||
std::move(per_partition_limit), stats, *index_opt, std::move(attrs));
|
||||
std::move(per_partition_limit), stats, index, std::move(prepared_filter), std::move(attrs));
|
||||
}
|
||||
|
||||
vector_indexed_table_select_statement::vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator,
|
||||
prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<attributes> attrs)
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index,
|
||||
vector_search::prepared_filter prepared_filter, std::unique_ptr<attributes> attrs)
|
||||
: select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, limit,
|
||||
per_partition_limit, stats, std::move(attrs)}
|
||||
, _index{index}
|
||||
, _prepared_ann_ordering(std::move(prepared_ann_ordering)) {
|
||||
, _prepared_ann_ordering(std::move(prepared_ann_ordering))
|
||||
, _prepared_filter(std::move(prepared_filter)) {
|
||||
|
||||
if (!limit.has_value()) {
|
||||
throw exceptions::invalid_request_exception("Vector ANN queries must have a limit specified");
|
||||
@@ -2032,13 +2132,19 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
|
||||
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
auto aoe = abort_on_expiry(timeout);
|
||||
auto filter_json = _prepared_filter.to_json(options);
|
||||
uint64_t fetch = static_cast<uint64_t>(std::ceil(limit * secondary_index::vector_index::get_oversampling(_index.metadata().options())));
|
||||
auto pkeys = co_await qp.vector_store_client().ann(
|
||||
_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, aoe.abort_source());
|
||||
_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), fetch, filter_json, aoe.abort_source());
|
||||
if (!pkeys.has_value()) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error())));
|
||||
}
|
||||
|
||||
if (pkeys->size() > limit && !secondary_index::vector_index::is_rescoring_enabled(_index.metadata().options())) {
|
||||
pkeys->erase(pkeys->begin() + limit, pkeys->end());
|
||||
}
|
||||
|
||||
co_return co_await query_base_table(qp, state, options, pkeys.value(), timeout);
|
||||
});
|
||||
|
||||
@@ -2055,11 +2161,11 @@ void vector_indexed_table_select_statement::update_stats() const {
|
||||
}
|
||||
|
||||
lw_shared_ptr<query::read_command> vector_indexed_table_select_statement::prepare_command_for_base_query(
|
||||
query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
query_processor& qp, service::query_state& state, const query_options& options, uint64_t fetch_limit) const {
|
||||
auto slice = make_partition_slice(options);
|
||||
return ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(slice), qp.proxy().get_max_result_size(slice),
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
|
||||
query::row_limit(get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate())), query::partition_limit(query::max_partitions),
|
||||
query::row_limit(get_inner_loop_limit(fetch_limit, _selection->is_aggregate())), query::partition_limit(query::max_partitions),
|
||||
_query_start_time_point, tracing::make_trace_info(state.get_trace_state()), query_id::create_null_id(), query::is_first_page::no,
|
||||
options.get_timestamp(state));
|
||||
}
|
||||
@@ -2077,7 +2183,7 @@ std::vector<float> vector_indexed_table_select_statement::get_ann_ordering_vecto
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(query_processor& qp,
|
||||
service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys,
|
||||
lowres_clock::time_point timeout) const {
|
||||
auto command = prepare_command_for_base_query(qp, state, options);
|
||||
auto command = prepare_command_for_base_query(qp, state, options, pkeys.size());
|
||||
|
||||
// For tables without clustering columns, we can optimize by querying
|
||||
// partition ranges instead of individual primary keys, since the
|
||||
@@ -2116,6 +2222,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
|
||||
query::result_merger{command->get_row_limit(), query::max_partitions});
|
||||
|
||||
co_return co_await wrap_result_to_error_message([this, &command, &options](auto result) {
|
||||
command->set_row_limit(get_limit(options, _limit));
|
||||
return process_results(std::move(result), command, options, _query_start_time_point);
|
||||
})(std::move(result));
|
||||
}
|
||||
@@ -2129,6 +2236,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
|
||||
std::nullopt)
|
||||
.then(wrap_result_to_error_message([this, &options, command](service::storage_proxy::coordinator_query_result qr) {
|
||||
command->set_row_limit(get_limit(options, _limit));
|
||||
return this->process_results(std::move(qr.query_result), command, options, _query_start_time_point);
|
||||
}));
|
||||
}
|
||||
@@ -2223,32 +2331,41 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
|
||||
prepared_selectors = maybe_jsonize_select_clause(std::move(prepared_selectors), db, schema);
|
||||
|
||||
auto aggregation_depth = 0u;
|
||||
std::optional<ann_ordering_info> ann_ordering_info_opt = get_ann_ordering_info(db, schema, _parameters, ctx);
|
||||
bool is_ann_query = ann_ordering_info_opt.has_value();
|
||||
|
||||
// Force aggregation if GROUP BY is used. This will wrap every column x as first(x).
|
||||
if (!_group_by_columns.empty()) {
|
||||
aggregation_depth = std::max(aggregation_depth, 1u);
|
||||
if (prepared_selectors.empty()) {
|
||||
// We have a "SELECT * GROUP BY". If we leave prepared_selectors
|
||||
// empty, below we choose selection::wildcard() for SELECT *, and
|
||||
// forget to do the "levellize" trick needed for the GROUP BY.
|
||||
// So we need to set prepared_selectors. See #16531.
|
||||
auto all_columns = selection::selection::wildcard_columns(schema);
|
||||
std::vector<::shared_ptr<selection::raw_selector>> select_all;
|
||||
select_all.reserve(all_columns.size());
|
||||
for (const column_definition *cdef : all_columns) {
|
||||
auto name = ::make_shared<cql3::column_identifier::raw>(cdef->name_as_text(), true);
|
||||
select_all.push_back(::make_shared<selection::raw_selector>(
|
||||
expr::unresolved_identifier(std::move(name)), nullptr));
|
||||
}
|
||||
prepared_selectors = selection::raw_selector::to_prepared_selectors(select_all, *schema, db, keyspace());
|
||||
if (prepared_selectors.empty() && (!_group_by_columns.empty() || (is_ann_query && ann_ordering_info_opt->is_rescoring_enabled))) {
|
||||
// We have a "SELECT * GROUP BY" or "SELECT * ORDER BY ANN" with rescoring enabled. If we leave prepared_selectors
|
||||
// empty, below we choose selection::wildcard() for SELECT *, and either:
|
||||
// - forget to do the "levellize" trick needed for the GROUP BY. See #16531.
|
||||
// - forget to add the similarity function needed for ORDER BY ANN with rescoring. See below.
|
||||
// So we need to set prepared_selectors.
|
||||
auto all_columns = selection::selection::wildcard_columns(schema);
|
||||
std::vector<::shared_ptr<selection::raw_selector>> select_all;
|
||||
select_all.reserve(all_columns.size());
|
||||
for (const column_definition *cdef : all_columns) {
|
||||
auto name = ::make_shared<cql3::column_identifier::raw>(cdef->name_as_text(), true);
|
||||
select_all.push_back(::make_shared<selection::raw_selector>(
|
||||
expr::unresolved_identifier(std::move(name)), nullptr));
|
||||
}
|
||||
prepared_selectors = selection::raw_selector::to_prepared_selectors(select_all, *schema, db, keyspace());
|
||||
}
|
||||
|
||||
for (auto& ps : prepared_selectors) {
|
||||
expr::fill_prepare_context(ps.expr, ctx);
|
||||
}
|
||||
|
||||
// Force aggregation if GROUP BY is used. This will wrap every column x as first(x).
|
||||
auto aggregation_depth = _group_by_columns.empty() ? 0u : 1u;
|
||||
|
||||
select_statement::ordering_comparator_type ordering_comparator;
|
||||
bool hide_last_column = false;
|
||||
if (is_ann_query && ann_ordering_info_opt->is_rescoring_enabled) {
|
||||
uint32_t similarity_column_index = add_similarity_function_to_selectors(prepared_selectors, *ann_ordering_info_opt, db, schema);
|
||||
hide_last_column = true;
|
||||
ordering_comparator = get_similarity_ordering_comparator(prepared_selectors, similarity_column_index);
|
||||
}
|
||||
|
||||
for (auto& ps : prepared_selectors) {
|
||||
aggregation_depth = std::max(aggregation_depth, expr::aggregation_depth(ps.expr));
|
||||
}
|
||||
@@ -2266,6 +2383,11 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
? selection::selection::wildcard(schema)
|
||||
: selection::selection::from_selectors(db, schema, keyspace(), levellized_prepared_selectors);
|
||||
|
||||
if (is_ann_query && hide_last_column) {
|
||||
// Hide the similarity selector from the client by reducing column_count
|
||||
selection->get_result_metadata()->hide_last_column();
|
||||
}
|
||||
|
||||
// Cassandra 5.0.2 disallows PER PARTITION LIMIT with aggregate queries
|
||||
// but only if GROUP BY is not used.
|
||||
// See #9879 for more details.
|
||||
@@ -2273,8 +2395,6 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
throw exceptions::invalid_request_exception("PER PARTITION LIMIT is not allowed with aggregate queries.");
|
||||
}
|
||||
|
||||
bool is_ann_query = !_parameters->orderings().empty() && std::holds_alternative<select_statement::ann_vector>(_parameters->orderings().front().second);
|
||||
|
||||
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering() || is_ann_query,
|
||||
restrictions::check_indexes(!_parameters->is_mutation_fragments()));
|
||||
|
||||
@@ -2282,19 +2402,14 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
validate_distinct_selection(*schema, *selection, *restrictions);
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type ordering_comparator;
|
||||
bool is_reversed_ = false;
|
||||
|
||||
std::optional<prepared_ann_ordering_type> prepared_ann_ordering;
|
||||
|
||||
auto orderings = _parameters->orderings();
|
||||
|
||||
if (!orderings.empty()) {
|
||||
if (!orderings.empty() && !is_ann_query) {
|
||||
std::visit([&](auto&& ordering) {
|
||||
using T = std::decay_t<decltype(ordering)>;
|
||||
if constexpr (std::is_same_v<T, select_statement::ann_vector>) {
|
||||
prepared_ann_ordering = prepare_ann_ordering(*schema, ctx, db);
|
||||
} else {
|
||||
if constexpr (!std::is_same_v<T, select_statement::ann_vector>) {
|
||||
SCYLLA_ASSERT(!for_view);
|
||||
verify_ordering_is_allowed(*_parameters, *restrictions);
|
||||
prepared_orderings_type prepared_orderings = prepare_orderings(*schema);
|
||||
@@ -2307,7 +2422,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
}
|
||||
|
||||
std::vector<sstring> warnings;
|
||||
if (!prepared_ann_ordering.has_value()) {
|
||||
if (!is_ann_query) {
|
||||
check_needs_filtering(*restrictions, db.get_config().strict_allow_filtering(), warnings);
|
||||
ensure_filtering_columns_retrieval(db, *selection, *restrictions);
|
||||
}
|
||||
@@ -2361,7 +2476,21 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
&& restrictions->partition_key_restrictions_size() == schema->partition_key_size());
|
||||
};
|
||||
|
||||
if (_parameters->is_prune_materialized_view()) {
|
||||
if (strong_consistency::is_strongly_consistent(db, schema->ks_name())) {
|
||||
stmt = ::make_shared<strong_consistency::select_statement>(
|
||||
schema,
|
||||
ctx.bound_variables_size(),
|
||||
_parameters,
|
||||
std::move(selection),
|
||||
std::move(restrictions),
|
||||
std::move(group_by_cell_indices),
|
||||
is_reversed_,
|
||||
std::move(ordering_comparator),
|
||||
prepare_limit(db, ctx, _limit),
|
||||
prepare_limit(db, ctx, _per_partition_limit),
|
||||
stats,
|
||||
std::move(prepared_attrs));
|
||||
} else if (_parameters->is_prune_materialized_view()) {
|
||||
stmt = ::make_shared<cql3::statements::prune_materialized_view_statement>(
|
||||
schema,
|
||||
ctx.bound_variables_size(),
|
||||
@@ -2390,10 +2519,10 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
prepare_limit(db, ctx, _per_partition_limit),
|
||||
stats,
|
||||
std::move(prepared_attrs));
|
||||
} else if (prepared_ann_ordering) {
|
||||
} else if (is_ann_query) {
|
||||
stmt = vector_indexed_table_select_statement::prepare(db, schema, ctx.bound_variables_size(), _parameters, std::move(selection), std::move(restrictions),
|
||||
std::move(group_by_cell_indices), is_reversed_, std::move(ordering_comparator), std::move(*prepared_ann_ordering),
|
||||
prepare_limit(db, ctx, _limit), prepare_limit(db, ctx, _per_partition_limit), stats, std::move(prepared_attrs));
|
||||
std::move(group_by_cell_indices), is_reversed_, std::move(ordering_comparator), std::move(ann_ordering_info_opt->_prepared_ann_ordering),
|
||||
prepare_limit(db, ctx, _limit), prepare_limit(db, ctx, _per_partition_limit), stats, ann_ordering_info_opt->_index, std::move(prepared_attrs));
|
||||
} else if (restrictions->uses_secondary_indexing()) {
|
||||
stmt = view_indexed_table_select_statement::prepare(
|
||||
db,
|
||||
@@ -2425,7 +2554,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
std::move(prepared_attrs)
|
||||
);
|
||||
} else if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
|
||||
stmt = ::make_shared<cql3::statements::strongly_consistent_select_statement>(
|
||||
stmt = ::make_shared<cql3::statements::broadcast_select_statement>(
|
||||
schema,
|
||||
ctx.bound_variables_size(),
|
||||
_parameters,
|
||||
@@ -2615,28 +2744,6 @@ void select_statement::verify_ordering_is_valid(const prepared_orderings_type& o
|
||||
}
|
||||
}
|
||||
|
||||
select_statement::prepared_ann_ordering_type select_statement::prepare_ann_ordering(const schema& schema, prepare_context& ctx, data_dictionary::database db) const {
|
||||
auto [column_id, ordering] = _parameters->orderings().front();
|
||||
const auto& ann_vector = std::get_if<select_statement::ann_vector>(&ordering);
|
||||
SCYLLA_ASSERT(ann_vector);
|
||||
|
||||
::shared_ptr<column_identifier> column = column_id->prepare_column_identifier(schema);
|
||||
const column_definition* def = schema.get_column_definition(column->name());
|
||||
if (!def) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
fmt::format("Undefined column name {}", column->text()));
|
||||
}
|
||||
|
||||
if (!def->type->is_vector() || static_cast<const vector_type_impl*>(def->type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception("ANN ordering is only supported on float vector indexes");
|
||||
}
|
||||
|
||||
auto e = expr::prepare_expression(*ann_vector, db, keyspace(), nullptr, def->column_specification);
|
||||
expr::fill_prepare_context(e, ctx);
|
||||
|
||||
return std::make_pair(std::move(def), std::move(e));
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type select_statement::get_ordering_comparator(const prepared_orderings_type& orderings,
|
||||
selection::selection& selection,
|
||||
const restrictions::statement_restrictions& restrictions) {
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "locator/host_id.hh"
|
||||
#include "service/cas_shard.hh"
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
#include "vector_search/filter.hh"
|
||||
|
||||
namespace service {
|
||||
class client_state;
|
||||
@@ -362,6 +363,7 @@ private:
|
||||
class vector_indexed_table_select_statement : public select_statement {
|
||||
secondary_index::index _index;
|
||||
prepared_ann_ordering_type _prepared_ann_ordering;
|
||||
vector_search::prepared_filter _prepared_filter;
|
||||
mutable gc_clock::time_point _query_start_time_point;
|
||||
|
||||
public:
|
||||
@@ -371,13 +373,13 @@ public:
|
||||
lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, std::unique_ptr<cql3::attributes> attrs);
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<cql3::attributes> attrs);
|
||||
|
||||
vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator,
|
||||
prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit, std::optional<expr::expression> per_partition_limit,
|
||||
cql_stats& stats, const secondary_index::index& index, std::unique_ptr<cql3::attributes> attrs);
|
||||
cql_stats& stats, const secondary_index::index& index, vector_search::prepared_filter prepared_filter, std::unique_ptr<cql3::attributes> attrs);
|
||||
|
||||
private:
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> do_execute(
|
||||
@@ -385,7 +387,7 @@ private:
|
||||
|
||||
void update_stats() const;
|
||||
|
||||
lw_shared_ptr<query::read_command> prepare_command_for_base_query(query_processor& qp, service::query_state& state, const query_options& options) const;
|
||||
lw_shared_ptr<query::read_command> prepare_command_for_base_query(query_processor& qp, service::query_state& state, const query_options& options, uint64_t fetch_limit) const;
|
||||
|
||||
std::vector<float> get_ann_ordering_vector(const query_options& options) const;
|
||||
|
||||
|
||||
82
cql3/statements/strong_consistency/modification_statement.cc
Normal file
82
cql3/statements/strong_consistency/modification_statement.cc
Normal file
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "modification_statement.hh"
|
||||
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/strong_consistency/coordinator.hh"
|
||||
#include "cql3/statements/strong_consistency/statement_helpers.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
static logging::logger logger("sc_modification_statement");
|
||||
|
||||
modification_statement::modification_statement(shared_ptr<base_statement> statement)
|
||||
: cql_statement_opt_metadata(&timeout_config::write_timeout)
|
||||
, _statement(std::move(statement))
|
||||
{
|
||||
}
|
||||
|
||||
using result_message = cql_transport::messages::result_message;
|
||||
|
||||
/// Runs the statement and then passes the resulting message through
/// propagate_exception_as_future before handing it back to the caller.
future<shared_ptr<result_message>> modification_statement::execute(query_processor& qp, service::query_state& qs,
        const query_options& options, std::optional<service::group0_guard> guard) const
{
    auto unchecked = execute_without_checking_exception_message(qp, qs, options, std::move(guard));
    return unchecked.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<result_message>>);
}
|
||||
|
||||
/// Executes the wrapped modification through the strongly consistent coordinator.
///
/// The request is rejected with invalid_request_exception unless it targets
/// exactly one partition and the wrapped statement does not require a read.
/// When the coordinator answers with need_redirect the statement is forwarded
/// via redirect_statement(); otherwise a void result message is returned.
future<shared_ptr<result_message>> modification_statement::execute_without_checking_exception_message(
        query_processor& qp, service::query_state& qs, const query_options& options,
        std::optional<service::group0_guard> guard) const
{
    auto json_cache = base_statement::json_cache_opt{};
    const auto keys = _statement->build_partition_keys(options, json_cache);

    const bool targets_single_partition = keys.size() == 1 && query::is_single_partition(keys[0]);
    if (!targets_single_partition) {
        throw exceptions::invalid_request_exception("Strongly consistent queries can only target a single partition");
    }
    if (_statement->requires_read()) {
        throw exceptions::invalid_request_exception("Strongly consistent updates don't support data prefetch");
    }

    auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();

    // Produces the single mutation for the timestamp handed in by the coordinator.
    // Everything is captured by reference; the builder is only used inside the
    // co_await expression below, while this coroutine frame is alive.
    auto build_mutation = [&](api::timestamp_type ts) {
        const auto prefetch_data = update_parameters::prefetch_data(_statement->s);
        const auto ttl = _statement->get_time_to_live(options);
        const auto params = update_parameters(_statement->s, options, ts, ttl, prefetch_data);
        const auto ranges = _statement->create_clustering_ranges(options, json_cache);
        auto muts = _statement->apply_updates(keys, ranges, params, json_cache);
        if (muts.size() != 1) {
            on_internal_error(logger, ::format("statement '{}' has unexpected number of mutations {}",
                    raw_cql_statement, muts.size()));
        }
        return std::move(*muts.begin());
    };

    const auto mutate_result = co_await coordinator.get().mutate(
            _statement->s,
            keys[0].start()->value().token(),
            std::move(build_mutation));

    using namespace service::strong_consistency;
    if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
        co_return co_await redirect_statement(qp, options, redirect->target);
    }

    co_return seastar::make_shared<result_message::void_message>();
}
|
||||
|
||||
/// Access checking is delegated unchanged to the wrapped statement.
future<> modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
    const auto& wrapped = *_statement;
    return wrapped.check_access(qp, state);
}
|
||||
|
||||
uint32_t modification_statement::get_bound_terms() const {
|
||||
return _statement->get_bound_terms();
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return _statement->depends_on(ks_name, cf_name);
|
||||
}
|
||||
}
|
||||
39
cql3/statements/strong_consistency/modification_statement.hh
Normal file
39
cql3/statements/strong_consistency/modification_statement.hh
Normal file
@@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/expr/expression.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
|
||||
class modification_statement : public cql_statement_opt_metadata {
|
||||
using result_message = cql_transport::messages::result_message;
|
||||
using base_statement = cql3::statements::modification_statement;
|
||||
|
||||
shared_ptr<base_statement> _statement;
|
||||
public:
|
||||
modification_statement(shared_ptr<base_statement> statement);
|
||||
|
||||
future<shared_ptr<result_message>> execute(query_processor& qp, service::query_state& state,
|
||||
const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
future<shared_ptr<result_message>> execute_without_checking_exception_message(query_processor& qp,
|
||||
service::query_state& qs, const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
};
|
||||
|
||||
}
|
||||
56
cql3/statements/strong_consistency/select_statement.cc
Normal file
56
cql3/statements/strong_consistency/select_statement.cc
Normal file
@@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "select_statement.hh"
|
||||
|
||||
#include "query/query-request.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/strong_consistency/coordinator.hh"
|
||||
#include "cql3/statements/strong_consistency/statement_helpers.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
|
||||
using result_message = cql_transport::messages::result_message;
|
||||
|
||||
/// Executes the SELECT through the strongly consistent coordinator.
///
/// Only requests restricted to exactly one partition are accepted; anything
/// else is rejected with invalid_request_exception. When the coordinator
/// answers with need_redirect the statement is forwarded via
/// redirect_statement(); otherwise the query result goes to process_results().
future<::shared_ptr<result_message>> select_statement::do_execute(query_processor& qp,
        service::query_state& state,
        const query_options& options) const
{
    const auto key_ranges = _restrictions->get_partition_key_ranges(options);
    const bool targets_single_partition = key_ranges.size() == 1 && query::is_single_partition(key_ranges[0]);
    if (!targets_single_partition) {
        throw exceptions::invalid_request_exception("Strongly consistent queries can only target a single partition");
    }

    // The same time point is given to the read command and to process_results().
    const auto now = gc_clock::now();
    const auto inner_limit = get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate());

    auto read_command = make_lw_shared<query::read_command>(
            _query_schema->id(),
            _query_schema->version(),
            make_partition_slice(options),
            query::max_result_size(query::result_memory_limiter::maximum_result_size),
            query::tombstone_limit(query::tombstone_limit::max),
            query::row_limit(inner_limit),
            query::partition_limit(query::max_partitions),
            now,
            tracing::make_trace_info(state.get_trace_state()),
            query_id::create_null_id(),
            query::is_first_page::no,
            options.get_timestamp(state));

    const auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
    auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
    auto query_result = co_await coordinator.get().query(
            _query_schema, *read_command, key_ranges, state.get_trace_state(), timeout);

    using namespace service::strong_consistency;
    if (const auto* redirect = get_if<need_redirect>(&query_result)) {
        co_return co_await redirect_statement(qp, options, redirect->target);
    }

    auto rows = get<lw_shared_ptr<query::result>>(std::move(query_result));
    co_return co_await process_results(std::move(rows), read_command, options, now);
}
|
||||
|
||||
}
|
||||
26
cql3/statements/strong_consistency/select_statement.hh
Normal file
26
cql3/statements/strong_consistency/select_statement.hh
Normal file
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/statements/select_statement.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
|
||||
class select_statement : public cql3::statements::select_statement {
|
||||
using result_message = cql_transport::messages::result_message;
|
||||
|
||||
public:
|
||||
using cql3::statements::select_statement::select_statement;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> do_execute(query_processor& qp,
|
||||
service::query_state& state, const query_options& options) const override;
|
||||
};
|
||||
|
||||
}
|
||||
37
cql3/statements/strong_consistency/statement_helpers.cc
Normal file
37
cql3/statements/strong_consistency/statement_helpers.cc
Normal file
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "statement_helpers.hh"
|
||||
|
||||
#include "transport/messages/result_message_base.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "locator/tablet_replication_strategy.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
/// Forwards a statement to the shard named by `target`.
///
/// If `target` is on this host, the cached partition-key function call values
/// are taken out of `options` and the request is bounced to `target.shard`.
/// If `target` is on another host, invalid_request_exception is thrown.
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(query_processor& qp,
        const query_options& options,
        const locator::tablet_replica& target)
{
    const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id();
    if (target.host == my_host_id) {
        // `options` arrives const, but taking the cached values mutates it, hence the cast.
        auto&& func_values_cache = const_cast<cql3::query_options&>(options).take_cached_pk_function_calls();
        co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache));
    }

    throw exceptions::invalid_request_exception(format(
            "Strongly consistent writes can be executed only on the leader node, "
            "leader id {}, current host id {}",
            target.host, my_host_id));
}
|
||||
|
||||
/// Returns true when the keyspace's replication strategy is tablet aware and
/// its consistency option is anything other than `eventual`.
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name) {
    const auto* tablet_rs = db.find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware();
    if (!tablet_rs) {
        // Not a tablet-aware strategy.
        return false;
    }
    return tablet_rs->get_consistency() != data_dictionary::consistency_config_option::eventual;
}
|
||||
|
||||
}
|
||||
23
cql3/statements/strong_consistency/statement_helpers.hh
Normal file
23
cql3/statements/strong_consistency/statement_helpers.hh
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "locator/tablets.hh"
|
||||
|
||||
namespace cql3::statements::strong_consistency {
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(
|
||||
query_processor& qp,
|
||||
const query_options& options,
|
||||
const locator::tablet_replica& target);
|
||||
|
||||
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name);
|
||||
|
||||
}
|
||||
@@ -13,7 +13,7 @@
|
||||
#include "cql3/expr/expression.hh"
|
||||
#include "cql3/expr/evaluate.hh"
|
||||
#include "cql3/expr/expr-utils.hh"
|
||||
#include "cql3/statements/strongly_consistent_modification_statement.hh"
|
||||
#include "cql3/statements/broadcast_modification_statement.hh"
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
#include "raw/update_statement.hh"
|
||||
|
||||
@@ -333,7 +333,7 @@ std::optional<expr::expression> get_value_condition(const expr::expression& the_
|
||||
return binop->rhs;
|
||||
}
|
||||
|
||||
::shared_ptr<strongly_consistent_modification_statement>
|
||||
::shared_ptr<broadcast_modification_statement>
|
||||
update_statement::prepare_for_broadcast_tables() const {
|
||||
if (attrs) {
|
||||
if (attrs->is_time_to_live_set()) {
|
||||
@@ -359,7 +359,7 @@ update_statement::prepare_for_broadcast_tables() const {
|
||||
.value_condition = get_value_condition(_condition),
|
||||
};
|
||||
|
||||
return ::make_shared<strongly_consistent_modification_statement>(
|
||||
return ::make_shared<broadcast_modification_statement>(
|
||||
get_bound_terms(),
|
||||
s,
|
||||
query
|
||||
|
||||
@@ -45,7 +45,7 @@ private:
|
||||
virtual void execute_operations_for_key(mutation& m, const clustering_key_prefix& prefix, const update_parameters& params, const json_cache_opt& json_cache) const;
|
||||
|
||||
public:
|
||||
virtual ::shared_ptr<strongly_consistent_modification_statement> prepare_for_broadcast_tables() const override;
|
||||
virtual ::shared_ptr<broadcast_modification_statement> prepare_for_broadcast_tables() const override;
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
@@ -323,6 +323,9 @@ void cache_mutation_reader::touch_partition() {
|
||||
|
||||
inline
|
||||
future<> cache_mutation_reader::fill_buffer() {
|
||||
if (const auto& ex = get_abort_exception(); ex) {
|
||||
return make_exception_future<>(ex);
|
||||
}
|
||||
if (_state == state::before_static_row) {
|
||||
touch_partition();
|
||||
auto after_static_row = [this] {
|
||||
|
||||
26
db/config.cc
26
db/config.cc
@@ -1341,7 +1341,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
|
||||
"Server-global user table compression options. If enabled, all user tables"
|
||||
"will be compressed using the provided options, unless overridden"
|
||||
"by compression options in the table schema. The available options are:\n"
|
||||
"by compression options in the table schema. User tables are all tables in non-system keyspaces. The available options are:\n"
|
||||
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
|
||||
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
|
||||
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
|
||||
@@ -1584,7 +1584,14 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
|
||||
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
|
||||
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
|
||||
"keyspaces, attempting to start a node with this option ON will fail.")
|
||||
"keyspaces, attempting to start a node with this option ON will fail. "
|
||||
"DEPRECATED. Use enforce_rack_list instead.")
|
||||
// Config entry for the enforce_rack_list option. The description is assembled from
// adjacent string literals, which the compiler joins verbatim; every fragment that is
// followed by another sentence therefore has to end with its own separating space.
, enforce_rack_list(this, "enforce_rack_list", liveness::MustRestart, value_status::Used, false,
    "Enforce rack list for tablet keyspaces. "
    "When the option is on, CREATE STATEMENT expands numeric rfs to rack lists "
    "and ALTER STATEMENT is allowed only when rack lists are used in all DCs. "
    "Additionally, if there are existing tablet keyspaces with numeric rf in any DC "
    "attempting to start a node with this option ON will fail.")
|
||||
// FIXME: make frequency per table in order to reduce work in each iteration.
|
||||
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
|
||||
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
|
||||
@@ -1785,6 +1792,21 @@ const db::extensions& db::config::extensions() const {
|
||||
return *_extensions;
|
||||
}
|
||||
|
||||
// Resolves the server-wide compression options to apply to user tables.
//
// The configured sstable_compression_user_table_options value is returned unchanged
// when any of the following holds:
//   - the option's is_set() reports true (presumably: a value was supplied
//     explicitly rather than defaulted - TODO confirm),
//   - dicts_feature_enabled is true,
//   - the configured value does not use a dictionary compressor.
// Otherwise the configured algorithm is replaced by its non-dictionary equivalent,
// while every other entry of the option map is carried over as-is.
compression_parameters db::config::get_sstable_compression_user_table_options(bool dicts_feature_enabled) const {
    const bool use_as_configured = sstable_compression_user_table_options.is_set()
            || dicts_feature_enabled
            || !sstable_compression_user_table_options().uses_dictionary_compressor();
    if (use_as_configured) {
        return sstable_compression_user_table_options();
    }

    // Fall back to non-dict if dictionary compression is not enabled cluster-wide.
    auto configured = sstable_compression_user_table_options();
    auto option_map = configured.get_options();
    const auto fallback_algo = compression_parameters::non_dict_equivalent(configured.get_algorithm());
    option_map[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(fallback_algo));
    return compression_parameters{option_map};
}
|
||||
|
||||
std::map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
|
||||
@@ -419,7 +419,13 @@ public:
|
||||
named_value<bool> enable_sstables_mc_format;
|
||||
named_value<bool> enable_sstables_md_format;
|
||||
named_value<sstring> sstable_format;
|
||||
|
||||
// NOTE: Do not use this option directly.
|
||||
// Use get_sstable_compression_user_table_options() instead.
|
||||
named_value<compression_parameters> sstable_compression_user_table_options;
|
||||
|
||||
compression_parameters get_sstable_compression_user_table_options(bool dicts_feature_enabled) const;
|
||||
|
||||
named_value<bool> sstable_compression_dictionaries_allow_in_ddl;
|
||||
named_value<bool> sstable_compression_dictionaries_enable_writing;
|
||||
named_value<float> sstable_compression_dictionaries_memory_budget_fraction;
|
||||
@@ -599,6 +605,7 @@ public:
|
||||
named_value<bool> enable_create_table_with_compact_storage;
|
||||
|
||||
named_value<bool> rf_rack_valid_keyspaces;
|
||||
named_value<bool> enforce_rack_list;
|
||||
|
||||
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
|
||||
named_value<bool> force_capacity_based_balancing;
|
||||
|
||||
@@ -850,7 +850,7 @@ mutation_reader row_cache::make_nonpopulating_reader(schema_ptr schema, reader_p
|
||||
std::move(permit),
|
||||
e.key(),
|
||||
query::clustering_key_filter_ranges(slice.row_ranges(*schema, e.key().key())),
|
||||
e.partition().read(_tracker.region(), _tracker.memtable_cleaner(), nullptr, phase_of(pos)),
|
||||
e.partition().read(_tracker.region(), _tracker.memtable_cleaner(), &_tracker, phase_of(pos)),
|
||||
false,
|
||||
_tracker.region(),
|
||||
_read_section,
|
||||
|
||||
@@ -96,16 +96,16 @@ static logging::logger diff_logger("schema_diff");
|
||||
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
|
||||
namespace db {
|
||||
namespace {
|
||||
const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
if (ks_name == schema_tables::NAME) {
|
||||
props.enable_schema_commitlog();
|
||||
const auto set_use_schema_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
if (builder.ks_name() == schema_tables::NAME) {
|
||||
builder.enable_schema_commitlog();
|
||||
}
|
||||
});
|
||||
const auto set_group0_table_options =
|
||||
schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
if (ks_name == schema_tables::NAME) {
|
||||
schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
if (builder.ks_name() == schema_tables::NAME) {
|
||||
// all schema tables are group0 tables
|
||||
props.is_group0_table = true;
|
||||
builder.set_is_group0_table(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
if (tag.empty()) {
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
|
||||
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
|
||||
};
|
||||
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
});
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
if (ks_name.empty()) {
|
||||
throw std::runtime_error("You must supply a keyspace name");
|
||||
}
|
||||
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
|
||||
|
||||
@@ -38,10 +38,14 @@ class backup_task_impl;
|
||||
|
||||
} // snapshot namespace
|
||||
|
||||
struct snapshot_options {
|
||||
bool skip_flush = false;
|
||||
gc_clock::time_point created_at = gc_clock::now();
|
||||
std::optional<gc_clock::time_point> expires_at;
|
||||
};
|
||||
|
||||
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
|
||||
public:
|
||||
using skip_flush = bool_class<class skip_flush_tag>;
|
||||
|
||||
struct table_snapshot_details {
|
||||
int64_t total;
|
||||
int64_t live;
|
||||
@@ -70,8 +74,8 @@ public:
|
||||
*
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
|
||||
return take_snapshot(tag, {}, sf);
|
||||
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
|
||||
return take_snapshot(tag, {}, opts);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -80,7 +84,7 @@ public:
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
|
||||
|
||||
/**
|
||||
* Takes the snapshot of multiple tables. A snapshot name must be specified.
|
||||
@@ -89,7 +93,7 @@ public:
|
||||
* @param tables a vector of tables names to snapshot
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
|
||||
/**
|
||||
* Remove the snapshot with the given name from the given keyspaces.
|
||||
@@ -127,8 +131,8 @@ private:
|
||||
|
||||
friend class snapshot::backup_task_impl;
|
||||
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -42,11 +42,11 @@ extern logging::logger cdc_log;
|
||||
|
||||
namespace db {
|
||||
namespace {
|
||||
const auto set_wait_for_sync_to_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
if ((ks_name == system_distributed_keyspace::NAME_EVERYWHERE && cf_name == system_distributed_keyspace::CDC_GENERATIONS_V2) ||
|
||||
(ks_name == system_distributed_keyspace::NAME && cf_name == system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION))
|
||||
const auto set_wait_for_sync_to_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
if ((builder.ks_name() == system_distributed_keyspace::NAME_EVERYWHERE && builder.cf_name() == system_distributed_keyspace::CDC_GENERATIONS_V2) ||
|
||||
(builder.ks_name() == system_distributed_keyspace::NAME && builder.cf_name() == system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION))
|
||||
{
|
||||
props.wait_for_sync_to_commitlog = true;
|
||||
builder.set_wait_for_sync_to_commitlog(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -66,24 +66,24 @@ static thread_local auto sstableinfo_type = user_type_impl::get_instance(
|
||||
|
||||
namespace db {
|
||||
namespace {
|
||||
const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
const auto set_null_sharder = schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
// tables in the "system" keyspace which need to use null sharder
|
||||
static const std::unordered_set<sstring> tables = {
|
||||
// empty
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.use_null_sharder = true;
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.set_use_null_sharder(true);
|
||||
}
|
||||
});
|
||||
const auto set_wait_for_sync_to_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
const auto set_wait_for_sync_to_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
static const std::unordered_set<sstring> tables = {
|
||||
system_keyspace::PAXOS,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.wait_for_sync_to_commitlog = true;
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.set_wait_for_sync_to_commitlog(true);
|
||||
}
|
||||
});
|
||||
const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
const auto set_use_schema_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
static const std::unordered_set<sstring> tables = {
|
||||
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
|
||||
system_keyspace::BROADCAST_KV_STORE,
|
||||
@@ -108,18 +108,18 @@ namespace {
|
||||
system_keyspace::ROLE_MEMBERS,
|
||||
system_keyspace::ROLE_ATTRIBUTES,
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::v3::CDC_LOCAL,
|
||||
system_keyspace::CDC_LOCAL,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.enable_schema_commitlog();
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.enable_schema_commitlog();
|
||||
}
|
||||
});
|
||||
|
||||
const auto set_group0_table_options =
|
||||
schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
static const std::unordered_set<sstring> tables = {
|
||||
// scylla_local may store a replicated tombstone related to schema
|
||||
// (see `make_group0_schema_version_mutation`), so we include it in the group0 tables list.
|
||||
@@ -142,8 +142,8 @@ namespace {
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
system_keyspace::REPAIR_TASKS,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.is_group0_table = true;
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.set_is_group0_table(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -918,7 +918,7 @@ schema_ptr system_keyspace::corrupt_data() {
|
||||
return scylla_local;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::batches() {
|
||||
schema_ptr system_keyspace::batches() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BATCHES), NAME, BATCHES,
|
||||
// partition key
|
||||
@@ -946,53 +946,7 @@ schema_ptr system_keyspace::v3::batches() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::built_indexes() {
|
||||
// identical to ours, but ours otoh is a mix-in of the 3.x series cassandra one
|
||||
return db::system_keyspace::built_indexes();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::local() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, LOCAL), NAME, LOCAL,
|
||||
// partition key
|
||||
{{"key", utf8_type}},
|
||||
// clustering key
|
||||
{},
|
||||
// regular columns
|
||||
{
|
||||
{"bootstrapped", utf8_type},
|
||||
{"broadcast_address", inet_addr_type},
|
||||
{"cluster_name", utf8_type},
|
||||
{"cql_version", utf8_type},
|
||||
{"data_center", utf8_type},
|
||||
{"gossip_generation", int32_type},
|
||||
{"host_id", uuid_type},
|
||||
{"listen_address", inet_addr_type},
|
||||
{"native_protocol_version", utf8_type},
|
||||
{"partitioner", utf8_type},
|
||||
{"rack", utf8_type},
|
||||
{"release_version", utf8_type},
|
||||
{"rpc_address", inet_addr_type},
|
||||
{"schema_version", uuid_type},
|
||||
{"thrift_version", utf8_type},
|
||||
{"tokens", set_type_impl::get_instance(utf8_type, true)},
|
||||
{"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, true)},
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"information about the local node"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.with_hash_version();
|
||||
return builder.build(schema_builder::compact_storage::no);
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::truncated() {
|
||||
schema_ptr system_keyspace::truncated() {
|
||||
static thread_local auto local = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, TRUNCATED), NAME, TRUNCATED,
|
||||
// partition key
|
||||
@@ -1022,7 +976,7 @@ schema_ptr system_keyspace::v3::truncated() {
|
||||
|
||||
thread_local data_type replay_position_type = tuple_type_impl::get_instance({long_type, int32_type});
|
||||
|
||||
schema_ptr system_keyspace::v3::commitlog_cleanups() {
|
||||
schema_ptr system_keyspace::commitlog_cleanups() {
|
||||
static thread_local auto local = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, COMMITLOG_CLEANUPS), NAME, COMMITLOG_CLEANUPS,
|
||||
// partition key
|
||||
@@ -1049,47 +1003,7 @@ schema_ptr system_keyspace::v3::commitlog_cleanups() {
|
||||
return local;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::peers() {
|
||||
// identical
|
||||
return db::system_keyspace::peers();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::peer_events() {
|
||||
// identical
|
||||
return db::system_keyspace::peer_events();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::range_xfers() {
|
||||
// identical
|
||||
return db::system_keyspace::range_xfers();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::compaction_history() {
|
||||
// identical
|
||||
return db::system_keyspace::compaction_history();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::sstable_activity() {
|
||||
// identical
|
||||
return db::system_keyspace::sstable_activity();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::size_estimates() {
|
||||
// identical
|
||||
return db::system_keyspace::size_estimates();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::large_partitions() {
|
||||
// identical
|
||||
return db::system_keyspace::large_partitions();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::scylla_local() {
|
||||
// identical
|
||||
return db::system_keyspace::scylla_local();
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::available_ranges() {
|
||||
schema_ptr system_keyspace::available_ranges() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, AVAILABLE_RANGES), NAME, AVAILABLE_RANGES,
|
||||
// partition key
|
||||
@@ -1112,7 +1026,7 @@ schema_ptr system_keyspace::v3::available_ranges() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::views_builds_in_progress() {
|
||||
schema_ptr system_keyspace::views_builds_in_progress() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, VIEWS_BUILDS_IN_PROGRESS), NAME, VIEWS_BUILDS_IN_PROGRESS,
|
||||
// partition key
|
||||
@@ -1135,7 +1049,7 @@ schema_ptr system_keyspace::v3::views_builds_in_progress() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::built_views() {
|
||||
schema_ptr system_keyspace::built_views() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BUILT_VIEWS), NAME, BUILT_VIEWS,
|
||||
// partition key
|
||||
@@ -1158,7 +1072,7 @@ schema_ptr system_keyspace::v3::built_views() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::v3::scylla_views_builds_in_progress() {
|
||||
schema_ptr system_keyspace::scylla_views_builds_in_progress() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, std::make_optional(id))
|
||||
@@ -1174,7 +1088,7 @@ schema_ptr system_keyspace::v3::scylla_views_builds_in_progress() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
/*static*/ schema_ptr system_keyspace::v3::cdc_local() {
|
||||
/*static*/ schema_ptr system_keyspace::cdc_local() {
|
||||
static thread_local auto cdc_local = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, CDC_LOCAL), NAME, CDC_LOCAL,
|
||||
// partition key
|
||||
@@ -2180,21 +2094,21 @@ future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) {
|
||||
co_await std::visit(make_visitor(
|
||||
[this] (cdc::generation_id_v1 id) -> future<> {
|
||||
co_await execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
|
||||
sstring(v3::CDC_LOCAL), id.ts);
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", CDC_LOCAL),
|
||||
sstring(CDC_LOCAL), id.ts);
|
||||
},
|
||||
[this] (cdc::generation_id_v2 id) -> future<> {
|
||||
co_await execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", v3::CDC_LOCAL),
|
||||
sstring(v3::CDC_LOCAL), id.ts, id.id);
|
||||
format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", CDC_LOCAL),
|
||||
sstring(CDC_LOCAL), id.ts, id.id);
|
||||
}
|
||||
), gen_id);
|
||||
}
|
||||
|
||||
future<std::optional<cdc::generation_id>> system_keyspace::get_cdc_generation_id() {
|
||||
auto msg = co_await execute_cql(
|
||||
format("SELECT streams_timestamp, uuid FROM system.{} WHERE key = ?", v3::CDC_LOCAL),
|
||||
sstring(v3::CDC_LOCAL));
|
||||
format("SELECT streams_timestamp, uuid FROM system.{} WHERE key = ?", CDC_LOCAL),
|
||||
sstring(CDC_LOCAL));
|
||||
|
||||
if (msg->empty()) {
|
||||
co_return std::nullopt;
|
||||
@@ -2220,19 +2134,19 @@ static const sstring CDC_REWRITTEN_KEY = "rewritten";
|
||||
future<> system_keyspace::cdc_set_rewritten(std::optional<cdc::generation_id_v1> gen_id) {
|
||||
if (gen_id) {
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", CDC_LOCAL),
|
||||
CDC_REWRITTEN_KEY, gen_id->ts).discard_result();
|
||||
} else {
|
||||
// Insert just the row marker.
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (key) VALUES (?)", v3::CDC_LOCAL),
|
||||
format("INSERT INTO system.{} (key) VALUES (?)", CDC_LOCAL),
|
||||
CDC_REWRITTEN_KEY).discard_result();
|
||||
}
|
||||
}
|
||||
|
||||
future<bool> system_keyspace::cdc_is_rewritten() {
|
||||
// We don't care about the actual timestamp; it's additional information for debugging purposes.
|
||||
return execute_cql(format("SELECT key FROM system.{} WHERE key = ?", v3::CDC_LOCAL), CDC_REWRITTEN_KEY)
|
||||
return execute_cql(format("SELECT key FROM system.{} WHERE key = ?", CDC_LOCAL), CDC_REWRITTEN_KEY)
|
||||
.then([] (::shared_ptr<cql3::untyped_result_set> msg) {
|
||||
return !msg->empty();
|
||||
});
|
||||
@@ -2376,11 +2290,11 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
scylla_local(), db::schema_tables::scylla_table_schema_history(),
|
||||
repair_history(),
|
||||
repair_tasks(),
|
||||
v3::views_builds_in_progress(), v3::built_views(),
|
||||
v3::scylla_views_builds_in_progress(),
|
||||
v3::truncated(),
|
||||
v3::commitlog_cleanups(),
|
||||
v3::cdc_local(),
|
||||
views_builds_in_progress(), built_views(),
|
||||
scylla_views_builds_in_progress(),
|
||||
truncated(),
|
||||
commitlog_cleanups(),
|
||||
cdc_local(),
|
||||
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
|
||||
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
|
||||
dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
|
||||
@@ -2403,7 +2317,7 @@ static bool maybe_write_in_user_memory(schema_ptr s) {
|
||||
return (s.get() == system_keyspace::batchlog().get())
|
||||
|| (s.get() == system_keyspace::batchlog_v2().get())
|
||||
|| (s.get() == system_keyspace::paxos().get())
|
||||
|| s == system_keyspace::v3::scylla_views_builds_in_progress();
|
||||
|| s == system_keyspace::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
future<> system_keyspace::make(
|
||||
@@ -2689,7 +2603,7 @@ mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::v
|
||||
|
||||
future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
|
||||
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
return execute_cql(
|
||||
std::move(req),
|
||||
std::move(ks_name),
|
||||
@@ -2705,7 +2619,7 @@ future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_n
|
||||
// before all shards are registered.
|
||||
// if another shard has already registered, this won't overwrite its status. if it hasn't registered, we insert
|
||||
// a status with first_token=null and next_token=null, indicating it hasn't made progress.
|
||||
auto&& schema = db::system_keyspace::v3::scylla_views_builds_in_progress();
|
||||
auto&& schema = db::system_keyspace::scylla_views_builds_in_progress();
|
||||
auto timestamp = api::new_timestamp();
|
||||
mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks_name))};
|
||||
|
||||
@@ -2723,7 +2637,7 @@ future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_n
|
||||
|
||||
future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
|
||||
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
return execute_cql(
|
||||
std::move(req),
|
||||
std::move(ks_name),
|
||||
@@ -2734,14 +2648,14 @@ future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring vi
|
||||
|
||||
future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) {
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name),
|
||||
int32_t(this_shard_id())).discard_result();
|
||||
@@ -2749,20 +2663,20 @@ future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring vi
|
||||
|
||||
future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) {
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
|
||||
format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", BUILT_VIEWS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) {
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", BUILT_VIEWS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_views() {
|
||||
return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return execute_cql(format("SELECT * FROM system.{}", BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return *cql_result
|
||||
| std::views::transform([] (const cql3::untyped_result_set::row& row) {
|
||||
auto ks_name = row.get_as<sstring>("keyspace_name");
|
||||
@@ -2774,7 +2688,7 @@ future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_view
|
||||
|
||||
future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_view_build_progress() {
|
||||
return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
std::vector<view_build_progress> progress;
|
||||
for (auto& row : *cql_result) {
|
||||
auto ks_name = row.get_as<sstring>("keyspace_name");
|
||||
@@ -3227,6 +3141,8 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
const bool strongly_consistent_tables = _db.features().strongly_consistent_tables;
|
||||
|
||||
for (auto& row : *rs) {
|
||||
if (!row.has("host_id")) {
|
||||
// There are no clustering rows, only the static row.
|
||||
@@ -3463,7 +3379,9 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
ret.session = service::session_id(some_row.get_as<utils::UUID>("session"));
|
||||
}
|
||||
|
||||
if (some_row.has("tablet_balancing_enabled")) {
|
||||
if (strongly_consistent_tables) {
|
||||
ret.tablet_balancing_enabled = false;
|
||||
} else if (some_row.has("tablet_balancing_enabled")) {
|
||||
ret.tablet_balancing_enabled = some_row.get_as<bool>("tablet_balancing_enabled");
|
||||
} else {
|
||||
ret.tablet_balancing_enabled = true;
|
||||
|
||||
@@ -127,6 +127,8 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
|
||||
|
||||
static schema_ptr raft_snapshot_config();
|
||||
static schema_ptr local();
|
||||
static schema_ptr truncated();
|
||||
static schema_ptr commitlog_cleanups();
|
||||
static schema_ptr peers();
|
||||
static schema_ptr peer_events();
|
||||
static schema_ptr range_xfers();
|
||||
@@ -137,7 +139,10 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
|
||||
static schema_ptr large_rows();
|
||||
static schema_ptr large_cells();
|
||||
static schema_ptr corrupt_data();
|
||||
static schema_ptr scylla_local();
|
||||
static schema_ptr batches();
|
||||
static schema_ptr available_ranges();
|
||||
static schema_ptr built_views();
|
||||
static schema_ptr cdc_local();
|
||||
future<> force_blocking_flush(sstring cfname);
|
||||
// This function is called when the system.peers table is read,
|
||||
// and it fixes some types of inconsistencies that can occur
|
||||
@@ -204,6 +209,12 @@ public:
|
||||
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
|
||||
static constexpr auto CLIENT_ROUTES = "client_routes";
|
||||
static constexpr auto VERSIONS = "versions";
|
||||
static constexpr auto BATCHES = "batches";
|
||||
static constexpr auto AVAILABLE_RANGES = "available_ranges";
|
||||
static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
|
||||
static constexpr auto BUILT_VIEWS = "built_views";
|
||||
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
|
||||
static constexpr auto CDC_LOCAL = "cdc_local";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
@@ -211,42 +222,6 @@ public:
|
||||
static constexpr auto ROLE_ATTRIBUTES = "role_attributes";
|
||||
static constexpr auto ROLE_PERMISSIONS = "role_permissions";
|
||||
|
||||
struct v3 {
|
||||
static constexpr auto BATCHES = "batches";
|
||||
static constexpr auto PAXOS = "paxos";
|
||||
static constexpr auto BUILT_INDEXES = "IndexInfo";
|
||||
static constexpr auto LOCAL = "local";
|
||||
static constexpr auto PEERS = "peers";
|
||||
static constexpr auto PEER_EVENTS = "peer_events";
|
||||
static constexpr auto RANGE_XFERS = "range_xfers";
|
||||
static constexpr auto COMPACTION_HISTORY = "compaction_history";
|
||||
static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
|
||||
static constexpr auto SIZE_ESTIMATES = "size_estimates";
|
||||
static constexpr auto AVAILABLE_RANGES = "available_ranges";
|
||||
static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
|
||||
static constexpr auto BUILT_VIEWS = "built_views";
|
||||
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
|
||||
static constexpr auto CDC_LOCAL = "cdc_local";
|
||||
static schema_ptr batches();
|
||||
static schema_ptr built_indexes();
|
||||
static schema_ptr local();
|
||||
static schema_ptr truncated();
|
||||
static schema_ptr commitlog_cleanups();
|
||||
static schema_ptr peers();
|
||||
static schema_ptr peer_events();
|
||||
static schema_ptr range_xfers();
|
||||
static schema_ptr compaction_history();
|
||||
static schema_ptr sstable_activity();
|
||||
static schema_ptr size_estimates();
|
||||
static schema_ptr large_partitions();
|
||||
static schema_ptr scylla_local();
|
||||
static schema_ptr available_ranges();
|
||||
static schema_ptr views_builds_in_progress();
|
||||
static schema_ptr built_views();
|
||||
static schema_ptr scylla_views_builds_in_progress();
|
||||
static schema_ptr cdc_local();
|
||||
};
|
||||
|
||||
// Partition estimates for a given range of tokens.
|
||||
struct range_estimates {
|
||||
schema_ptr schema;
|
||||
@@ -264,6 +239,7 @@ public:
|
||||
static schema_ptr batchlog_v2();
|
||||
static schema_ptr paxos();
|
||||
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
static schema_ptr scylla_local();
|
||||
static schema_ptr raft();
|
||||
static schema_ptr raft_snapshots();
|
||||
static schema_ptr repair_history();
|
||||
@@ -283,6 +259,8 @@ public:
|
||||
static schema_ptr dicts();
|
||||
static schema_ptr view_building_tasks();
|
||||
static schema_ptr client_routes();
|
||||
static schema_ptr views_builds_in_progress();
|
||||
static schema_ptr scylla_views_builds_in_progress();
|
||||
|
||||
// auth
|
||||
static schema_ptr roles();
|
||||
|
||||
@@ -195,7 +195,7 @@ public:
|
||||
return mutation_reader(std::make_unique<build_progress_reader>(
|
||||
s,
|
||||
std::move(permit),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
range,
|
||||
slice,
|
||||
std::move(trace_state),
|
||||
|
||||
@@ -3707,7 +3707,7 @@ void validate_view_keyspace(const data_dictionary::database& db, std::string_vie
|
||||
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs);
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const std::invalid_argument& e) {
|
||||
throw std::logic_error(fmt::format(
|
||||
"Materialized views and secondary indexes are not supported on the keyspace '{}': {}",
|
||||
keyspace_name, e.what()));
|
||||
|
||||
@@ -68,6 +68,8 @@ public:
|
||||
.with_column("peer", inet_addr_type, column_kind::partition_key)
|
||||
.with_column("dc", utf8_type)
|
||||
.with_column("up", boolean_type)
|
||||
.with_column("draining", boolean_type)
|
||||
.with_column("excluded", boolean_type)
|
||||
.with_column("status", utf8_type)
|
||||
.with_column("load", utf8_type)
|
||||
.with_column("tokens", int32_type)
|
||||
@@ -107,8 +109,11 @@ public:
|
||||
|
||||
if (tm.get_topology().has_node(hostid)) {
|
||||
// Not all entries in gossiper are present in the topology
|
||||
sstring dc = tm.get_topology().get_location(hostid).dc;
|
||||
auto& node = tm.get_topology().get_node(hostid);
|
||||
sstring dc = node.dc_rack().dc;
|
||||
set_cell(cr, "dc", dc);
|
||||
set_cell(cr, "draining", node.is_draining());
|
||||
set_cell(cr, "excluded", node.is_excluded());
|
||||
}
|
||||
|
||||
if (ownership.contains(eps.get_ip())) {
|
||||
@@ -1134,6 +1139,8 @@ public:
|
||||
set_cell(r.cells(), "dc", node.dc());
|
||||
set_cell(r.cells(), "rack", node.rack());
|
||||
set_cell(r.cells(), "up", _gossiper.local().is_alive(host));
|
||||
set_cell(r.cells(), "draining", node.is_draining());
|
||||
set_cell(r.cells(), "excluded", node.is_excluded());
|
||||
if (auto ip = _gossiper.local().get_address_map().find(host)) {
|
||||
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
|
||||
}
|
||||
@@ -1144,6 +1151,9 @@ public:
|
||||
if (stats && stats->capacity.contains(host)) {
|
||||
auto capacity = stats->capacity.at(host);
|
||||
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
|
||||
if (auto ts_iter = stats->tablet_stats.find(host); ts_iter != stats->tablet_stats.end()) {
|
||||
set_cell(r.cells(), "effective_capacity", data_value(int64_t(ts_iter->second.effective_capacity)));
|
||||
}
|
||||
|
||||
if (auto utilization = load.get_allocated_utilization(host)) {
|
||||
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
|
||||
@@ -1168,9 +1178,12 @@ private:
|
||||
.with_column("rack", utf8_type)
|
||||
.with_column("ip", inet_addr_type)
|
||||
.with_column("up", boolean_type)
|
||||
.with_column("draining", boolean_type)
|
||||
.with_column("excluded", boolean_type)
|
||||
.with_column("tablets_allocated", long_type)
|
||||
.with_column("tablets_allocated_per_shard", double_type)
|
||||
.with_column("storage_capacity", long_type)
|
||||
.with_column("effective_capacity", long_type)
|
||||
.with_column("storage_allocated_load", long_type)
|
||||
.with_column("storage_allocated_utilization", double_type)
|
||||
.with_column("storage_load", long_type)
|
||||
@@ -1484,7 +1497,7 @@ future<> initialize_virtual_tables(
|
||||
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
|
||||
|
||||
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
|
||||
db.find_column_family(system_keyspace::v3::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
|
||||
}
|
||||
|
||||
|
||||
4
dist/common/scripts/scylla_coredump_setup
vendored
4
dist/common/scripts/scylla_coredump_setup
vendored
@@ -114,6 +114,10 @@ WantedBy=local-fs.target scylla-server.service
|
||||
'''[1:-1]
|
||||
with open('/etc/systemd/system/var-lib-systemd-coredump.mount', 'w') as f:
|
||||
f.write(dot_mount)
|
||||
# in case we have old mounts in deleted state hanging around from older installation
|
||||
# systemd doesn't seem to be able to deal with those properly, and assume they are still active
|
||||
# and doesn't do anything about them
|
||||
run('umount /var/lib/systemd/coredump', shell=True, check=False)
|
||||
os.makedirs('/var/lib/scylla/coredump', exist_ok=True)
|
||||
systemd_unit.reload()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').enable()
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
# Remove an outdated KB
|
||||
|
||||
/stable/kb/perftune-modes-sync.html: /stable/kb/index.html
|
||||
|
||||
# Move the driver information to another project
|
||||
|
||||
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
|
||||
|
||||
@@ -190,7 +190,7 @@ then every rack in every datacenter receives a replica, except for racks compris
|
||||
of only :doc:`zero-token nodes </architecture/zero-token-nodes>`. Racks added after
|
||||
the keyspace creation do not receive replicas.
|
||||
|
||||
When ``rf_rack_valid_keyspaces``` is enabled in the config and the keyspace is tablet-based,
|
||||
When ``enforce_rack_list`` (or the deprecated ``rf_rack_valid_keyspaces``) is enabled in the config and the keyspace is tablet-based,
|
||||
the numeric replication factor is automatically expanded into a rack list when the statement is
|
||||
executed, which can be observed in the DESCRIBE output afterwards. If the numeric RF is smaller than
|
||||
the number of racks in a DC, a subset of racks is chosen arbitrarily.
|
||||
|
||||
@@ -241,8 +241,8 @@ Currently, the possible orderings are limited by the :ref:`clustering order <clu
|
||||
|
||||
.. _vector-queries:
|
||||
|
||||
Vector queries
|
||||
~~~~~~~~~~~~~~
|
||||
Vector queries :label-note:`ScyllaDB Cloud`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ``ORDER BY`` clause can also be used with vector columns to perform the approximate nearest neighbor (ANN) search.
|
||||
When using vector columns, the syntax is as follows:
|
||||
@@ -280,11 +280,25 @@ For example::
|
||||
FROM ImageEmbeddings
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
.. warning::
|
||||
|
||||
Currently, vector queries do not support filtering with ``WHERE`` clause,
|
||||
grouping with ``GROUP BY`` and paging. This will be added in the future releases.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
|
||||
For example::
|
||||
|
||||
SELECT image_id FROM ImageEmbeddings
|
||||
WHERE user_id = 'user123'
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
|
||||
|
||||
Other filtering scenarios are currently not supported.
|
||||
|
||||
.. note::
|
||||
|
||||
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
|
||||
Vector indexes do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information
|
||||
about Vector Search is available in the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
.. _limit-clause:
|
||||
|
||||
|
||||
@@ -129,17 +129,15 @@ More on :doc:`Local Secondary Indexes </features/local-secondary-indexes>`
|
||||
|
||||
.. _create-vector-index-statement:
|
||||
|
||||
Vector Index :label-caution:`Experimental` :label-note:`ScyllaDB Cloud`
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Vector Index :label-note:`ScyllaDB Cloud`
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. note::
|
||||
|
||||
Vector indexes are supported in ScyllaDB Cloud only in the clusters that have the vector search feature enabled.
|
||||
Moreover, vector indexes are an experimental feature that:
|
||||
|
||||
* is not suitable for production use,
|
||||
* does not guarantee backward compatibility between ScyllaDB versions,
|
||||
* does not support all the features of ScyllaDB (e.g., tracing, filtering, TTL).
|
||||
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
|
||||
Vector indexes do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information
|
||||
about Vector Search is available in the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
|
||||
similarity search on vector data.
|
||||
@@ -177,6 +175,26 @@ The following options are supported for vector indexes. All of them are optional
|
||||
| | as ``efSearch``. Higher values lead to better recall (i.e., more relevant results are found) | |
|
||||
| | but increase query latency. Supported values are integers between 1 and 4096. | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
| ``quantization`` | The quantization method to use for compressing vectors in Vector Index. Vectors in base table | ``f32`` |
|
||||
| | are never compressed. Supported values (case-insensitive) are: | |
|
||||
| | | |
|
||||
| | * ``f32``: 32-bit single-precision IEEE 754 floating-point. | |
|
||||
| | * ``f16``: 16-bit standard half-precision floating-point (IEEE 754). | |
|
||||
| | * ``bf16``: 16-bit "Brain" floating-point (optimized for ML workloads). | |
|
||||
| | * ``i8``: 8-bit signed integer. | |
|
||||
| | * ``b1``: 1-bit binary value (packed 8 per byte). | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
| ``oversampling`` | A multiplier for the candidate set size during the search phase. For example, if a query asks for 10 | ``1.0`` |
|
||||
| | similar vectors (``LIMIT 10``) and ``oversampling`` is 2.0, the search will initially retrieve 20 | |
|
||||
| | candidates. This can improve accuracy at the cost of latency. Supported values are | |
|
||||
| | floating-point numbers between 1.0 (no oversampling) and 100.0. | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
| ``rescoring`` | Flag enabling recalculation of similarity scores with full precision and re-ranking of the candidate set.| ``false`` |
|
||||
| | Valid only for quantization below ``f32``. Supported values are: | |
|
||||
| | | |
|
||||
| | * ``true``: Enable rescoring. | |
|
||||
| | * ``false``: Disable rescoring. | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
|
||||
|
||||
.. _drop-index-statement:
|
||||
|
||||
@@ -665,6 +665,14 @@ it is not possible to update only some elements of a vector (without updating th
|
||||
Types stored in a vector are not implicitly frozen, so if you want to store a frozen collection or
|
||||
frozen UDT in a vector, you need to explicitly wrap them using `frozen` keyword.
|
||||
|
||||
.. note::
|
||||
|
||||
The main application of vectors is to support vector search capabilities, which
|
||||
are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
|
||||
Note that Vector Search clusters do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information
|
||||
about Vector Search is available in the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
.. .. _custom-types:
|
||||
|
||||
.. Custom Types
|
||||
|
||||
@@ -221,6 +221,87 @@ scylla-bucket/prefix/
|
||||
```
|
||||
See the API [documentation](#copying-sstables-on-s3-backup) for more details about the actual backup request.
|
||||
|
||||
### The snapshot manifest
|
||||
|
||||
Each table snapshot directory contains a manifest.json file that lists the contents of the snapshot and some metadata.
|
||||
The json structure is as follows:
|
||||
```
|
||||
{
|
||||
"manifest": {
|
||||
"version": "1.0",
|
||||
"scope": "node"
|
||||
},
|
||||
"node": {
|
||||
"host_id": "<UUID>",
|
||||
"datacenter": "mydc",
|
||||
"rack": "myrack"
|
||||
},
|
||||
"snapshot": {
|
||||
"name": "snapshot name",
|
||||
"created_at": seconds_since_epoch,
|
||||
"expires_at": seconds_since_epoch | null,
|
||||
},
|
||||
"table": {
|
||||
"keyspace_name": "my_keyspace",
|
||||
"table_name": "my_table",
|
||||
"table_id": "<UUID>",
|
||||
"tablets_type": "none|powof2",
|
||||
"tablet_count": N
|
||||
},
|
||||
"sstables": [
|
||||
{
|
||||
"id": "67e35000-d8c6-11f0-9599-060de9f3bd1b",
|
||||
"toc_name": "me-3gw7_0ndy_3wlq829wcsddgwha1n-big-TOC.txt",
|
||||
"data_size": 75,
|
||||
"index_size": 8,
|
||||
"first_token": -8629266958227979430,
|
||||
"last_token": 9168982884335614769,
|
||||
},
|
||||
{
|
||||
"id": "67e35000-d8c6-11f0-85dc-0625e9f3bd1b",
|
||||
"toc_name": "me-3gw7_0ndy_3wlq821a6cqlbmxrtn-big-TOC.txt",
|
||||
"data_size": 73,
|
||||
"index_size": 8,
|
||||
"first_token": 221146791717891383,
|
||||
"last_token": 7354559975791427036,
|
||||
},
|
||||
...
|
||||
],
|
||||
"files": [ ... ]
|
||||
}
|
||||
|
||||
The `manifest` member contains the following attributes:
|
||||
- `version` - representing the version of the manifest itself. It is incremented when members are added or removed from the manifest.
|
||||
- `scope` - the scope of metadata stored in this manifest file. The following scopes are supported:
|
||||
- `node` - the manifest describes all SSTables owned by this node in this snapshot.
|
||||
|
||||
The `node` member contains metadata about this node that enables datacenter- or rack-aware restore.
|
||||
- `host_id` - is the node's unique host_id (a UUID).
|
||||
- `datacenter` - is the node's datacenter.
|
||||
- `rack` - is the node's rack.
|
||||
|
||||
The `snapshot` member contains metadata about the snapshot.
|
||||
- `name` - is the snapshot name (a.k.a. `tag`)
|
||||
- `created_at` - is the time when the snapshot was created.
|
||||
- `expires_at` - is an optional time when the snapshot expires and can be dropped, if a TTL was set for the snapshot. If there is no TTL, `expires_at` may be omitted, set to null, or set to 0.
|
||||
|
||||
The `table` member contains metadata about the table being snapshot.
|
||||
- `keyspace_name` and `table_name` - are self-explanatory.
|
||||
- `table_id` - a universally unique identifier (UUID) of the table set when the table is created.
|
||||
- `tablets_type`:
|
||||
- `none` - if the keyspace uses vnodes replication
|
||||
- `powof2` - if the keyspace uses tablets replication, and the tablet token ranges are based on powers of 2.
|
||||
- `tablet_count` - Optional. If `tablets_type` is not `none`, contains the number of tablets allocated in the table. If `tablets_type` is `powof2`, `tablet_count` would be a power of 2.
|
||||
|
||||
The `sstables` member is a list containing metadata about the SSTables in the snapshot.
|
||||
- `id` - is the SSTable's unique id (a UUID). It is carried over with the SSTable when it's streamed as part of tablet migration, even if it gets a new generation.
|
||||
- `toc_name` - is the name of the SSTable Table Of Contents (TOC) component.
|
||||
- `data_size` and `index_size` - are the sizes of the SSTable's data and index components, respectively. They can be used to estimate how much disk space is needed for restore.
|
||||
- `first_token` and `last_token` - are the first and last tokens in the SSTable, respectively. They can be used to determine if an SSTable is fully contained in a (tablet) token range to enable efficient file-based streaming of the SSTable.
|
||||
|
||||
The optional `files` member may contain a list of non-SSTable files included in the snapshot directory, not including the manifest.json file and schema.cql.
|
||||
```
|
||||
|
||||
3. `CREATE KEYSPACE` with S3/GS storage
|
||||
|
||||
When creating a keyspace with S3/GS storage, the data is stored under the bucket passed as argument to the `CREATE KEYSPACE` statement.
|
||||
|
||||
@@ -6,10 +6,10 @@ same amount of disk space. This means that the number of tablets located on a no
|
||||
proportional to the gross disk capacity of that node. Because the used disk space of
|
||||
different tablets can vary greatly, this could create imbalance in disk utilization.
|
||||
|
||||
Size based load balancing aims to achieve better disk utilization accross nodes in a
|
||||
Size based load balancing aims to achieve better disk utilization across nodes in a
|
||||
cluster. The load balancer will continuously gather information about available disk
|
||||
space and tablet sizes from all the nodes. It then incrementally computes tablet
|
||||
migration plans which equalize disk utilization accross the cluster.
|
||||
migration plans which equalize disk utilization across the cluster.
|
||||
|
||||
# Basic operation
|
||||
|
||||
@@ -75,7 +75,7 @@ migrations), and will wait for correct tablet sizes to arrive after the next ``l
|
||||
refresh by the topology coordinator.
|
||||
|
||||
One exception to this are nodes which have been excluded from the cluster. These nodes
|
||||
are down and therefor are not able to send fresh ``load_stats``. But they have to be drained
|
||||
are down and therefore are not able to send fresh ``load_stats``. But they have to be drained
|
||||
of their tablets (via tablet rebuild), and the balancer must do this even with incomplete
|
||||
tablet data. So, only excluded nodes are allowed to have missing tablet sizes.
|
||||
|
||||
|
||||
@@ -357,6 +357,7 @@ Schema:
|
||||
CREATE TABLE system.load_per_node (
|
||||
node uuid PRIMARY KEY,
|
||||
dc text,
|
||||
effective_capacity bigint,
|
||||
rack text,
|
||||
storage_allocated_load bigint,
|
||||
storage_allocated_utilization double,
|
||||
@@ -372,6 +373,7 @@ Columns:
|
||||
* `storage_allocated_load` - Disk space allocated for tablets, assuming each tablet has a fixed size (target_tablet_size).
|
||||
* `storage_allocated_utilization` - Fraction of node's disk capacity taken for `storage_allocated_load`, where 1.0 means full utilization.
|
||||
* `storage_capacity` - Total disk capacity in bytes. Used to compute `storage_allocated_utilization`. By default equal to file system's capacity.
|
||||
* `effective_capacity` - Sum of available disk space and tablet sizes on a node. Used to compute load on a node for size based balancing.
|
||||
* `storage_load` - Disk space allocated for tablets, computed with actual tablet sizes. Can be null if some of the tablet sizes are not known.
|
||||
* `storage_utilization` - Fraction of node's disk capacity taken for `storage_load` (with actual tablet sizes), where 1.0 means full utilization. Can be null if some of the tablet sizes are not known.
|
||||
* `tablets_allocated` - Number of tablet replicas on the node. Migrating tablets are accounted as if migration already finished.
|
||||
|
||||
@@ -46,14 +46,11 @@ stateDiagram-v2
|
||||
state replacing {
|
||||
rp_join_group0 : join_group0
|
||||
rp_left_token_ring : left_token_ring
|
||||
rp_tablet_draining : tablet_draining
|
||||
rp_write_both_read_old : write_both_read_old
|
||||
rp_write_both_read_new : write_both_read_new
|
||||
[*] --> rp_join_group0
|
||||
rp_join_group0 --> rp_left_token_ring: rollback
|
||||
rp_join_group0 --> rp_tablet_draining
|
||||
rp_tablet_draining --> rp_write_both_read_old
|
||||
rp_tablet_draining --> rp_left_token_ring: rollback
|
||||
rp_join_group0 --> rp_write_both_read_old
|
||||
rp_join_group0 --> [*]: rejected
|
||||
rp_write_both_read_old --> rp_write_both_read_new: streaming completed
|
||||
rp_write_both_read_old --> rp_left_token_ring: rollback
|
||||
@@ -69,34 +66,34 @@ stateDiagram-v2
|
||||
normal --> decommissioning: leave
|
||||
normal --> removing: remove
|
||||
state decommissioning {
|
||||
de_tablet_migration0 : tablet_migration (draining)
|
||||
de_tablet_migration1 : tablet_migration
|
||||
[*] --> de_tablet_migration0
|
||||
de_tablet_migration0 --> [*]: aborted
|
||||
de_left_token_ring : left_token_ring
|
||||
de_tablet_draining : tablet_draining
|
||||
de_tablet_migration : tablet_migration
|
||||
de_write_both_read_old: write_both_read_old
|
||||
de_write_both_read_new : write_both_read_new
|
||||
de_rollback_to_normal : rollback_to_normal
|
||||
[*] --> de_tablet_draining
|
||||
de_tablet_draining --> de_rollback_to_normal: rollback
|
||||
de_rollback_to_normal --> de_tablet_migration
|
||||
de_tablet_draining --> de_write_both_read_old
|
||||
de_tablet_migration --> [*]
|
||||
de_rollback_to_normal --> de_tablet_migration1
|
||||
de_tablet_migration0 --> de_write_both_read_old
|
||||
de_tablet_migration1 --> [*]
|
||||
de_write_both_read_old --> de_write_both_read_new: streaming completed
|
||||
de_write_both_read_old --> de_rollback_to_normal: rollback
|
||||
de_write_both_read_new --> de_left_token_ring
|
||||
de_left_token_ring --> [*]
|
||||
}
|
||||
state removing {
|
||||
re_tablet_migration0 : tablet_migration (draining)
|
||||
re_tablet_migration1 : tablet_migration
|
||||
[*] --> re_tablet_migration0
|
||||
re_tablet_migration0 --> [*]: aborted
|
||||
re_left_token_ring : left_token_ring
|
||||
re_tablet_draining : tablet_draining
|
||||
re_tablet_migration : tablet_migration
|
||||
re_write_both_read_old : write_both_read_old
|
||||
re_write_both_read_new : write_both_read_new
|
||||
re_rollback_to_normal : rollback_to_normal
|
||||
[*] --> re_tablet_draining
|
||||
re_tablet_draining --> re_rollback_to_normal: rollback
|
||||
re_rollback_to_normal --> re_tablet_migration
|
||||
re_tablet_migration --> [*]
|
||||
re_tablet_draining --> re_write_both_read_old
|
||||
re_rollback_to_normal --> re_tablet_migration1
|
||||
re_tablet_migration1 --> [*]
|
||||
re_tablet_migration0 --> re_write_both_read_old
|
||||
re_write_both_read_old --> re_write_both_read_new: streaming completed
|
||||
re_write_both_read_old --> re_rollback_to_normal: rollback
|
||||
re_write_both_read_new --> re_left_token_ring
|
||||
@@ -181,6 +178,27 @@ are the currently supported global topology operations:
|
||||
contain replicas of the table being truncated. It uses [sessions](#Topology guards)
|
||||
to make sure that no stale RPCs are executed outside of the scope of the request.
|
||||
|
||||
## Tablet draining
|
||||
|
||||
Presence of node requests of type `leave` (decommission) and `remove` will cause the load
|
||||
balancer to start migrating tablets away from those nodes. Until they are drained
|
||||
of tablets, the requests are in paused state and are not picked by topology
|
||||
coordinator for execution.
|
||||
|
||||
Paused requests are also pending, the "topology_request" field is engaged.
|
||||
The node's state is still `normal` when the request is paused.
|
||||
|
||||
Canceling the requests will stop the draining process, because absence of a request
|
||||
lifts the draining state on the node.
|
||||
|
||||
When tablet scheduler is done with migrating all tablets away from a draining node,
|
||||
the associated request will be unpaused, and can be picked by topology coordinator
|
||||
for execution. This time it will enter the `write_both_read_old` transition and proceed
|
||||
with the vnode part.
|
||||
|
||||
This allows multiple requests to drain tablets in parallel, and other requests to not be blocked
|
||||
by long `leave` and `remove` requests.
|
||||
|
||||
## Zero-token nodes
|
||||
|
||||
Zero-token nodes (the nodes started with `join_ring=false`) never own tokens or become
|
||||
@@ -221,12 +239,6 @@ that there are no tablet transitions in the system.
|
||||
|
||||
Tablets are migrated in parallel and independently.
|
||||
|
||||
There is a variant of tablet migration track called tablet draining track, which is invoked
|
||||
as a step of certain topology operations (e.g. decommission, removenode). Its goal is to readjust tablet replicas
|
||||
so that a given topology change can proceed. For example, when decommissioning a node, we
|
||||
need to migrate tablet replicas away from the node being decommissioned.
|
||||
Tablet draining happens before making changes to vnode-based replication.
|
||||
|
||||
## Node replace with tablets
|
||||
|
||||
Tablet replicas on the replaced node are rebuilt after the replacing node is already in the normal state and
|
||||
|
||||
@@ -28,8 +28,7 @@ Incremental Repair is only supported for tables that use the tablets architectur
|
||||
Incremental Repair Modes
|
||||
------------------------
|
||||
|
||||
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
|
||||
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
|
||||
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
|
||||
|
||||
The available modes are:
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ Compaction Strategies with Materialized Views
|
||||
Materialized views, just like regular tables, use one of the available :doc:`compaction strategies </architecture/compaction/compaction-strategies>`.
|
||||
When a materialized view is created, it does not inherit its base table compaction strategy settings, because the data model
|
||||
of a view does not necessarily have the same characteristics as the one from its base table.
|
||||
Instead, the default compaction strategy (SizeTieredCompactionStrategy) is used.
|
||||
Instead, the default compaction strategy (IncrementalCompactionStrategy) is used.
|
||||
|
||||
A compaction strategy for a new materialized view can be explicitly set during its creation, using the following command:
|
||||
|
||||
|
||||
@@ -57,7 +57,6 @@ Knowledge Base
|
||||
* :doc:`Customizing CPUSET </kb/customizing-cpuset>`
|
||||
* :doc:`Recreate RAID devices </kb/raid-device>` - How to recreate your RAID devices without running scylla-setup
|
||||
* :doc:`Configure ScyllaDB Networking with Multiple NIC/IP Combinations </kb/yaml-address>` - examples for setting the different IP addresses in scylla.yaml
|
||||
* :doc:`Updating the Mode in perftune.yaml After a ScyllaDB Upgrade </kb/perftune-modes-sync>`
|
||||
* :doc:`Kafka Sink Connector Quickstart </using-scylla/integrations/kafka-connector>`
|
||||
* :doc:`Kafka Sink Connector Configuration </using-scylla/integrations/sink-config>`
|
||||
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
==============================================================
|
||||
Updating the Mode in perftune.yaml After a ScyllaDB Upgrade
|
||||
==============================================================
|
||||
|
||||
We improved ScyllaDB's performance by `removing the rx_queues_count from the mode
|
||||
condition <https://github.com/scylladb/seastar/pull/949>`_. As a result, ScyllaDB operates in
|
||||
the ``sq_split`` mode instead of the ``mq`` mode (see :doc:`Seastar Perftune </operating-scylla/admin-tools/perftune>` for information about the modes).
|
||||
If you upgrade from an earlier version of ScyllaDB, your cluster's existing nodes may use the ``mq`` mode,
|
||||
while new nodes will use the ``sq_split`` mode. As using different modes across one cluster is not recommended,
|
||||
you should change the configuration to ensure that the ``sq_split`` mode is used on all nodes.
|
||||
|
||||
This section describes how to update the `perftune.yaml` file to configure the ``sq_split`` mode on all nodes.
|
||||
|
||||
Procedure
|
||||
------------
|
||||
The examples below assume that you are using the default locations for storing data and the `scylla.yaml` file,
|
||||
and that your NIC is ``eth5``.
|
||||
|
||||
#. Backup your old configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo mv /etc/scylla.d/cpuset.conf /etc/scylla.d/cpuset.conf.old
|
||||
sudo mv /etc/scylla.d/perftune.yaml /etc/scylla.d/perftune.yaml.old
|
||||
|
||||
#. Create a new configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo scylla_sysconfig_setup --nic eth5 --homedir /var/lib/scylla --confdir /etc/scylla
|
||||
|
||||
A new ``/etc/scylla.d/cpuset.conf`` will be generated on the output.
|
||||
|
||||
#. Compare the contents of the newly generated ``/etc/scylla.d/cpuset.conf`` with ``/etc/scylla.d/cpuset.conf.old`` you created in step 1.
|
||||
|
||||
- If they are exactly the same, rename ``/etc/scylla.d/perftune.yaml.old`` you created in step 1 back to ``/etc/scylla.d/perftune.yaml`` and continue to the next node.
|
||||
- If they are different, move on to the next steps.
|
||||
|
||||
#. Restart the ``scylla-server`` service.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool drain
|
||||
sudo systemctl restart scylla-server
|
||||
|
||||
#. Wait for the service to become up and running (similarly to how it is done during a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart>`). It may take a considerable amount of time before the node is in the UN state due to resharding.
|
||||
|
||||
#. Continue to the next node.
|
||||
@@ -25,4 +25,8 @@ Port Description Protocol
|
||||
19142 Native shard-aware transport port (ssl) TCP
|
||||
====== ============================================ ========
|
||||
|
||||
If you're using ScyllaDB Alternator, ensure that the ports configured
|
||||
for Alternator with the ``alternator_port`` or ``alternator_https_port`` parameter
|
||||
are open. See :doc:`ScyllaDB Alternator </alternator/alternator>` for details.
|
||||
|
||||
.. note:: For ScyllaDB Manager ports, see the `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_ documentation.
|
||||
|
||||
@@ -93,6 +93,25 @@ API calls
|
||||
|
||||
Cluster tasks are not unregistered from task manager with API calls.
|
||||
|
||||
Node operations module
|
||||
----------------------
|
||||
|
||||
There is a module named ``node_ops``, which allows tracking node operations: decommission, removenode, bootstrap, replace, rebuild.
|
||||
|
||||
The ``type`` field designates the operation, and is one of:
|
||||
- ``decommission``
|
||||
- ``remove node``
|
||||
- ``bootstrap``
|
||||
- ``replace``
|
||||
- ``rebuild``
|
||||
|
||||
The ``scope`` and ``kind`` fields are set to ``cluster``.
|
||||
|
||||
The ``entity`` field holds the host id of the node which is being operated on, as long as the request
|
||||
is not finished. In case of the ``replace`` operation, it will hold the host id of the replacing node.
|
||||
|
||||
``decommission`` and ``remove node`` tasks are abortable, but only before they finish tablet migration.
|
||||
|
||||
Tasks API
|
||||
---------
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
|
||||
|
||||
nodetool cluster repair --tablet-tokens 1,10474535988
|
||||
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'incremental'.
|
||||
|
||||
For example:
|
||||
|
||||
|
||||
@@ -29,5 +29,13 @@ Before you run ``nodetool decommission``:
|
||||
request may fail.
|
||||
In such a case, ALTER the keyspace to reduce the RF before running ``nodetool decommission``.
|
||||
|
||||
It's allowed to invoke ``nodetool decommission`` on multiple nodes in parallel. This will be faster than doing
|
||||
it sequentially if there is a significant amount of data in tablet-based keyspaces, because
|
||||
tablets are migrated from nodes in parallel. The decommission process first migrates tablets away, and this
|
||||
part is done in parallel for all nodes being decommissioned. Then it does the vnode-based decommission, and
|
||||
this part is serialized with other vnode-based operations, including those from other decommission operations.
|
||||
|
||||
A decommission that is still in the tablet draining phase can be canceled using the Task Manager API.
|
||||
See :doc:`Task manager </operating-scylla/admin-tools/task-manager>`.
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -56,6 +56,17 @@ To only mark the node as permanently down without doing actual removal, use :doc
|
||||
|
||||
.. _removenode-ignore-dead-nodes:
|
||||
|
||||
It's allowed to invoke ``nodetool removenode`` on multiple nodes in parallel. This will be faster than doing
|
||||
it sequentially if there is a significant amount of data in tablet-based keyspaces, because
|
||||
tablets which belong to removed nodes will be rebuilt in parallel. Node removal first migrates tablets to new
|
||||
replicas, in parallel for all nodes being removed. Then it does the part which executes
|
||||
removal for the vnode-based keyspaces, and this is serialized with other vnode-based operations, including
|
||||
those from other removenode operations.
|
||||
|
||||
A removenode operation that is still in the tablet rebuild phase can be canceled using the Task Manager API.
|
||||
Tablets which are already rebuilt will remain on their new replicas.
|
||||
See :doc:`Task manager </operating-scylla/admin-tools/task-manager>`.
|
||||
|
||||
Ignoring Dead Nodes
|
||||
---------------------
|
||||
|
||||
|
||||
@@ -177,8 +177,8 @@ To display the log classes (output changes with each version so your display may
|
||||
storage_proxy
|
||||
storage_service
|
||||
stream_session
|
||||
strongly_consistent_modification_statement
|
||||
strongly_consistent_select_statement
|
||||
broadcast_modification_statement
|
||||
broadcast_select_statement
|
||||
system_distributed_keyspace
|
||||
system_keyspace
|
||||
table
|
||||
|
||||
@@ -17,7 +17,7 @@ Glossary
|
||||
The CAP Theorem is the notion that **C** (Consistency), **A** (Availability) and **P** (Partition Tolerance) of data are mutually dependent in a distributed system. Increasing any 2 of these factors will reduce the third. ScyllaDB chooses availability and partition tolerance over consistency. See :doc:`Fault Tolerance </architecture/architecture-fault-tolerance>`.
|
||||
|
||||
Cluster
|
||||
One or multiple ScyllaDB nodes, acting in concert, which own a single contiguous token range. State is communicated between nodes in the cluster via the Gossip protocol. See :doc:`Ring Architecture </architecture/ringarchitecture/index>`.
|
||||
One or multiple ScyllaDB nodes, acting in concert, which own a single contiguous token range. State is communicated between nodes in the cluster via :doc:`Raft </architecture/raft>`.
|
||||
|
||||
Clustering Key
|
||||
A single or multi-column clustering key determines a row’s uniqueness and sort order on disk within a partition. See :doc:`Ring Architecture </architecture/ringarchitecture/index>`.
|
||||
@@ -157,14 +157,18 @@ Glossary
|
||||
Table
|
||||
A collection of columns fetched by row. Columns are ordered by Clustering Key. See :doc:`Ring Architecture </architecture/ringarchitecture/index>`.
|
||||
|
||||
Tablet
|
||||
The name of a :term:`Token Range` in the :doc:`Tablet Model </architecture/tablets>` of assigning ownership of data in a cluster.
|
||||
|
||||
Time-window compaction strategy
|
||||
TWCS is designed for time series data. See :doc:`Compaction Strategies</architecture/compaction/compaction-strategies/>`.
|
||||
|
||||
Token
|
||||
A value in a range, used to identify both nodes and partitions. Each node in a ScyllaDB cluster is given an (initial) token, which defines the end of the range a node handles. See :doc:`Ring Architecture </architecture/ringarchitecture/index>`.
|
||||
A hash value calculated over the partition key of a table. All tokens calculated over any partition key of any table are part of the same token domain. This allows unified assignment of partitions to nodes in a ScyllaDB cluster by using :term:`Token Range` as the basis of ownership assignment.
|
||||
|
||||
Token Range
|
||||
The total range of potential unique identifiers supported by the partitioner. By default, each ScyllaDB node in the cluster handles 256 token ranges. Each token range corresponds to a Vnode. Each range of hashes in turn is a segment of the total range of a given hash function. See :doc:`Ring Architecture </architecture/ringarchitecture/index>`.
|
||||
A contiguous range of :term:`Token` values, the basis of ownership assignment in a ScyllaDB cluster. A token range is defined by a start and end token. Each node in a ScyllaDB cluster owns a number of token ranges.
|
||||
There are two models of distributing token ranges in the cluster, the :doc:`VNode Model </architecture/ringarchitecture/index>` and the :doc:`Tablet Model </architecture/tablets>`.
|
||||
|
||||
Tombstone
|
||||
A marker that indicates that data has been deleted. A large number of tombstones may impact read performance and disk usage, so an efficient tombstone garbage collection strategy should be employed. See :ref:`Tombstones GC options <ddl-tombstones-gc>`.
|
||||
|
||||
@@ -47,7 +47,6 @@
|
||||
|
||||
namespace encryption {
|
||||
|
||||
static auto constexpr KSNAME = "system_replicated_keys";
|
||||
static auto constexpr TABLENAME = "encrypted_keys";
|
||||
|
||||
static logger log("replicated_key_provider");
|
||||
@@ -182,7 +181,7 @@ future<::shared_ptr<cql3::untyped_result_set>> replicated_key_provider::query(ss
|
||||
future<> replicated_key_provider::force_blocking_flush() {
|
||||
return _ctxt.get_database().invoke_on_all([](replica::database& db) {
|
||||
// if (!Boolean.getBoolean("cassandra.unsafesystem"))
|
||||
replica::column_family& cf = db.find_column_family(KSNAME, TABLENAME);
|
||||
replica::column_family& cf = db.find_column_family(replicated_key_provider_factory::KSNAME, TABLENAME);
|
||||
return cf.flush();
|
||||
});
|
||||
}
|
||||
@@ -306,7 +305,7 @@ future<std::tuple<UUID, key_ptr>> replicated_key_provider::get_key(const key_inf
|
||||
if (id.id) {
|
||||
uuid = utils::UUID_gen::get_UUID(*id.id);
|
||||
log.debug("Finding key {} ({})", uuid, info);
|
||||
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? AND key_id=?;", KSNAME, TABLENAME);
|
||||
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? AND key_id=?;", replicated_key_provider_factory::KSNAME, TABLENAME);
|
||||
res = co_await query(std::move(s), _system_key->name(), cipher, int32_t(id.info.len), uuid);
|
||||
|
||||
// if we find nothing, and we actually queried a specific key (by uuid), we've failed.
|
||||
@@ -316,7 +315,7 @@ future<std::tuple<UUID, key_ptr>> replicated_key_provider::get_key(const key_inf
|
||||
}
|
||||
} else {
|
||||
log.debug("Finding key ({})", info);
|
||||
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? LIMIT 1;", KSNAME, TABLENAME);
|
||||
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? LIMIT 1;", replicated_key_provider_factory::KSNAME, TABLENAME);
|
||||
res = co_await query(std::move(s), _system_key->name(), cipher, int32_t(id.info.len));
|
||||
}
|
||||
|
||||
@@ -333,7 +332,7 @@ future<std::tuple<UUID, key_ptr>> replicated_key_provider::get_key(const key_inf
|
||||
auto ks = base64_encode(b);
|
||||
log.trace("Inserting generated key {}", uuid);
|
||||
co_await query(fmt::format("INSERT INTO {}.{} (key_file, cipher, strength, key_id, key) VALUES (?, ?, ?, ?, ?)",
|
||||
KSNAME, TABLENAME), _system_key->name(), cipher, int32_t(id.info.len), uuid, ks
|
||||
replicated_key_provider_factory::KSNAME, TABLENAME), _system_key->name(), cipher, int32_t(id.info.len), uuid, ks
|
||||
);
|
||||
log.trace("Flushing key table");
|
||||
co_await force_blocking_flush();
|
||||
@@ -366,8 +365,8 @@ future<> replicated_key_provider::validate() const {
|
||||
|
||||
schema_ptr encrypted_keys_table() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(KSNAME, TABLENAME);
|
||||
return schema_builder(KSNAME, TABLENAME, std::make_optional(id))
|
||||
auto id = generate_legacy_id(replicated_key_provider_factory::KSNAME, TABLENAME);
|
||||
return schema_builder(replicated_key_provider_factory::KSNAME, TABLENAME, std::make_optional(id))
|
||||
.with_column("key_file", utf8_type, column_kind::partition_key)
|
||||
.with_column("cipher", utf8_type, column_kind::partition_key)
|
||||
.with_column("strength", int32_type, column_kind::clustering_key)
|
||||
@@ -387,23 +386,23 @@ future<> replicated_key_provider::maybe_initialize_tables() {
|
||||
}
|
||||
|
||||
future<> replicated_key_provider::do_initialize_tables(::replica::database& db, service::migration_manager& mm) {
|
||||
if (db.has_schema(KSNAME, TABLENAME)) {
|
||||
if (db.has_schema(replicated_key_provider_factory::KSNAME, TABLENAME)) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
log.debug("Creating keyspace and table");
|
||||
if (!db.has_keyspace(KSNAME)) {
|
||||
if (!db.has_keyspace(replicated_key_provider_factory::KSNAME)) {
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
try {
|
||||
auto ksm = keyspace_metadata::new_keyspace(
|
||||
KSNAME,
|
||||
replicated_key_provider_factory::KSNAME,
|
||||
"org.apache.cassandra.locator.EverywhereStrategy",
|
||||
{},
|
||||
std::nullopt,
|
||||
std::nullopt,
|
||||
true);
|
||||
co_await mm.announce(service::prepare_new_keyspace_announcement(db, ksm, ts), std::move(group0_guard), fmt::format("encryption at rest: create keyspace {}", KSNAME));
|
||||
co_await mm.announce(service::prepare_new_keyspace_announcement(db, ksm, ts), std::move(group0_guard), fmt::format("encryption at rest: create keyspace {}", replicated_key_provider_factory::KSNAME));
|
||||
} catch (exceptions::already_exists_exception&) {
|
||||
}
|
||||
}
|
||||
@@ -411,10 +410,10 @@ future<> replicated_key_provider::do_initialize_tables(::replica::database& db,
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
try {
|
||||
co_await mm.announce(co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), encrypted_keys_table(), ts), std::move(group0_guard),
|
||||
fmt::format("encryption at rest: create table {}.{}", KSNAME, TABLENAME));
|
||||
fmt::format("encryption at rest: create table {}.{}", replicated_key_provider_factory::KSNAME, TABLENAME));
|
||||
} catch (exceptions::already_exists_exception&) {
|
||||
}
|
||||
auto& ks = db.find_keyspace(KSNAME);
|
||||
auto& ks = db.find_keyspace(replicated_key_provider_factory::KSNAME);
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
// should perhaps check name also..
|
||||
if (rs.get_type() != locator::replication_strategy_type::everywhere_topology) {
|
||||
|
||||
@@ -27,6 +27,8 @@ namespace encryption {
|
||||
|
||||
class replicated_key_provider_factory : public key_provider_factory {
|
||||
public:
|
||||
static constexpr const char* KSNAME = "system_replicated_keys";
|
||||
|
||||
replicated_key_provider_factory();
|
||||
~replicated_key_provider_factory();
|
||||
|
||||
|
||||
@@ -172,6 +172,7 @@ public:
|
||||
gms::feature topology_global_request_queue { *this, "TOPOLOGY_GLOBAL_REQUEST_QUEUE"sv };
|
||||
gms::feature lwt_with_tablets { *this, "LWT_WITH_TABLETS"sv };
|
||||
gms::feature repair_msg_split { *this, "REPAIR_MSG_SPLIT"sv };
|
||||
gms::feature parallel_tablet_draining { *this, "PARALLEL_TABLET_DRAINING"sv };
|
||||
gms::feature view_building_coordinator { *this, "VIEW_BUILDING_COORDINATOR"sv };
|
||||
gms::feature ms_sstable { *this, "MS_SSTABLE_FORMAT"sv };
|
||||
gms::feature rack_list_rf { *this, "RACK_LIST_RF"sv };
|
||||
|
||||
@@ -148,6 +148,9 @@ public:
|
||||
inet_address get_broadcast_address() const noexcept {
|
||||
return get_token_metadata_ptr()->get_topology().my_address();
|
||||
}
|
||||
// Returns the CQL address that the current topology reports for this node.
inet_address get_cql_address() const noexcept {
    // Keep the token metadata handle bound for the whole lookup so the
    // topology reference obtained from it stays valid.
    auto&& token_metadata = get_token_metadata_ptr();
    return token_metadata->get_topology().my_cql_address();
}
|
||||
const std::set<inet_address>& get_seeds() const noexcept;
|
||||
|
||||
public:
|
||||
|
||||
@@ -54,6 +54,7 @@ set(idl_headers
|
||||
sstables.idl.hh
|
||||
storage_proxy.idl.hh
|
||||
storage_service.idl.hh
|
||||
strong_consistency/state_machine.idl.hh
|
||||
group0_state_machine.idl.hh
|
||||
mapreduce_request.idl.hh
|
||||
replica_exception.idl.hh
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "idl/token.idl.hh"
|
||||
#include "idl/keys.idl.hh"
|
||||
#include "idl/uuid.idl.hh"
|
||||
#include "idl/position_in_partition.idl.hh"
|
||||
|
||||
namespace db {
|
||||
enum class read_repair_decision : uint8_t {
|
||||
@@ -22,19 +23,6 @@ enum class read_repair_decision : uint8_t {
|
||||
};
|
||||
}
|
||||
|
||||
enum class bound_weight : int8_t {
|
||||
before_all_prefixed = -1,
|
||||
equal = 0,
|
||||
after_all_prefixed = 1,
|
||||
}
|
||||
|
||||
enum class partition_region : uint8_t {
|
||||
partition_start,
|
||||
static_row,
|
||||
clustered,
|
||||
partition_end,
|
||||
};
|
||||
|
||||
namespace service {
|
||||
namespace pager {
|
||||
class paging_state {
|
||||
|
||||
20
idl/strong_consistency/state_machine.idl.hh
Normal file
20
idl/strong_consistency/state_machine.idl.hh
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "idl/frozen_mutation.idl.hh"
|
||||
#include "idl/uuid.idl.hh"
|
||||
|
||||
namespace service {
|
||||
namespace strong_consistency {
|
||||
|
||||
struct raft_command {
|
||||
frozen_mutation mutation;
|
||||
};
|
||||
|
||||
} // namespace strong_consistency
|
||||
} // namespace service
|
||||
@@ -227,7 +227,7 @@ public:
|
||||
_db,
|
||||
s,
|
||||
std::move(permit),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::v3::BUILT_VIEWS),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::BUILT_VIEWS),
|
||||
range,
|
||||
slice,
|
||||
std::move(trace_state),
|
||||
|
||||
@@ -19,43 +19,114 @@
|
||||
#include "types/concrete_types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
namespace secondary_index {
|
||||
|
||||
template <int MAX>
|
||||
static void validate_positive_option(const sstring& value) {
|
||||
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
|
||||
int num_value;
|
||||
size_t len;
|
||||
try {
|
||||
num_value = std::stoi(value, &len);
|
||||
} catch (...) {
|
||||
throw exceptions::invalid_request_exception(format("Numeric option {} is not a valid number", value));
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is not an integer", value_name, value));
|
||||
}
|
||||
if (len != value.size()) {
|
||||
throw exceptions::invalid_request_exception(format("Numeric option {} is not a valid number", value));
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is not an integer", value_name, value));
|
||||
}
|
||||
|
||||
if (num_value <= 0 || num_value > MAX) {
|
||||
throw exceptions::invalid_request_exception(format("Numeric option {} out of valid range [1 - {}]", value, MAX));
|
||||
if (num_value <= 0 || num_value > max) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is out of valid range [1 - {}]", value_name, value, max));
|
||||
}
|
||||
}
|
||||
|
||||
static void validate_similarity_function(const sstring& value) {
|
||||
sstring similarity_function = value;
|
||||
std::transform(similarity_function.begin(), similarity_function.end(), similarity_function.begin(), ::tolower);
|
||||
if (similarity_function != "cosine" && similarity_function != "euclidean" && similarity_function != "dot_product") {
|
||||
throw exceptions::invalid_request_exception(format("Unsupported similarity function: {}", value));
|
||||
static void validate_factor_option(float min, float max, const sstring& value_name, const sstring& value) {
|
||||
float num_value;
|
||||
size_t len;
|
||||
try {
|
||||
num_value = std::stof(value, &len);
|
||||
} catch (...) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is not a float", value_name, value));
|
||||
}
|
||||
if (len != value.size()) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is not a float", value_name, value));
|
||||
}
|
||||
|
||||
if (!(num_value >= min && num_value <= max)) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid value in option '{}' for vector index: '{}' is out of valid range [{} - {}]", value_name, value, min, max));
|
||||
}
|
||||
}
|
||||
|
||||
const static std::unordered_map<sstring, std::function<void(const sstring&)>> vector_index_options = {
|
||||
{"similarity_function", validate_similarity_function},
|
||||
{"maximum_node_connections", validate_positive_option<512>},
|
||||
{"construction_beam_width", validate_positive_option<4096>},
|
||||
{"search_beam_width", validate_positive_option<4096>},
|
||||
static void validate_enumerated_option(const std::vector<sstring>& supported_values, const sstring& value_name, const sstring& value) {
|
||||
bool is_valid = std::any_of(supported_values.begin(), supported_values.end(),
|
||||
[&](const std::string& func) { return boost::iequals(value, func); });
|
||||
|
||||
if (!is_valid) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
seastar::format("Invalid value in option '{}' for vector index: '{}'. Supported are case-insensitive: {}",
|
||||
value_name,
|
||||
value,
|
||||
fmt::join(supported_values, ", ")));
|
||||
}
|
||||
}
|
||||
|
||||
static const std::vector<sstring> similarity_function_values = {
|
||||
"cosine", "euclidean", "dot_product"
|
||||
};
|
||||
|
||||
static const std::vector<sstring> quantization_values = {
|
||||
"f32", "f16", "bf16", "i8", "b1"
|
||||
};
|
||||
|
||||
static const std::vector<sstring> boolean_values = {
|
||||
"false", "true"
|
||||
};
|
||||
|
||||
const static std::unordered_map<sstring, std::function<void(const sstring&, const sstring&)>> vector_index_options = {
|
||||
// `similarity_function` defines method of calculating similarity between vectors
|
||||
// Used internally by vector store during both indexing and querying
|
||||
// CQL implements corresponding functions in cql3/functions/similarity_functions.hh
|
||||
{"similarity_function", std::bind_front(validate_enumerated_option, similarity_function_values)},
|
||||
// 'maximum_node_connections', 'construction_beam_width', 'search_beam_width' define HNSW index parameters
|
||||
// Used internally by vector store.
|
||||
{"maximum_node_connections", std::bind_front(validate_positive_option, 512)},
|
||||
{"construction_beam_width", std::bind_front(validate_positive_option, 4096)},
|
||||
{"search_beam_width", std::bind_front(validate_positive_option, 4096)},
|
||||
// 'quantization' enables compression of vectors in vector store (not in base table!)
|
||||
// Used internally by vector store. Scylla only checks it to enable rescoring.
|
||||
{"quantization", std::bind_front(validate_enumerated_option, quantization_values)},
|
||||
// 'oversampling' defines factor by which number of candidates retrieved from vector store is multiplied.
|
||||
// It can improve accuracy of ANN queries, especially for quantized vectors when combined with rescoring.
|
||||
// Used by Scylla during query processing to increase query limit sent to vector store.
|
||||
{"oversampling", std::bind_front(validate_factor_option, 1.0f, 100.0f)},
|
||||
// 'rescoring' enables recalculating of similarity scores of candidates retrieved from vector store when quantization is used.
|
||||
{"rescoring", std::bind_front(validate_enumerated_option, boolean_values)},
|
||||
};
|
||||
|
||||
bool vector_index::is_rescoring_enabled(const index_options_map& properties) {
|
||||
auto q = properties.find("quantization");
|
||||
auto r = properties.find("rescoring");
|
||||
return q != properties.end() && !boost::iequals(q->second, "f32")
|
||||
&& r != properties.end() && boost::iequals(r->second, "true");
|
||||
}
|
||||
|
||||
float vector_index::get_oversampling(const index_options_map& properties) {
|
||||
auto it = properties.find("oversampling");
|
||||
if (it != properties.end()) {
|
||||
return std::stof(it->second);
|
||||
}
|
||||
return 1.0f;
|
||||
}
|
||||
|
||||
// Builds the name "similarity_<function>" from the lower-cased
// "similarity_function" option. Falls back to "similarity_cosine" when the
// option is not set.
sstring vector_index::get_cql_similarity_function_name(const index_options_map& properties) {
    const auto similarity = properties.find("similarity_function");
    if (similarity == properties.end()) {
        return "similarity_cosine";
    }
    return "similarity_" + boost::to_lower_copy(similarity->second);
}
|
||||
|
||||
bool vector_index::view_should_exist() const {
|
||||
return false;
|
||||
}
|
||||
@@ -139,7 +210,7 @@ void vector_index::check_index_options(const cql3::statements::index_specific_pr
|
||||
if (it == vector_index_options.end()) {
|
||||
throw exceptions::invalid_request_exception(format("Unsupported option {} for vector index", option.first));
|
||||
}
|
||||
it->second(option.second);
|
||||
it->second(option.first, option.second);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,10 @@ public:
|
||||
static bool has_vector_index(const schema& s);
|
||||
static bool has_vector_index_on_column(const schema& s, const sstring& target_name);
|
||||
static void check_cdc_options(const schema& schema);
|
||||
|
||||
static bool is_rescoring_enabled(const index_options_map& properties);
|
||||
static float get_oversampling(const index_options_map& properties);
|
||||
static sstring get_cql_similarity_function_name(const index_options_map& properties);
|
||||
private:
|
||||
void check_uses_tablets(const schema& schema, const data_dictionary::database& db) const;
|
||||
void check_cdc_not_explicitly_disabled(const schema& schema) const;
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include <variant>
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -178,6 +179,15 @@ size_t get_replication_factor(const replication_strategy_config_option& opt) {
|
||||
return replication_factor_data(opt).count();
|
||||
}
|
||||
|
||||
bool uses_rack_list_exclusively(const replication_strategy_config_options& opts) {
|
||||
for (auto& [_, val] : opts) {
|
||||
if (!std::holds_alternative<rack_list>(val)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
replication_factor_data::replication_factor_data(const replication_strategy_config_option& rf) {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (const sstring& rf) {
|
||||
|
||||
@@ -52,6 +52,8 @@ using replication_strategy_config_options = std::map<sstring, replication_strate
|
||||
// Throws configuration_exception when option is not a valid replication factor specifier.
|
||||
size_t get_replication_factor(const replication_strategy_config_option&);
|
||||
|
||||
bool uses_rack_list_exclusively(const replication_strategy_config_options&);
|
||||
|
||||
struct replication_strategy_params {
|
||||
const replication_strategy_config_options options;
|
||||
std::optional<unsigned> initial_tablets;
|
||||
|
||||
@@ -311,7 +311,8 @@ future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(sch
|
||||
rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name());
|
||||
tablet_count = aligned_tablet_count;
|
||||
}
|
||||
co_return co_await reallocate_tablets(std::move(s), std::move(tm), tablet_map(tablet_count));
|
||||
co_return co_await reallocate_tablets(std::move(s), std::move(tm),
|
||||
tablet_map(tablet_count, get_consistency() != data_dictionary::consistency_config_option::eventual));
|
||||
}
|
||||
|
||||
future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, token_metadata_ptr tm, tablet_map tablets) const {
|
||||
@@ -325,6 +326,16 @@ future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, t
|
||||
for (tablet_id tb : tablets.tablet_ids()) {
|
||||
auto tinfo = tablets.get_tablet_info(tb);
|
||||
tinfo.replicas = co_await reallocate_tablets(s, tm, load, tablets, tb);
|
||||
if (tablets.has_raft_info()) {
|
||||
for (auto& r: tinfo.replicas) {
|
||||
r.shard = 0;
|
||||
}
|
||||
if (!tablets.get_tablet_raft_info(tb).group_id) {
|
||||
tablets.set_tablet_raft_info(tb, tablet_raft_info {
|
||||
.group_id = raft::group_id{utils::UUID_gen::get_time_UUID()}
|
||||
});
|
||||
}
|
||||
}
|
||||
tablets.set_tablet(tb, std::move(tinfo));
|
||||
}
|
||||
|
||||
@@ -497,7 +508,7 @@ future<tablet_replica_set> network_topology_strategy::add_tablets_in_dc(schema_p
|
||||
candidates_list& rack_list = existing.empty() ? new_racks : existing_racks;
|
||||
auto& candidate = rack_list.emplace_back(rack);
|
||||
for (const auto& node : nodes) {
|
||||
if (!node.get().is_normal()) {
|
||||
if (!node.get().is_normal() || node.get().is_draining()) {
|
||||
continue;
|
||||
}
|
||||
const auto& host_id = node.get().host_id();
|
||||
|
||||
@@ -26,6 +26,7 @@ class tablet_aware_replication_strategy : public per_table_replication_strategy
|
||||
private:
|
||||
size_t _initial_tablets = 0;
|
||||
db::tablet_options _tablet_options;
|
||||
data_dictionary::consistency_config_option _consistency;
|
||||
protected:
|
||||
void validate_tablet_options(const abstract_replication_strategy&, const gms::feature_service&, const replication_strategy_config_options&) const;
|
||||
void process_tablet_options(abstract_replication_strategy&, replication_strategy_config_options&, replication_strategy_params);
|
||||
@@ -37,6 +38,8 @@ protected:
|
||||
public:
|
||||
size_t get_initial_tablets() const { return _initial_tablets; }
|
||||
|
||||
data_dictionary::consistency_config_option get_consistency() const { return _consistency; }
|
||||
|
||||
/// Generates tablet_map for a new table.
|
||||
/// Runs under group0 guard.
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, size_t tablet_count) const = 0;
|
||||
|
||||
@@ -470,16 +470,20 @@ bool tablet_metadata::operator==(const tablet_metadata& o) const {
|
||||
return true;
|
||||
}
|
||||
|
||||
tablet_map::tablet_map(size_t tablet_count)
|
||||
tablet_map::tablet_map(size_t tablet_count, bool with_raft_info)
|
||||
: _log2_tablets(log2ceil(tablet_count)) {
|
||||
if (tablet_count != 1ul << _log2_tablets) {
|
||||
on_internal_error(tablet_logger, format("Tablet count not a power of 2: {}", tablet_count));
|
||||
}
|
||||
_tablets.resize(tablet_count);
|
||||
if (with_raft_info) {
|
||||
_raft_info.resize(tablet_count);
|
||||
}
|
||||
}
|
||||
|
||||
tablet_map tablet_map::clone() const {
|
||||
return tablet_map(_tablets, _log2_tablets, _transitions, _resize_decision, _resize_task_info, _repair_scheduler_config);
|
||||
return tablet_map(_tablets, _log2_tablets, _transitions, _resize_decision, _resize_task_info,
|
||||
_repair_scheduler_config, _raft_info);
|
||||
}
|
||||
|
||||
future<tablet_map> tablet_map::clone_gently() const {
|
||||
@@ -497,7 +501,15 @@ future<tablet_map> tablet_map::clone_gently() const {
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
co_return tablet_map(std::move(tablets), _log2_tablets, std::move(transitions), _resize_decision, _resize_task_info, _repair_scheduler_config);
|
||||
raft_info_container raft_info;
|
||||
raft_info.reserve(_raft_info.size());
|
||||
for (const auto& i: _raft_info) {
|
||||
raft_info.emplace_back(i);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
co_return tablet_map(std::move(tablets), _log2_tablets, std::move(transitions), _resize_decision,
|
||||
_resize_task_info, _repair_scheduler_config, std::move(raft_info));
|
||||
}
|
||||
|
||||
void tablet_map::check_tablet_id(tablet_id id) const {
|
||||
@@ -707,6 +719,24 @@ const tablet_transition_info* tablet_map::get_tablet_transition_info(tablet_id i
|
||||
return &i->second;
|
||||
}
|
||||
|
||||
// Returns the Raft info stored for tablet `id`.
// The id is validated with check_tablet_id() first; asking a map that
// carries no Raft info at all is reported through on_internal_error().
const tablet_raft_info& tablet_map::get_tablet_raft_info(tablet_id id) const {
    check_tablet_id(id);
    const bool raft_info_present = !_raft_info.empty();
    if (!raft_info_present) {
        on_internal_error(tablet_logger, "Tablet map doesn't have raft info");
    }
    const auto slot = size_t(id);
    return _raft_info[slot];
}
|
||||
|
||||
// Stores `raft_info` as the Raft info of tablet `id`.
// The id is validated with check_tablet_id() first; a map that carries no
// Raft info is reported through on_internal_error(), naming the tablet and
// the group id that was about to be stored.
void tablet_map::set_tablet_raft_info(tablet_id id, tablet_raft_info raft_info) {
    check_tablet_id(id);
    if (_raft_info.empty()) {
        auto reason = format("Tablet map has no raft info, tablet_id {}, group_id {}",
                id, raft_info.group_id);
        on_internal_error(tablet_logger, reason);
    }
    auto& slot = _raft_info[size_t(id)];
    slot = std::move(raft_info);
}
|
||||
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<tablet_transition_stage, sstring> tablet_transition_stage_to_name = {
|
||||
{tablet_transition_stage::allow_write_both_read_old, "allow_write_both_read_old"},
|
||||
@@ -1416,6 +1446,7 @@ void tablet_aware_replication_strategy::process_tablet_options(abstract_replicat
|
||||
replication_strategy_params params) {
|
||||
if (ars._uses_tablets) {
|
||||
_initial_tablets = params.initial_tablets.value_or(0);
|
||||
_consistency = params.consistency.value_or(data_dictionary::consistency_config_option::eventual);
|
||||
mark_as_per_table(ars);
|
||||
}
|
||||
}
|
||||
@@ -1580,15 +1611,25 @@ static void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metad
|
||||
tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Keyspace '{}' has been verified to be RF-rack-valid", ks);
|
||||
}
|
||||
|
||||
static bool is_normal_token_owner(const token_metadata& tm, locator::host_id host) {
|
||||
auto& node = tm.get_topology().get_node(host);
|
||||
return tm.is_normal_token_owner(host) && !tm.is_leaving(host) && !node.is_draining();
|
||||
}
|
||||
|
||||
static bool is_transitioning_token_owner(const token_metadata& tm, locator::host_id host) {
|
||||
auto& node = tm.get_topology().get_node(host);
|
||||
return tm.get_topology().get_node(host).is_bootstrapping() ||
|
||||
(tm.is_normal_token_owner(host) && (tm.is_leaving(host) || node.is_draining()));
|
||||
}
|
||||
|
||||
void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr tmptr, const abstract_replication_strategy& ars) {
|
||||
assert_rf_rack_valid_keyspace(ks, tmptr, ars,
|
||||
tmptr->get_topology().get_datacenter_racks(),
|
||||
[&tmptr] (host_id host) {
|
||||
return tmptr->is_normal_token_owner(host) && !tmptr->is_leaving(host);
|
||||
return is_normal_token_owner(*tmptr, host);
|
||||
},
|
||||
[&tmptr] (host_id host) {
|
||||
return tmptr->get_topology().get_node(host).is_bootstrapping() ||
|
||||
(tmptr->is_normal_token_owner(host) && tmptr->is_leaving(host));
|
||||
return is_transitioning_token_owner(*tmptr, host);
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -1625,14 +1666,13 @@ void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr
|
||||
if (op.tag == rf_rack_topology_operation::type::add && host == op.node_id) {
|
||||
return true;
|
||||
}
|
||||
return tmptr->is_normal_token_owner(host) && !tmptr->is_leaving(host);
|
||||
return is_normal_token_owner(*tmptr, host);
|
||||
},
|
||||
[&tmptr, &op] (host_id host) {
|
||||
if (op.tag == rf_rack_topology_operation::type::add && host == op.node_id) {
|
||||
return false;
|
||||
}
|
||||
return tmptr->get_topology().get_node(host).is_bootstrapping() ||
|
||||
(tmptr->is_normal_token_owner(host) && tmptr->is_leaving(host));
|
||||
return is_transitioning_token_owner(*tmptr, host);
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -1642,7 +1682,7 @@ rack_list get_allowed_racks(const locator::token_metadata& tm, const sstring& dc
|
||||
auto normal_nodes = [&] (const sstring& rack) {
|
||||
int count = 0;
|
||||
for (auto n : topo.get_datacenter_rack_nodes().at(dc).at(rack)) {
|
||||
count += int(n.get().is_normal());
|
||||
count += int(n.get().is_normal() && !n.get().is_draining());
|
||||
}
|
||||
return count;
|
||||
};
|
||||
@@ -1715,6 +1755,12 @@ auto fmt::formatter<locator::tablet_map>::format(const locator::tablet_map& r, f
|
||||
out = fmt::format_to(out, ", session={}", tr->session_id);
|
||||
}
|
||||
}
|
||||
if (r.has_raft_info()) {
|
||||
const auto& raft_info = r.get_tablet_raft_info(tid);
|
||||
if (raft_info.group_id) {
|
||||
out = fmt::format_to(out, ", group_id={}", raft_info.group_id);
|
||||
}
|
||||
}
|
||||
first = false;
|
||||
tid = *r.next_tablet(tid);
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "raft/raft.hh"
|
||||
|
||||
#include <ranges>
|
||||
#include <seastar/core/reactor.hh>
|
||||
@@ -200,10 +201,7 @@ enum class tablet_repair_incremental_mode : uint8_t {
|
||||
disabled,
|
||||
};
|
||||
|
||||
// FIXME: Incremental repair is disabled by default due to
|
||||
// https://github.com/scylladb/scylladb/issues/26041 and
|
||||
// https://github.com/scylladb/scylladb/issues/27414
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
|
||||
|
||||
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
|
||||
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
|
||||
@@ -250,6 +248,13 @@ struct tablet_info {
|
||||
bool operator==(const tablet_info&) const = default;
|
||||
};
|
||||
|
||||
/// Raft-related information for strongly-consistent tablets.
|
||||
struct tablet_raft_info {
|
||||
raft::group_id group_id;
|
||||
|
||||
bool operator==(const tablet_raft_info&) const = default;
|
||||
};
|
||||
|
||||
// Merges tablet_info b into a, but with following constraints:
|
||||
// - they cannot have active repair task, since each task has a different id
|
||||
// - their replicas must be all co-located.
|
||||
@@ -552,6 +557,7 @@ public:
|
||||
class tablet_map {
|
||||
public:
|
||||
using tablet_container = utils::chunked_vector<tablet_info>;
|
||||
using raft_info_container = utils::chunked_vector<tablet_raft_info>;
|
||||
private:
|
||||
using transitions_map = std::unordered_map<tablet_id, tablet_transition_info>;
|
||||
// The implementation assumes that _tablets.size() is a power of 2:
|
||||
@@ -564,16 +570,20 @@ private:
|
||||
resize_decision _resize_decision;
|
||||
tablet_task_info _resize_task_info;
|
||||
std::optional<repair_scheduler_config> _repair_scheduler_config;
|
||||
raft_info_container _raft_info;
|
||||
|
||||
// Internal constructor, used by clone() and clone_gently().
|
||||
tablet_map(tablet_container tablets, size_t log2_tablets, transitions_map transitions,
|
||||
resize_decision resize_decision, tablet_task_info resize_task_info, std::optional<repair_scheduler_config> repair_scheduler_config)
|
||||
resize_decision resize_decision, tablet_task_info resize_task_info,
|
||||
std::optional<repair_scheduler_config> repair_scheduler_config,
|
||||
raft_info_container raft_info)
|
||||
: _tablets(std::move(tablets))
|
||||
, _log2_tablets(log2_tablets)
|
||||
, _transitions(std::move(transitions))
|
||||
, _resize_decision(resize_decision)
|
||||
, _resize_task_info(std::move(resize_task_info))
|
||||
, _repair_scheduler_config(std::move(repair_scheduler_config))
|
||||
, _raft_info(std::move(raft_info))
|
||||
{}
|
||||
|
||||
/// Returns the largest token owned by tablet_id when the tablet_count is `1 << log2_tablets`.
|
||||
@@ -586,7 +596,7 @@ public:
|
||||
/// Constructs a tablet map.
|
||||
///
|
||||
/// \param tablet_count The desired tablets to allocate. Must be a power of two.
|
||||
explicit tablet_map(size_t tablet_count);
|
||||
explicit tablet_map(size_t tablet_count, bool with_raft_info = false);
|
||||
|
||||
tablet_map(tablet_map&&) = default;
|
||||
tablet_map(const tablet_map&) = delete;
|
||||
@@ -612,6 +622,16 @@ public:
|
||||
/// \throws std::logic_error If the given id does not belong to this instance.
|
||||
const tablet_transition_info* get_tablet_transition_info(tablet_id) const;
|
||||
|
||||
/// Returns true for strongly-consistent tablets.
|
||||
/// Use get_tablet_raft_info() to retrieve Raft info for a specific tablet_id.
|
||||
bool has_raft_info() const {
|
||||
return !_raft_info.empty();
|
||||
}
|
||||
|
||||
/// Returns Raft information for the given tablet_id.
|
||||
/// It is an internal error to call this method if has_raft_info() returns false.
|
||||
const tablet_raft_info& get_tablet_raft_info(tablet_id) const;
|
||||
|
||||
/// Returns the largest token owned by a given tablet.
|
||||
/// \throws std::logic_error If the given id does not belong to this instance.
|
||||
dht::token get_last_token(tablet_id id) const;
|
||||
@@ -721,6 +741,7 @@ public:
|
||||
void set_repair_scheduler_config(std::optional<locator::repair_scheduler_config> config);
|
||||
void clear_tablet_transition_info(tablet_id);
|
||||
void clear_transitions();
|
||||
void set_tablet_raft_info(tablet_id, tablet_raft_info);
|
||||
|
||||
// Destroys gently.
|
||||
// The tablet map is not usable after this call and should be destroyed.
|
||||
@@ -876,7 +897,7 @@ class abstract_replication_strategy;
|
||||
/// * if the keyspace is RF-rack-valid, no side effect,
|
||||
/// * if the keyspace is RF-rack-invalid, an exception will be thrown. It will contain information about the reason
|
||||
/// why the keyspace is RF-rack-invalid and will be worded in a way that can be returned directly to the user.
|
||||
/// There are NO guarantees about the type of the exception.
|
||||
/// The exception type is std::invalid_argument.
|
||||
///
|
||||
/// Preconditions:
|
||||
/// * Every DC that takes part in replication according to the passed replication strategy MUST be known
|
||||
|
||||
@@ -55,23 +55,24 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = {
|
||||
.rack = locator::production_snitch_base::default_rack,
|
||||
};
|
||||
|
||||
node::node(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, this_node is_this_node, node::idx_type idx)
|
||||
node::node(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, this_node is_this_node, node::idx_type idx, bool draining)
|
||||
: _topology(topology)
|
||||
, _host_id(id)
|
||||
, _dc_rack(std::move(dc_rack))
|
||||
, _state(state)
|
||||
, _shard_count(std::move(shard_count))
|
||||
, _excluded(excluded)
|
||||
, _draining(draining)
|
||||
, _is_this_node(is_this_node)
|
||||
, _idx(idx)
|
||||
{}
|
||||
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, node::this_node is_this_node, node::idx_type idx) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(dc_rack), std::move(state), shard_count, excluded, is_this_node, idx);
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, node::this_node is_this_node, node::idx_type idx, bool draining) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(dc_rack), std::move(state), shard_count, excluded, is_this_node, idx, draining);
|
||||
}
|
||||
|
||||
node_holder node::clone() const {
|
||||
return make(nullptr, host_id(), dc_rack(), get_state(), get_shard_count(), is_excluded(), is_this_node());
|
||||
return make(nullptr, host_id(), dc_rack(), get_state(), get_shard_count(), is_excluded(), is_this_node(), -1, is_draining());
|
||||
}
|
||||
|
||||
std::string node::to_string(node::state s) {
|
||||
|
||||
@@ -65,6 +65,7 @@ private:
|
||||
state _state;
|
||||
shard_id _shard_count = 0;
|
||||
bool _excluded = false;
|
||||
bool _draining = false;
|
||||
|
||||
// Is this node the `localhost` instance
|
||||
this_node _is_this_node;
|
||||
@@ -78,7 +79,8 @@ public:
|
||||
shard_id shard_count = 0,
|
||||
bool excluded = false,
|
||||
this_node is_this_node = this_node::no,
|
||||
idx_type idx = -1);
|
||||
idx_type idx = -1,
|
||||
bool draining = false);
|
||||
|
||||
node(const node&) = delete;
|
||||
node(node&&) = delete;
|
||||
@@ -143,6 +145,16 @@ public:
|
||||
_excluded = excluded;
|
||||
}
|
||||
|
||||
// Indicates that the tablet scheduler should move tablet replicas away
|
||||
// from this node because it's undergoing decommission or removenode operation.
|
||||
bool is_draining() const {
|
||||
return _draining;
|
||||
}
|
||||
|
||||
void set_draining(bool draining) {
|
||||
_draining = draining;
|
||||
}
|
||||
|
||||
bool is_leaving() const noexcept {
|
||||
switch (_state) {
|
||||
case state::being_decommissioned:
|
||||
@@ -178,7 +190,8 @@ private:
|
||||
shard_id shard_count = 0,
|
||||
bool excluded = false,
|
||||
node::this_node is_this_node = this_node::no,
|
||||
idx_type idx = -1);
|
||||
idx_type idx = -1,
|
||||
bool draining = false);
|
||||
node_holder clone() const;
|
||||
|
||||
void set_topology(const locator::topology* topology) noexcept { _topology = topology; }
|
||||
@@ -492,7 +505,7 @@ struct fmt::formatter<locator::node> : fmt::formatter<string_view> {
|
||||
if (!verbose) {
|
||||
return fmt::format_to(ctx.out(), "{}", node.host_id());
|
||||
} else {
|
||||
return fmt::format_to(ctx.out(), " idx={} host_id={} dc={} rack={} state={} shards={} excluded={} this_node={}",
|
||||
return fmt::format_to(ctx.out(), " idx={} host_id={} dc={} rack={} state={} shards={} excluded={} draining={} this_node={}",
|
||||
node.idx(),
|
||||
node.host_id(),
|
||||
node.dc_rack().dc,
|
||||
@@ -500,6 +513,7 @@ struct fmt::formatter<locator::node> : fmt::formatter<string_view> {
|
||||
locator::node::to_string(node.get_state()),
|
||||
node.get_shard_count(),
|
||||
node.is_excluded(),
|
||||
node.is_draining(),
|
||||
bool(node.is_this_node()));
|
||||
}
|
||||
}
|
||||
|
||||
123
main.cc
123
main.cc
@@ -109,6 +109,8 @@
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "db/virtual_tables.hh"
|
||||
|
||||
#include "service/strong_consistency/groups_manager.hh"
|
||||
#include "service/strong_consistency/coordinator.hh"
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
@@ -123,6 +125,7 @@
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "tools/utils.hh"
|
||||
#include "schema/compression_initializer.hh"
|
||||
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
@@ -812,6 +815,17 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
notify_set.notify_all(configurable::system_state::stopped).get();
|
||||
});
|
||||
|
||||
// Register schema initializer for default compression settings.
|
||||
// This statement is deliberately positioned here:
|
||||
// * Needs to be done before starting services that use `schema_builder`s
|
||||
// because registration is not thread-safe (initializer vector is
|
||||
// not thread-local) and every `schema_builder` must use it.
|
||||
// * Needs to be done after the initialization of the configurables
|
||||
// because this particular initializer depends on `db::extensions::is_extension_internal_keyspace()`.
|
||||
register_compression_initializer(*cfg, [&feature_service] {
|
||||
return bool(feature_service.local().sstable_compression_dicts);
|
||||
});
|
||||
|
||||
cfg->setup_directories();
|
||||
|
||||
// We're writing to a non-atomic variable here. But bool writes are atomic
|
||||
@@ -869,6 +883,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
startlog.warn("Ignoring unused features found in config: {}", unused_features);
|
||||
}
|
||||
|
||||
// Enable Raft-related columns in system.tablets if the experimental feature is enabled.
|
||||
replica::set_strongly_consistent_tables_enabled(cfg->check_experimental(
|
||||
db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES
|
||||
));
|
||||
|
||||
gms::feature_config fcfg;
|
||||
fcfg.disabled_features = get_disabled_features_from_db_config(*cfg);
|
||||
|
||||
@@ -1119,12 +1138,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
#endif
|
||||
|
||||
seastar::scheduling_supergroup user_ssg = create_scheduling_supergroup(1000).get();
|
||||
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
replica::database_config dbcfg;
|
||||
dbcfg.compaction_scheduling_group = create_scheduling_group("compaction", "comp", 1000).get();
|
||||
dbcfg.memory_compaction_scheduling_group = create_scheduling_group("mem_compaction", "mcmp", 1000).get();
|
||||
dbcfg.streaming_scheduling_group = maintenance_scheduling_group;
|
||||
dbcfg.statement_scheduling_group = create_scheduling_group("statement", "stmt", 1000).get();
|
||||
dbcfg.statement_scheduling_group = create_scheduling_group("statement", "stmt", 1000, user_ssg).get();
|
||||
dbcfg.memtable_scheduling_group = create_scheduling_group("memtable", "mt", 1000).get();
|
||||
dbcfg.memtable_to_cache_scheduling_group = create_scheduling_group("memtable_to_cache", "mt2c", 200).get();
|
||||
dbcfg.gossip_scheduling_group = create_scheduling_group("gossip", "gms", 1000).get();
|
||||
@@ -1212,7 +1233,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
checkpoint(stop_signal, "starting service level controller");
|
||||
qos::service_level_options default_service_level_configuration;
|
||||
default_service_level_configuration.shares = 1000;
|
||||
sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, dbcfg.statement_scheduling_group).get();
|
||||
sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, user_ssg, dbcfg.statement_scheduling_group).get();
|
||||
sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
|
||||
auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] {
|
||||
sl_controller.stop().get();
|
||||
@@ -1802,6 +1823,21 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
client_routes.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing strongly consistent groups manager");
|
||||
sharded<service::strong_consistency::groups_manager> groups_manager;
|
||||
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
|
||||
std::ref(db), std::ref(feature_service)).get();
|
||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
|
||||
groups_manager.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing strongly consistent coordinator");
|
||||
sharded<service::strong_consistency::coordinator> sc_coordinator;
|
||||
sc_coordinator.start(std::ref(groups_manager), std::ref(db)).get();
|
||||
auto stop_sc_coordinator = defer_verbose_shutdown("strongly consistent coordinator", [&] {
|
||||
sc_coordinator.stop().get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing storage service");
|
||||
debug::the_storage_service = &ss;
|
||||
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
|
||||
@@ -1813,7 +1849,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
std::ref(auth_cache), std::ref(client_routes),
|
||||
std::ref(tsm), std::ref(vbsm), std::ref(task_manager), std::ref(gossip_address_map),
|
||||
compression_dict_updated_callback,
|
||||
only_on_shard0(&*disk_space_monitor_shard0)
|
||||
only_on_shard0(&*disk_space_monitor_shard0),
|
||||
std::ref(groups_manager)
|
||||
).get();
|
||||
|
||||
ss.local().set_train_dict_callback([&rpc_dict_training_worker] (std::vector<std::vector<std::byte>> sample) {
|
||||
@@ -1829,7 +1866,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
checkpoint(stop_signal, "initializing query processor remote part");
|
||||
// TODO: do this together with proxy.start_remote(...)
|
||||
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
|
||||
std::ref(ss), std::ref(group0_client)).get();
|
||||
std::ref(ss), std::ref(group0_client), std::ref(sc_coordinator)).get();
|
||||
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [&qp] {
|
||||
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
|
||||
});
|
||||
@@ -2178,6 +2215,17 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// This will also disable migration manager schema pulls if needed.
|
||||
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
|
||||
|
||||
// The call to setup_group0_if_exist() above guarantees that, if group0 is
|
||||
// created and started, the locally persisted group0 state has been applied
|
||||
// before it returns. As a result, tablet Raft groups are started using
|
||||
// tablet metadata that is at least as recent as the locally persisted version.
|
||||
// groups_manager::start() waits for all these Raft groups to start, so when
|
||||
// we begin RPC messaging below, the system is ready to accept proxied requests
|
||||
// from other replicas.
|
||||
groups_manager.invoke_on_all([](service::strong_consistency::groups_manager& m) {
|
||||
return m.start();
|
||||
}).get();
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
@@ -2186,12 +2234,15 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return messaging.invoke_on_all([&] (auto& ms) {
|
||||
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
|
||||
if (ip == gossiper.local().get_broadcast_address()) {
|
||||
// #27429. When running with broadcast_address != rpc_address, topology gets
|
||||
// confused if we can't resolve rpc/cql address as self.
|
||||
if (ip == gossiper.local().get_broadcast_address() || ip == gossiper.local().get_cql_address()) {
|
||||
return gossiper.local().my_host_id();
|
||||
}
|
||||
try {
|
||||
return gossiper.local().get_host_id(ip);
|
||||
} catch (...) {
|
||||
startlog.debug("Could not resolve host id: {}, {}", ip, std::current_exception());
|
||||
return locator::host_id{};
|
||||
}
|
||||
});
|
||||
@@ -2217,50 +2268,27 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
db.local().check_rf_rack_validity(token_metadata.local().get());
|
||||
|
||||
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
|
||||
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
|
||||
|
||||
// Start audit service after join_cluster so that the table-based audit backend
|
||||
// can properly create its keyspace and table.
|
||||
checkpoint(stop_signal, "starting audit service");
|
||||
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
|
||||
startlog.error("audit start failed: {}", e);
|
||||
}).get();
|
||||
auto audit_stop = defer([] {
|
||||
audit::audit::stop_audit().get();
|
||||
});
|
||||
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
//
|
||||
// Also, if the dictionary compression feature is not enabled, use
|
||||
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
|
||||
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
|
||||
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
|
||||
|
||||
gms::feature::listener_registration reg_listener;
|
||||
|
||||
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
|
||||
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
|
||||
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
|
||||
}
|
||||
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
|
||||
compression_parameters original_params{sstable_compression_options().get_options()};
|
||||
auto params = sstable_compression_options().get_options();
|
||||
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
|
||||
// Register a callback to update the default compression algorithm when the feature is enabled.
|
||||
// Precondition:
|
||||
// The callback must run inside seastar::async context:
|
||||
// - If the listener fires immediately, we are running inside seastar::async already.
|
||||
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
|
||||
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
|
||||
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
|
||||
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
|
||||
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
|
||||
sstable_compression_options(params, utils::config_file::config_source::None);
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
const auto dicts_feature_enabled = bool(feature_service.local().sstable_compression_dicts);
|
||||
|
||||
try {
|
||||
cfg->sstable_compression_user_table_options().validate(
|
||||
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)));
|
||||
cfg->get_sstable_compression_user_table_options(dicts_feature_enabled).validate(
|
||||
compression_parameters::dicts_feature_enabled(dicts_feature_enabled));
|
||||
} catch (const std::exception& e) {
|
||||
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
|
||||
throw bad_configuration_error();
|
||||
@@ -2506,13 +2534,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
|
||||
api::set_server_done(ctx).get();
|
||||
|
||||
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
|
||||
startlog.error("audit start failed: {}", e);
|
||||
}).get();
|
||||
auto audit_stop = defer([] {
|
||||
audit::audit::stop_audit().get();
|
||||
});
|
||||
|
||||
// Create controllers before drain_on_shutdown() below, so that it destructs
|
||||
// after drain stops them in stop_transport()
|
||||
// Register controllers after drain_on_shutdown() below, so that even on start
|
||||
|
||||
@@ -178,7 +178,7 @@ class fragmenting_mutation_freezer {
|
||||
|
||||
tombstone _partition_tombstone;
|
||||
std::optional<static_row> _sr;
|
||||
std::deque<clustering_row> _crs;
|
||||
utils::chunked_vector<clustering_row> _crs;
|
||||
range_tombstone_list _rts;
|
||||
|
||||
frozen_mutation_consumer_fn _consumer;
|
||||
|
||||
@@ -242,7 +242,7 @@ class streamed_mutation_freezer {
|
||||
|
||||
tombstone _partition_tombstone;
|
||||
std::optional<static_row> _sr;
|
||||
std::deque<clustering_row> _crs;
|
||||
utils::chunked_vector<clustering_row> _crs;
|
||||
range_tombstone_list _rts;
|
||||
public:
|
||||
streamed_mutation_freezer(const schema& s, const partition_key& key)
|
||||
|
||||
@@ -226,7 +226,7 @@ future<> mutation_partition_serializer::write_gently(ser::writer_of_mutation_par
|
||||
|
||||
void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone,
|
||||
std::optional<static_row> sr, range_tombstone_list rts,
|
||||
std::deque<clustering_row> crs, ser::writer_of_mutation_partition<bytes_ostream>&& wr)
|
||||
utils::chunked_vector<clustering_row> crs, ser::writer_of_mutation_partition<bytes_ostream>&& wr)
|
||||
{
|
||||
auto srow_writer = std::move(wr).write_tomb(partition_tombstone).start_static_row();
|
||||
auto row_tombstones = [&] {
|
||||
@@ -242,10 +242,9 @@ void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone
|
||||
rts.clear();
|
||||
|
||||
auto clustering_rows = std::move(row_tombstones).end_range_tombstones().start_rows();
|
||||
while (!crs.empty()) {
|
||||
auto& cr = crs.front();
|
||||
for (auto& cr : crs) {
|
||||
write_row(clustering_rows.add(), s, cr.key(), cr.cells(), cr.marker(), cr.tomb()).end_deletable_row();
|
||||
crs.pop_front();
|
||||
cr = clustering_row(clustering_key_prefix{});
|
||||
}
|
||||
std::move(clustering_rows).end_rows().end_mutation_partition();
|
||||
}
|
||||
|
||||
@@ -41,4 +41,4 @@ public:
|
||||
|
||||
void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone,
|
||||
std::optional<static_row> sr, range_tombstone_list range_tombstones,
|
||||
std::deque<clustering_row> clustering_rows, ser::writer_of_mutation_partition<bytes_ostream>&&);
|
||||
utils::chunked_vector<clustering_row> clustering_rows, ser::writer_of_mutation_partition<bytes_ostream>&&);
|
||||
|
||||
@@ -60,28 +60,47 @@ static future<db::system_keyspace::topology_requests_entries> get_entries(db::sy
|
||||
return sys_ks.get_node_ops_request_entries(db_clock::now() - ttl);
|
||||
}
|
||||
|
||||
// Builds the tasks::task_stats record for a single topology request entry.
//
// The id, type, state and start/end times are taken from `entry`; kind and
// scope are fixed to cluster level, and sequence_number, shard, keyspace and
// table are filled with zero/empty values.
// `entity` is the string form of `hint.node_id` when the hint carries one,
// otherwise an empty string.
static tasks::task_stats get_task_stats(const db::system_keyspace::topology_requests_entry& entry,
|
||||
const tasks::virtual_task_hint& hint) {
|
||||
return tasks::task_stats{
|
||||
.task_id = tasks::task_id{entry.id},
|
||||
.type = request_type_to_task_type(entry.request_type),
|
||||
.kind = tasks::task_kind::cluster,
|
||||
.scope = "cluster",
|
||||
.state = get_state(entry),
|
||||
.sequence_number = 0,
|
||||
.keyspace = "",
|
||||
.table = "",
|
||||
// Only hints that identify a node produce a non-empty entity.
.entity = hint.node_id ? fmt::to_string(*hint.node_id) : "",
|
||||
.shard = 0,
|
||||
.start_time = entry.start_time,
|
||||
.end_time = entry.end_time,
|
||||
};
|
||||
}
|
||||
|
||||
future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
|
||||
auto entry_opt = co_await _ss._sys_ks.local().get_topology_request_entry_opt(id.uuid());
|
||||
if (!entry_opt) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
auto& entry = *entry_opt;
|
||||
auto stats = get_task_stats(entry, hint);
|
||||
co_return tasks::task_status{
|
||||
.task_id = id,
|
||||
.type = request_type_to_task_type(entry.request_type),
|
||||
.kind = tasks::task_kind::cluster,
|
||||
.scope = "cluster",
|
||||
.state = get_state(entry),
|
||||
.task_id = stats.task_id,
|
||||
.type = stats.type,
|
||||
.kind = stats.kind,
|
||||
.scope = stats.scope,
|
||||
.state = stats.state,
|
||||
.is_abortable = co_await is_abortable(std::move(hint)),
|
||||
.start_time = entry.start_time,
|
||||
.end_time = entry.end_time,
|
||||
.start_time = stats.start_time,
|
||||
.end_time = stats.end_time,
|
||||
.error = entry.error,
|
||||
.parent_id = tasks::task_id::create_null_id(),
|
||||
.sequence_number = 0,
|
||||
.shard = 0,
|
||||
.keyspace = "",
|
||||
.table = "",
|
||||
.entity = "",
|
||||
.sequence_number = stats.sequence_number,
|
||||
.shard = stats.shard,
|
||||
.keyspace = stats.keyspace,
|
||||
.table = stats.table,
|
||||
.entity = stats.entity,
|
||||
.progress_units = "",
|
||||
.progress = tasks::task_manager::task::progress{},
|
||||
.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()))
|
||||
@@ -92,26 +111,41 @@ tasks::task_manager::task_group node_ops_virtual_task::get_group() const noexcep
|
||||
return tasks::task_manager::task_group::topology_change_group;
|
||||
}
|
||||
|
||||
// Collects a request-id -> host-id map from the given topology.
//
// Entries come from two places: nodes listed in `topology.requests` (using
// the request_id of the replica state returned by `topology.find()`), and
// every node in `topology.transition_nodes`.
// The result is keyed by tasks::task_id built from the request id; the value
// is the locator::host_id built from the node's uuid.
static std::map<tasks::task_id, locator::host_id> get_requests(const service::topology& topology) {
|
||||
std::map<tasks::task_id, locator::host_id> result;
|
||||
for (auto& request : topology.requests) {
|
||||
auto* rs = topology.find(request.first);
|
||||
// Nodes for which find() returns nullptr are skipped.
if (rs) {
|
||||
result.emplace(tasks::task_id(rs->second.request_id), locator::host_id(request.first.uuid()));
|
||||
}
|
||||
}
|
||||
// emplace() does not overwrite: if a request id was already added by the
// loop above, the earlier mapping is kept.
for (auto& [node, rs] : topology.transition_nodes) {
|
||||
result.emplace(tasks::task_id(rs.request_id), locator::host_id(node.uuid()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(tasks::task_id task_id) const {
|
||||
if (!task_id.uuid().is_timestamp()) {
|
||||
// Task id of node ops operation is always a timestamp.
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
auto empty_hint = std::make_optional<tasks::virtual_task_hint>({});
|
||||
auto hint = std::make_optional<tasks::virtual_task_hint>({});
|
||||
service::topology& topology = _ss._topology_state_machine._topology;
|
||||
for (auto& request : topology.requests) {
|
||||
if (topology.find(request.first)->second.request_id == task_id.uuid()) {
|
||||
co_return empty_hint;
|
||||
}
|
||||
auto reqs = get_requests(topology);
|
||||
if (reqs.contains(task_id)) {
|
||||
hint->node_id = reqs.at(task_id);
|
||||
co_return hint;
|
||||
}
|
||||
|
||||
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(task_id.uuid());
|
||||
co_return entry && std::holds_alternative<service::topology_request>(entry->request_type) ? empty_hint : std::nullopt;
|
||||
co_return entry && std::holds_alternative<service::topology_request>(entry->request_type) ? hint : std::nullopt;
|
||||
}
|
||||
|
||||
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint) const {
|
||||
return make_ready_future<tasks::is_abortable>(tasks::is_abortable::no);
|
||||
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint hint) const {
|
||||
// Currently, only node operations are supported by abort_topology_request().
|
||||
return make_ready_future<tasks::is_abortable>(tasks::is_abortable(hint.node_id.has_value()));
|
||||
}
|
||||
|
||||
future<std::optional<tasks::task_status>> node_ops_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
|
||||
@@ -124,30 +158,21 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::wait(tasks::tas
|
||||
co_return co_await get_status(id, std::move(hint));
|
||||
}
|
||||
|
||||
future<> node_ops_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint) noexcept {
|
||||
return make_ready_future();
|
||||
future<> node_ops_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
|
||||
co_await _ss.abort_topology_request(id.uuid());
|
||||
}
|
||||
|
||||
future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
|
||||
db::system_keyspace& sys_ks = _ss._sys_ks.local();
|
||||
co_return std::ranges::to<std::vector<tasks::task_stats>>(co_await get_entries(sys_ks, get_task_manager().get_user_task_ttl())
|
||||
| std::views::transform([] (const auto& e) {
|
||||
auto id = e.first;
|
||||
| std::views::transform([reqs = get_requests(_ss._topology_state_machine._topology)] (const auto& e) {
|
||||
auto id = tasks::task_id{e.first};
|
||||
auto& entry = e.second;
|
||||
return tasks::task_stats {
|
||||
.task_id = tasks::task_id{id},
|
||||
.type = node_ops::request_type_to_task_type(entry.request_type),
|
||||
.kind = tasks::task_kind::cluster,
|
||||
.scope = "cluster",
|
||||
.state = get_state(entry),
|
||||
.sequence_number = 0,
|
||||
.keyspace = "",
|
||||
.table = "",
|
||||
.entity = "",
|
||||
.shard = 0,
|
||||
.start_time = entry.start_time,
|
||||
.end_time = entry.end_time
|
||||
};
|
||||
tasks::virtual_task_hint hint;
|
||||
if (reqs.contains(id)) {
|
||||
hint.node_id = reqs.at(id);
|
||||
}
|
||||
return get_task_stats(entry, hint);
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
@@ -250,6 +250,9 @@ public:
|
||||
}
|
||||
|
||||
virtual future<> fill_buffer() override {
|
||||
if (const auto& ex = get_abort_exception(); ex) {
|
||||
return make_exception_future<>(ex);
|
||||
}
|
||||
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
||||
_reader.with_reserve([&] {
|
||||
if (!_static_row_done) {
|
||||
|
||||
@@ -300,8 +300,7 @@ private:
|
||||
void add_to_rpc_config(server_address srv);
|
||||
void remove_from_rpc_config(const server_address& srv);
|
||||
|
||||
// A helper to wait for a leader to get elected
|
||||
future<> wait_for_leader(seastar::abort_source* as);
|
||||
future<> wait_for_leader(seastar::abort_source* as) override;
|
||||
|
||||
future<> wait_for_state_change(seastar::abort_source* as) override;
|
||||
|
||||
|
||||
@@ -254,6 +254,16 @@ public:
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
|
||||
|
||||
// The returned future is resolved when a leader is elected for the current term.
|
||||
// Note that it is not guaranteed that the leader will remain the same by the time
|
||||
// the future is resolved, so the caller must check the synchronous
|
||||
// `current_leader()` function and retry `wait_for_leader()` if it returns an empty
|
||||
// `raft::server_id`.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the function abortable.
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
virtual future<> wait_for_leader(seastar::abort_source* as) = 0;
|
||||
|
||||
// Manually trigger snapshot creation and log truncation.
|
||||
//
|
||||
// Does nothing if the current apply index is less than or equal to the last persisted snapshot descriptor index
|
||||
|
||||
@@ -153,7 +153,7 @@ private:
|
||||
|
||||
sstring _op_name;
|
||||
std::string_view _op_name_view;
|
||||
reader_resources _base_resources;
|
||||
const reader_resources _base_resources;
|
||||
bool _base_resources_consumed = false;
|
||||
reader_resources _resources;
|
||||
reader_permit::state _state = reader_permit::state::active;
|
||||
@@ -270,9 +270,7 @@ public:
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
~impl() {
|
||||
if (_base_resources_consumed) {
|
||||
signal(_base_resources);
|
||||
}
|
||||
release_base_resources();
|
||||
|
||||
if (_resources.non_zero()) {
|
||||
on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): permit {} detected a leak of {{count={}, memory={}}} resources",
|
||||
@@ -342,6 +340,11 @@ public:
|
||||
// Marks the permit active and consumes its base resources.
// NOTE(review): presumably invoked by the semaphore when the permit is
// admitted -- the caller is not visible here, confirm.
//
// The permit must not be in the active_await state. Finding the base
// resources already marked as consumed is reported via on_internal_error()
// instead of consuming them a second time.
void on_admission() {
|
||||
SCYLLA_ASSERT(_state != reader_permit::state::active_await);
|
||||
on_permit_active();
|
||||
|
||||
// Guard against double consumption of the base resources.
if (_base_resources_consumed) {
|
||||
on_internal_error(rcslog, fmt::format("on_admission(): permit {} already has its base resources consumed", description()));
|
||||
}
|
||||
|
||||
consume(_base_resources);
|
||||
// Record the consumption in the flag that the guard above tests.
_base_resources_consumed = true;
|
||||
}
|
||||
@@ -370,10 +373,7 @@ public:
|
||||
void on_evicted() {
|
||||
SCYLLA_ASSERT(_state == reader_permit::state::inactive);
|
||||
_state = reader_permit::state::evicted;
|
||||
if (_base_resources_consumed) {
|
||||
signal(_base_resources);
|
||||
_base_resources_consumed = false;
|
||||
}
|
||||
release_base_resources();
|
||||
}
|
||||
|
||||
void consume(reader_resources res) {
|
||||
@@ -403,10 +403,9 @@ public:
|
||||
|
||||
void release_base_resources() noexcept {
|
||||
if (_base_resources_consumed) {
|
||||
_resources -= _base_resources;
|
||||
_base_resources_consumed = false;
|
||||
signal(_base_resources);
|
||||
}
|
||||
_semaphore.signal(std::exchange(_base_resources, {}));
|
||||
}
|
||||
|
||||
sstring description() const {
|
||||
|
||||
@@ -1476,13 +1476,14 @@ future<> repair_service::sync_data_using_repair(
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
shared_ptr<node_ops_info> ops_info) {
|
||||
shared_ptr<node_ops_info> ops_info,
|
||||
service::frozen_topology_guard frozen_topology_guard) {
|
||||
if (ranges.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::data_sync_repair_task_impl>({}, _repair_module->new_repair_uniq_id(), std::move(keyspace), "", std::move(ranges), std::move(neighbors), reason, ops_info);
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::data_sync_repair_task_impl>({}, _repair_module->new_repair_uniq_id(), std::move(keyspace), "", std::move(ranges), std::move(neighbors), reason, ops_info, std::move(frozen_topology_guard));
|
||||
co_await task->done();
|
||||
}
|
||||
|
||||
@@ -1523,7 +1524,7 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=started, reason={}, small_table_optimization={}", id.uuid(), keyspace, _reason, small_table_optimization);
|
||||
co_await module->run(id, [this, &rs, id, &db, keyspace, ranges_reduced_factor, small_table_optimization, germs = std::move(germs), &ranges = _ranges, &neighbors = _neighbors, reason = _reason, &task_as = _as] () mutable {
|
||||
co_await module->run(id, [this, &rs, id, &db, keyspace, ranges_reduced_factor, small_table_optimization, germs = std::move(germs), &ranges = _ranges, &neighbors = _neighbors, reason = _reason, &task_as = _as, frozen_topology_guard = _frozen_topology_guard] () mutable {
|
||||
auto cfs = list_column_families(db, keyspace);
|
||||
_cfs_size = cfs.size();
|
||||
if (cfs.empty()) {
|
||||
@@ -1535,7 +1536,7 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
repair_results.reserve(smp::count);
|
||||
task_as.check();
|
||||
for (auto shard : std::views::iota(0u, smp::count)) {
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges_reduced_factor, ranges, neighbors, reason, germs, small_table_optimization, parent_data = get_repair_uniq_id().task_info] (repair_service& local_repair) mutable -> future<> {
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges_reduced_factor, ranges, neighbors, reason, germs, small_table_optimization, parent_data = get_repair_uniq_id().task_info, frozen_topology_guard] (repair_service& local_repair) mutable -> future<> {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<locator::host_id>();
|
||||
@@ -1545,7 +1546,7 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), std::move(keyspace),
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), std::move(neighbors), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time,
|
||||
service::default_session_id, tablet_repair_sched_info{}, ranges_reduced_factor);
|
||||
frozen_topology_guard, tablet_repair_sched_info{}, ranges_reduced_factor);
|
||||
co_await task->done();
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1584,10 +1585,10 @@ future<std::optional<double>> repair::data_sync_repair_task_impl::expected_total
|
||||
co_return _cfs_size ? std::make_optional<double>(_ranges.size() * _cfs_size * smp::count) : std::nullopt;
|
||||
}
|
||||
|
||||
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
|
||||
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens), reason] () mutable {
|
||||
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens), reason, frozen_topology_guard = std::move(frozen_topology_guard)] () mutable {
|
||||
auto& db = get_db().local();
|
||||
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
|
||||
auto& topology = tmptr->get_topology();
|
||||
@@ -1776,7 +1777,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr, frozen_topology_guard).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}, nr_tables={}", keyspace_name, nr_ranges * nr_tables, nr_tables);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | std::views::keys);
|
||||
@@ -1804,9 +1805,9 @@ future<> repair_service::reset_node_ops_progress(streaming::stream_reason reason
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node_id, shared_ptr<node_ops_info> ops, streaming::stream_reason reason) {
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node_id, shared_ptr<node_ops_info> ops, streaming::stream_reason reason, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), leaving_node_id = std::move(leaving_node_id), ops] () mutable {
|
||||
return seastar::async([this, tmptr = std::move(tmptr), leaving_node_id = std::move(leaving_node_id), ops, frozen_topology_guard = std::move(frozen_topology_guard)] () mutable {
|
||||
auto& db = get_db().local();
|
||||
auto& topology = tmptr->get_topology();
|
||||
auto myhostid = topology.my_host_id();
|
||||
@@ -1990,7 +1991,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, ops).get();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, ops, frozen_topology_guard).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node_id, nr_ranges_total, nr_ranges_synced * nr_tables, nr_ranges_skipped * nr_tables);
|
||||
}
|
||||
@@ -1998,15 +1999,15 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
}
|
||||
|
||||
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) {
|
||||
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto my_address = tmptr->get_topology().my_host_id();
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {}, streaming::stream_reason::decommission);
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {}, streaming::stream_reason::decommission, std::move(frozen_topology_guard));
|
||||
}
|
||||
|
||||
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops), streaming::stream_reason::removenode).then([this] {
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops), streaming::stream_reason::removenode, std::move(frozen_topology_guard)).then([this] {
|
||||
rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion");
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
return db.invoke_on_all([](replica::database &db) {
|
||||
@@ -2017,9 +2018,9 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node) {
|
||||
future<> repair_service::do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, service::frozen_topology_guard frozen_topology_guard, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return seastar::async([this, ks_erms = std::move(ks_erms), tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes), replaced_node] () mutable {
|
||||
return seastar::async([this, ks_erms = std::move(ks_erms), tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes), replaced_node, frozen_topology_guard = std::move(frozen_topology_guard)] () mutable {
|
||||
auto& db = get_db().local();
|
||||
const auto& topology = tmptr->get_topology();
|
||||
auto myid = tmptr->get_my_id();
|
||||
@@ -2210,14 +2211,14 @@ future<> repair_service::do_rebuild_replace_with_repair(std::unordered_map<sstri
|
||||
}).get();
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr).get();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr, frozen_topology_guard).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc_for_keyspace, nr_ranges * nr_tables);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | std::views::keys, source_dc);
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc) {
|
||||
future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto op = sstring("rebuild_with_repair");
|
||||
const auto& topology = tmptr->get_topology();
|
||||
@@ -2226,7 +2227,7 @@ future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator
|
||||
}
|
||||
auto reason = streaming::stream_reason::rebuild;
|
||||
rlogger.info("{}: this-node={} source_dc={}", op, *topology.this_node(), source_dc);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(tmptr), std::move(op), std::move(source_dc), reason).finally([this, reason] { return reset_node_ops_progress(reason);});
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(tmptr), std::move(op), std::move(source_dc), reason, frozen_topology_guard).finally([this, reason] { return reset_node_ops_progress(reason);});
|
||||
co_await get_db().invoke_on_all([](replica::database& db) {
|
||||
for (auto& t : db.get_non_system_column_families()) {
|
||||
t->trigger_offstrategy_compaction();
|
||||
@@ -2234,7 +2235,7 @@ future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node) {
|
||||
future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node, service::frozen_topology_guard frozen_topology_guard) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
auto cloned_tm = co_await tmptr->clone_async();
|
||||
auto op = sstring("replace_with_repair");
|
||||
@@ -2248,7 +2249,7 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
auto source_dc = utils::optional_param(myloc.dc);
|
||||
rlogger.info("{}: this-node={} ignore_nodes={} source_dc={}", op, *topology.this_node(), ignore_nodes, source_dc);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes), replaced_node).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, frozen_topology_guard, std::move(ignore_nodes), replaced_node).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
}
|
||||
|
||||
// It is called by the repair_tablet rpc verb to repair the given tablet
|
||||
|
||||
@@ -208,15 +208,15 @@ public:
|
||||
|
||||
// The tokens are the tokens assigned to the bootstrap node.
|
||||
// all repair-based node operation entry points must be called on shard 0
|
||||
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc);
|
||||
future<> replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node);
|
||||
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens, service::frozen_topology_guard frozen_topology_guard);
|
||||
future<> decommission_with_repair(locator::token_metadata_ptr tmptr, service::frozen_topology_guard frozen_topology_guard);
|
||||
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops, service::frozen_topology_guard frozen_topology_guard);
|
||||
future<> rebuild_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc, service::frozen_topology_guard frozen_topology_guard);
|
||||
future<> replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node, service::frozen_topology_guard frozen_topology_guard);
|
||||
private:
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops, streaming::stream_reason reason);
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops, streaming::stream_reason reason, service::frozen_topology_guard frozen_topology_guard);
|
||||
|
||||
future<> do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, std::unordered_set<locator::host_id> ignore_nodes = {}, locator::host_id replaced_node = {});
|
||||
future<> do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::static_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, service::frozen_topology_guard frozen_topology_guard, std::unordered_set<locator::host_id> ignore_nodes = {}, locator::host_id replaced_node = {});
|
||||
|
||||
// Must be called on shard 0
|
||||
future<> sync_data_using_repair(sstring keyspace,
|
||||
@@ -224,7 +224,8 @@ private:
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
shared_ptr<node_ops_info> ops_info);
|
||||
shared_ptr<node_ops_info> ops_info,
|
||||
service::frozen_topology_guard frozen_topology_guard);
|
||||
|
||||
future<> reset_node_ops_progress(streaming::stream_reason reason);
|
||||
|
||||
|
||||
@@ -82,11 +82,13 @@ private:
|
||||
std::unordered_map<dht::token_range, repair_neighbors> _neighbors;
|
||||
optimized_optional<abort_source::subscription> _abort_subscription;
|
||||
size_t _cfs_size = 0;
|
||||
service::frozen_topology_guard _frozen_topology_guard;
|
||||
public:
|
||||
data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info)
|
||||
data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info, service::frozen_topology_guard frozen_topology_guard)
|
||||
: repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), reason)
|
||||
, _ranges(std::move(ranges))
|
||||
, _neighbors(std::move(neighbors))
|
||||
, _frozen_topology_guard(std::move(frozen_topology_guard))
|
||||
{
|
||||
if (ops_info && ops_info->as) {
|
||||
_abort_subscription = ops_info->as->subscribe([this] () noexcept {
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user