Compare commits

..

1 Commits

Author SHA1 Message Date
Amnon Heiman
607ca719d7 Enable prometheus_allow_protobuf by default
Change the prometheus_allow_protobuf configuration to true by default.
This allows the ScyllaDB server to serve the Prometheus protobuf format
(enabling native histogram support) when requested by the monitoring server.

Update config help text/docs to reflect protobuf support (drop
“experimental” wording).

Add cluster tests to validate the default is enabled, can be overridden,
and /metrics returns protobuf when requested via Accept header (and
falls back to text when disabled).

Fixes #27817
Co-authored-by: mykaul <mykaul@scylladb.com>

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2026-01-19 09:40:49 +02:00
442 changed files with 3490 additions and 11788 deletions

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix // Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name // Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`; const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const regex = new RegExp(pattern); const regex = new RegExp(pattern);
if (!regex.test(body)) { if (!regex.test(body)) {

View File

@@ -14,8 +14,7 @@ env:
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service
SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log
permissions: permissions: {}
contents: read
# cancel the in-progress run upon a repush # cancel the in-progress run upon a repush
concurrency: concurrency:
@@ -35,6 +34,8 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
with: with:
submodules: true submodules: true
- run: |
sudo dnf -y install clang-tools-extra
- name: Generate compilation database - name: Generate compilation database
run: | run: |
cmake \ cmake \

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags # Default scylla product/version tags
PRODUCT=scylla PRODUCT=scylla
VERSION=2026.2.0-dev VERSION=2026.1.0-dev
if test -f version if test -f version
then then

View File

@@ -244,10 +244,7 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
// Check if two JSON-encoded values match with the CONTAINS relation // Check if two JSON-encoded values match with the CONTAINS relation
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) { bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) { if (!v1) {
return false;
}
if (!v2.IsObject() || v2.MemberCount() == 0) {
return false; return false;
} }
const auto& kv1 = *v1->MemberBegin(); const auto& kv1 = *v1->MemberBegin();

View File

@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
} }
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept { void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
if (_should_add_to_response) { if (_should_add_to_reponse) {
auto consumption = rjson::empty_object(); auto consumption = rjson::empty_object();
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units()); rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
rjson::add(response, "ConsumedCapacity", std::move(consumption)); rjson::add(response, "ConsumedCapacity", std::move(consumption));
@@ -53,9 +53,7 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
} }
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) { static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
// by using division with modulo instead of addition before division
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
if (is_quorum) { if (is_quorum) {
half_units *= 2; half_units *= 2;

View File

@@ -28,9 +28,9 @@ namespace alternator {
class consumed_capacity_counter { class consumed_capacity_counter {
public: public:
consumed_capacity_counter() = default; consumed_capacity_counter() = default;
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){} consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
bool operator()() const noexcept { bool operator()() const noexcept {
return _should_add_to_response; return _should_add_to_reponse;
} }
consumed_capacity_counter& operator +=(uint64_t bytes); consumed_capacity_counter& operator +=(uint64_t bytes);
@@ -44,7 +44,7 @@ public:
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request); static bool should_add_capacity(const rjson::value& request);
protected: protected:
bool _should_add_to_response = false; bool _should_add_to_reponse = false;
}; };
class rcu_consumed_capacity_counter : public consumed_capacity_counter { class rcu_consumed_capacity_counter : public consumed_capacity_counter {

View File

@@ -17,7 +17,6 @@
#include "auth/service.hh" #include "auth/service.hh"
#include "db/config.hh" #include "db/config.hh"
#include "db/view/view_build_status.hh" #include "db/view/view_build_status.hh"
#include "locator/tablets.hh"
#include "mutation/tombstone.hh" #include "mutation/tombstone.hh"
#include "locator/abstract_replication_strategy.hh" #include "locator/abstract_replication_strategy.hh"
#include "utils/log.hh" #include "utils/log.hh"
@@ -834,13 +833,11 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes); total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() }; const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done // Note: we don't care when the notification of other shards will finish, as long as it will be done
// A race condition is possible: if a DescribeTable request arrives on a different shard before // it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// that shard receives the cached size, it will recalculate independently. This is acceptable because: // the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// 1. Both calculations will cache their results with an expiry time // with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value // In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// 3. Even if expiry times match, different shards may briefly return different table sizes // which is also fine, as the specification doesn't give precision guarantees of any kind.
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
// exact precision for DescribeTable size information
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry); co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
} }
} }
@@ -1878,34 +1875,23 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
auto ts = group0_guard.write_timestamp(); auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations; utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode); auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
// Alternator Streams doesn't yet work when the table uses tablets (#23838) // Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) { if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled"); auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) { if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) { if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). " co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'."); "If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
} }
} }
} }
// Creating an index in tablets mode requires the keyspace to be RF-rack-valid. // Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
// GSI and LSI indexes are based on materialized views which require RF-rack-validity to avoid consistency issues. // GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
if (!view_builders.empty() || _proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) { if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
try { co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
locator::assert_rf_rack_valid_keyspace(keyspace_name, _proxy.local_db().get_token_metadata_ptr(), *rs);
} catch (const std::invalid_argument& ex) {
if (!view_builders.empty()) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes and LocalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
} else {
co_return api_error::validation(fmt::format("Cannot create table '{}' with tablets: the configuration "
"option 'rf_rack_valid_keyspaces' is enabled, which enforces that tables using tablets can only be created in clusters "
"that have either 1 or 3 racks", table_name));
}
}
} }
try { try {
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts); schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
@@ -2128,12 +2114,9 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation(fmt::format( co_return api_error::validation(fmt::format(
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name)); "LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
} }
try { if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
locator::assert_rf_rack_valid_keyspace(keyspace_name, p.local().local_db().get_token_metadata_ptr(), !p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy()); co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
} catch (const std::invalid_argument& ex) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
} }
elogger.trace("Adding GSI {}", index_name); elogger.trace("Adding GSI {}", index_name);

View File

@@ -491,7 +491,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
if (!opts.enabled()) { if (!opts.enabled()) {
rjson::add(ret, "StreamDescription", std::move(stream_desc)); rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret)); return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
} }
// TODO: label // TODO: label
@@ -502,121 +502,123 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h) // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl); auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }); return _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }).then([db, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)] (std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
std::optional<shard_id> last; auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
auto i = topologies.begin(); std::optional<shard_id> last;
// if we're a paged query, skip to the generation where we left of.
if (shard_start) {
i = topologies.find(shard_start->time);
}
// for parent-child stuff we need id:s to be sorted by token auto i = topologies.begin();
// (see explanation above) since we want to find closest // if we're a paged query, skip to the generation where we left of.
// token boundary when determining parent. if (shard_start) {
// #7346 - we processed and searched children/parents in i = topologies.find(shard_start->time);
// stored order, which is not necessarily token order, }
// so the finding of "closest" token boundary (using upper bound)
// could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return id1.token() < id2.token();
};
// #7409 - shards must be returned in lexicographical order, // for parent-child stuff we need id:s to be sorted by token
// normal bytes compare is string_traits<int8_t>::compare. // (see explanation above) since we want to find closest
// thus bytes 0x8000 is less than 0x0000. By doing unsigned // token boundary when determining parent.
// compare instead we inadvertently will sort in string lexical. // #7346 - we processed and searched children/parents in
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) { // stored order, which is not necessarily token order,
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0; // so the finding of "closest" token boundary (using upper bound)
}; // could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
// need a prev even if we are skipping stuff return id1.token() < id2.token();
if (i != topologies.begin()) { };
prev = std::prev(i);
}
for (; limit > 0 && i != e; prev = i, ++i) {
auto& [ts, sv] = *i;
last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// #7409 - shards must be returned in lexicographical order, // #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp); // normal bytes compare is string_traits<int8_t>::compare.
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
// compare instead we inadvertently will sort in string lexical.
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
};
if (shard_start) { // need a prev even if we are skipping stuff
// find next shard position if (i != topologies.begin()) {
lo = std::upper_bound(lo, end, shard_start->id, id_cmp); prev = std::prev(i);
shard_start = std::nullopt;
} }
if (lo != end && prev != e) { for (; limit > 0 && i != e; prev = i, ++i) {
// We want older stuff sorted in token order so we can find matching auto& [ts, sv] = *i;
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto& pids = prev->second.streams;
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
if (pid != pids.end()) {
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
}
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
last = std::nullopt; last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp);
if (shard_start) {
// find next shard position
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
shard_start = std::nullopt;
}
if (lo != end && prev != e) {
// We want older stuff sorted in token order so we can find matching
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto& pids = prev->second.streams;
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
if (pid != pids.end()) {
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
}
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
last = std::nullopt;
}
} }
}
if (last) { if (last) {
rjson::add(stream_desc, "LastEvaluatedShardId", *last); rjson::add(stream_desc, "LastEvaluatedShardId", *last);
} }
rjson::add(stream_desc, "Shards", std::move(shards)); rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc)); rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret)); return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
} }
enum class shard_iterator_type { enum class shard_iterator_type {
@@ -896,169 +898,172 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul)); query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)); co_return co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then(
cql3::selection::result_set_builder builder(*selection, gc_clock::now()); [this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable {
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection)); cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build(); auto result_set = builder.build();
auto records = rjson::empty_array(); auto records = rjson::empty_array();
auto& metadata = result_set->get_metadata(); auto& metadata = result_set->get_metadata();
auto op_index = std::distance(metadata.get_names().begin(), auto op_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) { std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == op_column_name; return cdef->name->name() == op_column_name;
}) })
); );
auto ts_index = std::distance(metadata.get_names().begin(), auto ts_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) { std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == timestamp_column_name; return cdef->name->name() == timestamp_column_name;
}) })
); );
auto eor_index = std::distance(metadata.get_names().begin(), auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) { std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name; return cdef->name->name() == eor_column_name;
}) })
); );
std::optional<utils::UUID> timestamp; std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object(); auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object(); auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter(); const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
using op_utype = std::underlying_type_t<cdc::operation>; using op_utype = std::underlying_type_t<cdc::operation>;
auto maybe_add_record = [&] { auto maybe_add_record = [&] {
if (!dynamodb.ObjectEmpty()) { if (!dynamodb.ObjectEmpty()) {
rjson::add(record, "dynamodb", std::move(dynamodb)); rjson::add(record, "dynamodb", std::move(dynamodb));
dynamodb = rjson::empty_object(); dynamodb = rjson::empty_object();
} }
if (!record.ObjectEmpty()) { if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name)); rjson::add(record, "awsRegion", rjson::from_string(dc_name));
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp)); rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator"); rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1"); rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record)); rjson::push_back(records, std::move(record));
record = rjson::empty_object(); record = rjson::empty_object();
--limit; --limit;
} }
}; };
for (auto& row : result_set->rows()) { for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index]))); auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index])); auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false; auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
if (!dynamodb.HasMember("Keys")) { if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object(); auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys); describe_single_item(*selection, row, key_names, keys);
rjson::add(dynamodb, "Keys", std::move(keys)); rjson::add(dynamodb, "Keys", std::move(keys));
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count()); rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts)); rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type); rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes // TODO: SizeBytes
} }
/** /**
* We merge rows with same timestamp into a single event. * We merge rows with same timestamp into a single event.
* This is pretty much needed, because a CDC row typically * This is pretty much needed, because a CDC row typically
* encodes ~half the info of an alternator write. * encodes ~half the info of an alternator write.
* *
* A big, big downside to how alternator records are written * A big, big downside to how alternator records are written
* (i.e. CQL), is that the distinction between INSERT and UPDATE * (i.e. CQL), is that the distinction between INSERT and UPDATE
* is somewhat lost/unmappable to actual eventName. * is somewhat lost/unmappable to actual eventName.
* A write (currently) always looks like an insert+modify * A write (currently) always looks like an insert+modify
* regardless whether we wrote existing record or not. * regardless whether we wrote existing record or not.
* *
* Maybe RMW ops could be done slightly differently so * Maybe RMW ops could be done slightly differently so
* we can distinguish them here... * we can distinguish them here...
* *
* For now, all writes will become MODIFY. * For now, all writes will become MODIFY.
* *
* Note: we do not check the current pre/post * Note: we do not check the current pre/post
* flags on CDC log, instead we use data to * flags on CDC log, instead we use data to
* drive what is returned. This is (afaict) * drive what is returned. This is (afaict)
* consistent with dynamo streams * consistent with dynamo streams
*/ */
switch (op) { switch (op) {
case cdc::operation::pre_image: case cdc::operation::pre_image:
case cdc::operation::post_image: case cdc::operation::post_image:
{ {
auto item = rjson::empty_object(); auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true); describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item); describe_single_item(*selection, row, key_names, item);
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item)); rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
break; break;
} }
case cdc::operation::update:
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
break;
}
}
} }
}
auto ret = rjson::empty_object(); auto ret = rjson::empty_object();
auto nrecords = records.Size(); auto nrecords = records.Size();
rjson::add(ret, "Records", std::move(records)); rjson::add(ret, "Records", std::move(records));
if (nrecords != 0) { if (nrecords != 0) {
// #9642. Set next iterators threshold to > last // #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false); shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator, // Note that here we unconditionally return NextShardIterator,
// without checking if maybe we reached the end-of-shard. If the // without checking if maybe we reached the end-of-shard. If the
// shard did end, then the next read will have nrecords == 0 and // shard did end, then the next read will have nrecords == 0 and
// will notice end end of shard and not return NextShardIterator. // will notice end end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter); rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time); _stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
co_return rjson::print(std::move(ret)); return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
} }
// ugh. figure out if we are and end-of-shard // ugh. figure out if we are and end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners(); auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners }); return _sdks.cdc_current_generation_timestamp({ normal_token_owners }).then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable {
auto& shard = iter.shard; auto& shard = iter.shard;
if (shard.time < ts && ts < high_ts) { if (shard.time < ts && ts < high_ts) {
// The DynamoDB documentation states that when a shard is // The DynamoDB documentation states that when a shard is
// closed, reading it until the end has NextShardIterator // closed, reading it until the end has NextShardIterator
// "set to null". Our test test_streams_closed_read // "set to null". Our test test_streams_closed_read
// confirms that by "null" they meant not set at all. // confirms that by "null" they meant not set at all.
} else { } else {
// We could have return the same iterator again, but we did // We could have return the same iterator again, but we did
// a search from it until high_ts and found nothing, so we // a search from it until high_ts and found nothing, so we
// can also start the next search from high_ts. // can also start the next search from high_ts.
// TODO: but why? It's simpler just to leave the iterator be. // TODO: but why? It's simpler just to leave the iterator be.
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true); shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
rjson::add(ret, "NextShardIterator", iter); rjson::add(ret, "NextShardIterator", iter);
} }
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time); _stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(ret)) { if (is_big(ret)) {
co_return make_streamed(std::move(ret)); return make_ready_future<executor::request_return_type>(make_streamed(std::move(ret)));
} }
co_return rjson::print(std::move(ret)); return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
});
} }
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) { bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {

View File

@@ -3051,7 +3051,7 @@
}, },
{ {
"name":"incremental_mode", "name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.", "description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"required":false, "required":false,
"allowMultiple":false, "allowMultiple":false,
"type":"string", "type":"string",

View File

@@ -2016,14 +2016,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag"); auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ","); auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf"); auto sfopt = req->get_query_param("sf");
db::snapshot_options opts = { auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
std::vector<sstring> keynames = split(req->get_query_param("kn"), ","); std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try { try {
if (column_families.empty()) { if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, opts); co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
} else { } else {
if (keynames.empty()) { if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified"); throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2031,7 +2029,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) { if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family"); throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
} }
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts); co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
} }
co_return json_void(); co_return json_void();
} catch (...) { } catch (...) {
@@ -2066,8 +2064,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req)); auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) { if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false}; co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
} }
compaction::compaction_stats stats; compaction::compaction_stats stats;

View File

@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req)); auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) { if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false}; co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
} }
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -209,11 +209,15 @@ future<> audit::stop_audit() {
}); });
} }
audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch) { audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table) {
if (!audit_instance().local_is_initialized()) { if (!audit_instance().local_is_initialized()) {
return nullptr; return nullptr;
} }
return std::make_unique<audit_info>(cat, keyspace, table, batch); return std::make_unique<audit_info>(cat, keyspace, table);
}
audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
} }
future<> audit::start(const db::config& cfg) { future<> audit::start(const db::config& cfg) {
@@ -263,21 +267,18 @@ future<> audit::log_login(const sstring& username, socket_address client_ip, boo
} }
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error) { future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error) {
auto audit_info = statement->get_audit_info(); cql3::statements::batch_statement* batch = dynamic_cast<cql3::statements::batch_statement*>(statement.get());
if (!audit_info) { if (batch != nullptr) {
return make_ready_future<>();
}
if (audit_info->batch()) {
cql3::statements::batch_statement* batch = static_cast<cql3::statements::batch_statement*>(statement.get());
return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) { return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) {
return inspect(m.statement, query_state, options, error); return inspect(m.statement, query_state, options, error);
}); });
} else { } else {
if (audit::local_audit_instance().should_log(audit_info)) { auto audit_info = statement->get_audit_info();
if (bool(audit_info) && audit::local_audit_instance().should_log(audit_info)) {
return audit::local_audit_instance().log(audit_info, query_state, options, error); return audit::local_audit_instance().log(audit_info, query_state, options, error);
} }
return make_ready_future<>();
} }
return make_ready_future<>();
} }
future<> inspect_login(const sstring& username, socket_address client_ip, bool error) { future<> inspect_login(const sstring& username, socket_address client_ip, bool error) {

View File

@@ -75,13 +75,11 @@ class audit_info final {
sstring _keyspace; sstring _keyspace;
sstring _table; sstring _table;
sstring _query; sstring _query;
bool _batch;
public: public:
audit_info(statement_category cat, sstring keyspace, sstring table, bool batch) audit_info(statement_category cat, sstring keyspace, sstring table)
: _category(cat) : _category(cat)
, _keyspace(std::move(keyspace)) , _keyspace(std::move(keyspace))
, _table(std::move(table)) , _table(std::move(table))
, _batch(batch)
{ } { }
void set_query_string(const std::string_view& query_string) { void set_query_string(const std::string_view& query_string) {
_query = sstring(query_string); _query = sstring(query_string);
@@ -91,7 +89,6 @@ public:
const sstring& query() const { return _query; } const sstring& query() const { return _query; }
sstring category_string() const; sstring category_string() const;
statement_category category() const { return _category; } statement_category category() const { return _category; }
bool batch() const { return _batch; }
}; };
using audit_info_ptr = std::unique_ptr<audit_info>; using audit_info_ptr = std::unique_ptr<audit_info>;
@@ -129,7 +126,8 @@ public:
} }
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm); static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit(); static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false); static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm, audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp, cql3::query_processor& qp,
service::migration_manager& mm, service::migration_manager& mm,

View File

@@ -53,10 +53,10 @@ static std::string json_escape(std::string_view str) {
} }
future<> audit_syslog_storage_helper::syslog_send_helper(temporary_buffer<char> msg) { future<> audit_syslog_storage_helper::syslog_send_helper(const sstring& msg) {
try { try {
auto lock = co_await get_units(_semaphore, 1, std::chrono::hours(1)); auto lock = co_await get_units(_semaphore, 1, std::chrono::hours(1));
co_await _sender.send(_syslog_address, std::span(&msg, 1)); co_await _sender.send(_syslog_address, net::packet{msg.data(), msg.size()});
} }
catch (const std::exception& e) { catch (const std::exception& e) {
auto error_msg = seastar::format( auto error_msg = seastar::format(
@@ -90,7 +90,7 @@ future<> audit_syslog_storage_helper::start(const db::config& cfg) {
co_return; co_return;
} }
co_await syslog_send_helper(temporary_buffer<char>::copy_of("Initializing syslog audit backend.")); co_await syslog_send_helper("Initializing syslog audit backend.");
} }
future<> audit_syslog_storage_helper::stop() { future<> audit_syslog_storage_helper::stop() {
@@ -120,7 +120,7 @@ future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
audit_info->table(), audit_info->table(),
username); username);
co_await syslog_send_helper(std::move(msg).release()); co_await syslog_send_helper(msg);
} }
future<> audit_syslog_storage_helper::write_login(const sstring& username, future<> audit_syslog_storage_helper::write_login(const sstring& username,
@@ -139,7 +139,7 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
client_ip, client_ip,
username); username);
co_await syslog_send_helper(std::move(msg).release()); co_await syslog_send_helper(msg.c_str());
} }
} }

View File

@@ -26,7 +26,7 @@ class audit_syslog_storage_helper : public storage_helper {
net::datagram_channel _sender; net::datagram_channel _sender;
seastar::semaphore _semaphore; seastar::semaphore _semaphore;
future<> syslog_send_helper(seastar::temporary_buffer<char> msg); future<> syslog_send_helper(const sstring& msg);
public: public:
explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&); explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&);
virtual ~audit_syslog_storage_helper(); virtual ~audit_syslog_storage_helper();

View File

@@ -876,6 +876,22 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
continue; // some tables might not have been created if they were not used continue; // some tables might not have been created if they were not used
} }
// use longer than usual timeout as we scan the whole table
// but not infinite or very long as we want to fail reasonably fast
const auto t = 5min;
const timeout_config tc{t, t, t, t, t, t, t};
::service::client_state cs(::service::client_state::internal_tag{}, tc);
::service::query_state qs(cs, empty_service_permit());
auto rows = co_await qp.execute_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
qs,
{},
cql3::query_processor::cache_internal::no);
if (rows->empty()) {
continue;
}
std::vector<sstring> col_names; std::vector<sstring> col_names;
for (const auto& col : schema->all_columns()) { for (const auto& col : schema->all_columns()) {
col_names.push_back(col.name_as_cql_string()); col_names.push_back(col.name_as_cql_string());
@@ -884,51 +900,30 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
for (size_t i = 1; i < col_names.size(); ++i) { for (size_t i = 1; i < col_names.size(); ++i) {
val_binders_str += ", ?"; val_binders_str += ", ?";
} }
for (const auto& row : *rows) {
std::vector<mutation> collected; std::vector<data_value_or_unset> values;
// use longer than usual timeout as we scan the whole table for (const auto& col : schema->all_columns()) {
// but not infinite or very long as we want to fail reasonably fast if (row.has(col.name_as_text())) {
const auto t = 5min; values.push_back(
const timeout_config tc{t, t, t, t, t, t, t}; col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
::service::client_state cs(::service::client_state::internal_tag{}, tc); } else {
::service::query_state qs(cs, empty_service_permit()); values.push_back(unset_value{});
co_await qp.query_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
{},
1000,
[&qp, &cf_name, &col_names, &val_binders_str, &schema, ts, &collected] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
std::vector<data_value_or_unset> values;
for (const auto& col : schema->all_columns()) {
if (row.has(col.name_as_text())) {
values.push_back(
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
} else {
values.push_back(unset_value{});
}
} }
auto muts = co_await qp.get_mutations_internal( }
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})", auto muts = co_await qp.get_mutations_internal(
db::system_keyspace::NAME, seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
cf_name, db::system_keyspace::NAME,
fmt::join(col_names, ", "), cf_name,
val_binders_str), fmt::join(col_names, ", "),
internal_distributed_query_state(), val_binders_str),
ts, internal_distributed_query_state(),
std::move(values)); ts,
if (muts.size() != 1) { std::move(values));
on_internal_error(log, if (muts.size() != 1) {
format("expecting single insert mutation, got {}", muts.size())); on_internal_error(log,
} format("expecting single insert mutation, got {}", muts.size()));
}
collected.push_back(std::move(muts[0])); co_yield std::move(muts[0]);
co_return stop_iteration::no;
},
std::move(qs));
for (auto& m : collected) {
co_yield std::move(m);
} }
} }
co_yield co_await sys_ks.make_auth_version_mutation(ts, co_yield co_await sys_ks.make_auth_version_mutation(ts,

View File

@@ -52,6 +52,13 @@ static const class_registrator<
::service::migration_manager&, ::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager"); cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept { static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
if (role_name == meta::DEFAULT_SUPERUSER_NAME) { if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM; return db::consistency_level::QUORUM;
@@ -60,13 +67,13 @@ static db::consistency_level consistency_for_role(std::string_view role_name) no
return db::consistency_level::LOCAL_ONE; return db::consistency_level::LOCAL_ONE;
} }
future<std::optional<standard_role_manager::record>> standard_role_manager::legacy_find_record(std::string_view role_name) { static future<std::optional<record>> find_record(cql3::query_processor& qp, std::string_view role_name) {
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE {} = ?", const sstring query = seastar::format("SELECT * FROM {}.{} WHERE {} = ?",
get_auth_ks_name(_qp), get_auth_ks_name(qp),
meta::roles_table::name, meta::roles_table::name,
meta::roles_table::role_col_name); meta::roles_table::role_col_name);
const auto results = co_await _qp.execute_internal( const auto results = co_await qp.execute_internal(
query, query,
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_query_state(), internal_distributed_query_state(),
@@ -86,25 +93,8 @@ future<std::optional<standard_role_manager::record>> standard_role_manager::lega
: role_set())}); : role_set())});
} }
future<std::optional<standard_role_manager::record>> standard_role_manager::find_record(std::string_view role_name) { static future<record> require_record(cql3::query_processor& qp, std::string_view role_name) {
if (legacy_mode(_qp)) { return find_record(qp, role_name).then([role_name](std::optional<record> mr) {
return legacy_find_record(role_name);
}
auto name = sstring(role_name);
auto role = _cache.get(name);
if (!role) {
return make_ready_future<std::optional<record>>(std::nullopt);
}
return make_ready_future<std::optional<record>>(std::make_optional(record{
.name = std::move(name),
.is_superuser = role->is_superuser,
.can_login = role->can_login,
.member_of = role->member_of
}));
}
future<standard_role_manager::record> standard_role_manager::require_record(std::string_view role_name) {
return find_record(role_name).then([role_name](std::optional<record> mr) {
if (!mr) { if (!mr) {
throw nonexistant_role(role_name); throw nonexistant_role(role_name);
} }
@@ -396,7 +386,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
return fmt::to_string(fmt::join(assignments, ", ")); return fmt::to_string(fmt::join(assignments, ", "));
}; };
return require_record(role_name).then([this, role_name, &u, &mc](record) { return require_record(_qp, role_name).then([this, role_name, &u, &mc](record) {
if (!u.is_superuser && !u.can_login) { if (!u.is_superuser && !u.can_login) {
return make_ready_future<>(); return make_ready_future<>();
} }
@@ -630,17 +620,18 @@ standard_role_manager::revoke(std::string_view revokee_name, std::string_view ro
}); });
} }
future<> standard_role_manager::collect_roles( static future<> collect_roles(
cql3::query_processor& qp,
std::string_view grantee_name, std::string_view grantee_name,
bool recurse, bool recurse,
role_set& roles) { role_set& roles) {
return require_record(grantee_name).then([this, &roles, recurse](standard_role_manager::record r) { return require_record(qp, grantee_name).then([&qp, &roles, recurse](record r) {
return do_with(std::move(r.member_of), [this, &roles, recurse](const role_set& memberships) { return do_with(std::move(r.member_of), [&qp, &roles, recurse](const role_set& memberships) {
return do_for_each(memberships.begin(), memberships.end(), [this, &roles, recurse](const sstring& role_name) { return do_for_each(memberships.begin(), memberships.end(), [&qp, &roles, recurse](const sstring& role_name) {
roles.insert(role_name); roles.insert(role_name);
if (recurse) { if (recurse) {
return collect_roles(role_name, true, roles); return collect_roles(qp, role_name, true, roles);
} }
return make_ready_future<>(); return make_ready_future<>();
@@ -655,7 +646,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
return do_with( return do_with(
role_set{sstring(grantee_name)}, role_set{sstring(grantee_name)},
[this, grantee_name, recurse](role_set& roles) { [this, grantee_name, recurse](role_set& roles) {
return collect_roles(grantee_name, recurse, roles).then([&roles] { return roles; }); return collect_roles(_qp, grantee_name, recurse, roles).then([&roles] { return roles; });
}); });
} }
@@ -715,21 +706,27 @@ future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
} }
future<bool> standard_role_manager::exists(std::string_view role_name) { future<bool> standard_role_manager::exists(std::string_view role_name) {
return find_record(role_name).then([](std::optional<record> mr) { return find_record(_qp, role_name).then([](std::optional<record> mr) {
return static_cast<bool>(mr); return static_cast<bool>(mr);
}); });
} }
future<bool> standard_role_manager::is_superuser(std::string_view role_name) { future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
return require_record(role_name).then([](record r) { return require_record(_qp, role_name).then([](record r) {
return r.is_superuser; return r.is_superuser;
}); });
} }
future<bool> standard_role_manager::can_login(std::string_view role_name) { future<bool> standard_role_manager::can_login(std::string_view role_name) {
return require_record(role_name).then([](record r) { if (legacy_mode(_qp)) {
return r.can_login; const auto r = co_await require_record(_qp, role_name);
}); co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
} }
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) { future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {

View File

@@ -90,12 +90,6 @@ public:
private: private:
enum class membership_change { add, remove }; enum class membership_change { add, remove };
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
future<> create_legacy_metadata_tables_if_missing() const; future<> create_legacy_metadata_tables_if_missing() const;
@@ -113,14 +107,6 @@ private:
future<> legacy_modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change); future<> legacy_modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change);
future<> modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change, ::service::group0_batch& mc); future<> modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change, ::service::group0_batch& mc);
future<std::optional<record>> legacy_find_record(std::string_view role_name);
future<std::optional<record>> find_record(std::string_view role_name);
future<record> require_record(std::string_view role_name);
future<> collect_roles(
std::string_view grantee_name,
bool recurse,
role_set& roles);
}; };
} // namespace auth } // namespace auth

View File

@@ -725,9 +725,7 @@ raft_tests = set([
vector_search_tests = set([ vector_search_tests = set([
'test/vector_search/vector_store_client_test', 'test/vector_search/vector_store_client_test',
'test/vector_search/load_balancer_test', 'test/vector_search/load_balancer_test',
'test/vector_search/client_test', 'test/vector_search/client_test'
'test/vector_search/filter_test',
'test/vector_search/rescoring_test'
]) ])
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator' vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
@@ -817,9 +815,6 @@ arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clan
help='C compiler path') help='C compiler path')
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto', arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary') help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
# Workaround for https://github.com/mozilla/sccache/issues/2575
arg_parser.add_argument('--sccache-rust', action=argparse.BooleanOptionalAction, default=False,
help='Use sccache for rust code (if sccache is selected as compiler cache). Doesn\'t work with distributed builds.')
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False, add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
help='Use dpdk (from seastar dpdk sources)') help='Use dpdk (from seastar dpdk sources)')
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='', arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
@@ -950,7 +945,8 @@ scylla_core = (['message/messaging_service.cc',
'utils/crypt_sha512.cc', 'utils/crypt_sha512.cc',
'utils/logalloc.cc', 'utils/logalloc.cc',
'utils/large_bitset.cc', 'utils/large_bitset.cc',
'test/lib/limiting_data_source.cc', 'utils/buffer_input_stream.cc',
'utils/limiting_data_source.cc',
'utils/updateable_value.cc', 'utils/updateable_value.cc',
'message/dictionary_service.cc', 'message/dictionary_service.cc',
'utils/directories.cc', 'utils/directories.cc',
@@ -1038,9 +1034,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/functions/aggregate_fcts.cc', 'cql3/functions/aggregate_fcts.cc',
'cql3/functions/castas_fcts.cc', 'cql3/functions/castas_fcts.cc',
'cql3/functions/error_injection_fcts.cc', 'cql3/functions/error_injection_fcts.cc',
'cql3/statements/strong_consistency/modification_statement.cc',
'cql3/statements/strong_consistency/select_statement.cc',
'cql3/statements/strong_consistency/statement_helpers.cc',
'cql3/functions/vector_similarity_fcts.cc', 'cql3/functions/vector_similarity_fcts.cc',
'cql3/statements/cf_prop_defs.cc', 'cql3/statements/cf_prop_defs.cc',
'cql3/statements/cf_statement.cc', 'cql3/statements/cf_statement.cc',
@@ -1066,8 +1059,8 @@ scylla_core = (['message/messaging_service.cc',
'cql3/statements/raw/parsed_statement.cc', 'cql3/statements/raw/parsed_statement.cc',
'cql3/statements/property_definitions.cc', 'cql3/statements/property_definitions.cc',
'cql3/statements/update_statement.cc', 'cql3/statements/update_statement.cc',
'cql3/statements/broadcast_modification_statement.cc', 'cql3/statements/strongly_consistent_modification_statement.cc',
'cql3/statements/broadcast_select_statement.cc', 'cql3/statements/strongly_consistent_select_statement.cc',
'cql3/statements/delete_statement.cc', 'cql3/statements/delete_statement.cc',
'cql3/statements/prune_materialized_view_statement.cc', 'cql3/statements/prune_materialized_view_statement.cc',
'cql3/statements/batch_statement.cc', 'cql3/statements/batch_statement.cc',
@@ -1358,9 +1351,6 @@ scylla_core = (['message/messaging_service.cc',
'lang/wasm.cc', 'lang/wasm.cc',
'lang/wasm_alien_thread_runner.cc', 'lang/wasm_alien_thread_runner.cc',
'lang/wasm_instance_cache.cc', 'lang/wasm_instance_cache.cc',
'service/strong_consistency/groups_manager.cc',
'service/strong_consistency/coordinator.cc',
'service/strong_consistency/state_machine.cc',
'service/raft/group0_state_id_handler.cc', 'service/raft/group0_state_id_handler.cc',
'service/raft/group0_state_machine.cc', 'service/raft/group0_state_machine.cc',
'service/raft/group0_state_machine_merger.cc', 'service/raft/group0_state_machine_merger.cc',
@@ -1390,7 +1380,6 @@ scylla_core = (['message/messaging_service.cc',
'vector_search/dns.cc', 'vector_search/dns.cc',
'vector_search/client.cc', 'vector_search/client.cc',
'vector_search/clients.cc', 'vector_search/clients.cc',
'vector_search/filter.cc',
'vector_search/truststore.cc' 'vector_search/truststore.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \ ] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core + scylla_raft_core
@@ -1500,7 +1489,6 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/hinted_handoff.idl.hh', 'idl/hinted_handoff.idl.hh',
'idl/storage_proxy.idl.hh', 'idl/storage_proxy.idl.hh',
'idl/sstables.idl.hh', 'idl/sstables.idl.hh',
'idl/strong_consistency/state_machine.idl.hh',
'idl/group0_state_machine.idl.hh', 'idl/group0_state_machine.idl.hh',
'idl/mapreduce_request.idl.hh', 'idl/mapreduce_request.idl.hh',
'idl/replica_exception.idl.hh', 'idl/replica_exception.idl.hh',
@@ -1559,7 +1547,6 @@ scylla_perfs = ['test/perf/perf_alternator.cc',
'test/perf/perf_fast_forward.cc', 'test/perf/perf_fast_forward.cc',
'test/perf/perf_row_cache_update.cc', 'test/perf/perf_row_cache_update.cc',
'test/perf/perf_simple_query.cc', 'test/perf/perf_simple_query.cc',
'test/perf/perf_cql_raw.cc',
'test/perf/perf_sstable.cc', 'test/perf/perf_sstable.cc',
'test/perf/perf_tablets.cc', 'test/perf/perf_tablets.cc',
'test/perf/tablet_load_balancing.cc', 'test/perf/tablet_load_balancing.cc',
@@ -1797,8 +1784,6 @@ deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/filter_test'] = ['test/vector_search/filter_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/rescoring_test'] = ['test/vector_search/rescoring_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"] boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
@@ -2408,7 +2393,7 @@ def write_build_file(f,
# If compiler cache is available, prefix the compiler with it # If compiler cache is available, prefix the compiler with it
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
# For Rust, sccache is used via RUSTC_WRAPPER environment variable # For Rust, sccache is used via RUSTC_WRAPPER environment variable
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache and args.sccache_rust else '' rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache else ''
f.write(textwrap.dedent('''\ f.write(textwrap.dedent('''\
configure_args = {configure_args} configure_args = {configure_args}
builddir = {outdir} builddir = {outdir}
@@ -3152,7 +3137,7 @@ def configure_using_cmake(args):
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
# For Rust, sccache is used via RUSTC_WRAPPER # For Rust, sccache is used via RUSTC_WRAPPER
if 'sccache' in compiler_cache and args.sccache_rust: if 'sccache' in compiler_cache:
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
if args.date_stamp: if args.date_stamp:

View File

@@ -47,9 +47,6 @@ target_sources(cql3
functions/aggregate_fcts.cc functions/aggregate_fcts.cc
functions/castas_fcts.cc functions/castas_fcts.cc
functions/error_injection_fcts.cc functions/error_injection_fcts.cc
statements/strong_consistency/select_statement.cc
statements/strong_consistency/modification_statement.cc
statements/strong_consistency/statement_helpers.cc
functions/vector_similarity_fcts.cc functions/vector_similarity_fcts.cc
statements/cf_prop_defs.cc statements/cf_prop_defs.cc
statements/cf_statement.cc statements/cf_statement.cc
@@ -75,8 +72,8 @@ target_sources(cql3
statements/raw/parsed_statement.cc statements/raw/parsed_statement.cc
statements/property_definitions.cc statements/property_definitions.cc
statements/update_statement.cc statements/update_statement.cc
statements/broadcast_modification_statement.cc statements/strongly_consistent_modification_statement.cc
statements/broadcast_select_statement.cc statements/strongly_consistent_select_statement.cc
statements/delete_statement.cc statements/delete_statement.cc
statements/prune_materialized_view_statement.cc statements/prune_materialized_view_statement.cc
statements/batch_statement.cc statements/batch_statement.cc

View File

@@ -48,10 +48,8 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono
struct query_processor::remote { struct query_processor::remote {
remote(service::migration_manager& mm, service::mapreduce_service& fwd, remote(service::migration_manager& mm, service::mapreduce_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client, service::storage_service& ss, service::raft_group0_client& group0_client)
service::strong_consistency::coordinator& _sc_coordinator)
: mm(mm), mapreducer(fwd), ss(ss), group0_client(group0_client) : mm(mm), mapreducer(fwd), ss(ss), group0_client(group0_client)
, sc_coordinator(_sc_coordinator)
, gate("query_processor::remote") , gate("query_processor::remote")
{} {}
@@ -59,7 +57,6 @@ struct query_processor::remote {
service::mapreduce_service& mapreducer; service::mapreduce_service& mapreducer;
service::storage_service& ss; service::storage_service& ss;
service::raft_group0_client& group0_client; service::raft_group0_client& group0_client;
service::strong_consistency::coordinator& sc_coordinator;
seastar::named_gate gate; seastar::named_gate gate;
}; };
@@ -517,16 +514,9 @@ query_processor::~query_processor() {
} }
} }
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
query_processor::acquire_strongly_consistent_coordinator() {
auto [remote_, holder] = remote();
return {remote_.get().sc_coordinator, std::move(holder)};
}
void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer, void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer,
service::storage_service& ss, service::raft_group0_client& group0_client, service::storage_service& ss, service::raft_group0_client& group0_client) {
service::strong_consistency::coordinator& sc_coordinator) { _remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client);
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client, sc_coordinator);
} }
future<> query_processor::stop_remote() { future<> query_processor::stop_remote() {
@@ -870,7 +860,6 @@ struct internal_query_state {
sstring query_string; sstring query_string;
std::unique_ptr<query_options> opts; std::unique_ptr<query_options> opts;
statements::prepared_statement::checked_weak_ptr p; statements::prepared_statement::checked_weak_ptr p;
std::optional<service::query_state> qs;
bool more_results = true; bool more_results = true;
}; };
@@ -878,14 +867,10 @@ internal_query_state query_processor::create_paged_state(
const sstring& query_string, const sstring& query_string,
db::consistency_level cl, db::consistency_level cl,
const data_value_list& values, const data_value_list& values,
int32_t page_size, int32_t page_size) {
std::optional<service::query_state> qs) {
auto p = prepare_internal(query_string); auto p = prepare_internal(query_string);
auto opts = make_internal_options(p, values, cl, page_size); auto opts = make_internal_options(p, values, cl, page_size);
if (!qs) { return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), true};
qs.emplace(query_state_for_internal_call());
}
return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), std::move(qs), true};
} }
bool query_processor::has_more_results(cql3::internal_query_state& state) const { bool query_processor::has_more_results(cql3::internal_query_state& state) const {
@@ -908,8 +893,9 @@ future<> query_processor::for_each_cql_result(
future<::shared_ptr<untyped_result_set>> future<::shared_ptr<untyped_result_set>>
query_processor::execute_paged_internal(internal_query_state& state) { query_processor::execute_paged_internal(internal_query_state& state) {
state.p->statement->validate(*this, service::client_state::for_internal_calls()); state.p->statement->validate(*this, service::client_state::for_internal_calls());
auto qs = query_state_for_internal_call();
::shared_ptr<cql_transport::messages::result_message> msg = ::shared_ptr<cql_transport::messages::result_message> msg =
co_await state.p->statement->execute(*this, *state.qs, *state.opts, std::nullopt); co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
class visitor : public result_message::visitor_base { class visitor : public result_message::visitor_base {
internal_query_state& _state; internal_query_state& _state;
@@ -1216,9 +1202,8 @@ future<> query_processor::query_internal(
db::consistency_level cl, db::consistency_level cl,
const data_value_list& values, const data_value_list& values,
int32_t page_size, int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f, noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f) {
std::optional<service::query_state> qs) { auto query_state = create_paged_state(query_string, cl, values, page_size);
auto query_state = create_paged_state(query_string, cl, values, page_size, std::move(qs));
co_return co_await for_each_cql_result(query_state, std::move(f)); co_return co_await for_each_cql_result(query_state, std::move(f));
} }

View File

@@ -44,10 +44,6 @@ class query_state;
class mapreduce_service; class mapreduce_service;
class raft_group0_client; class raft_group0_client;
namespace strong_consistency {
class coordinator;
}
namespace broadcast_tables { namespace broadcast_tables {
struct query; struct query;
} }
@@ -159,8 +155,7 @@ public:
~query_processor(); ~query_processor();
void start_remote(service::migration_manager&, service::mapreduce_service&, void start_remote(service::migration_manager&, service::mapreduce_service&,
service::storage_service& ss, service::raft_group0_client&, service::storage_service& ss, service::raft_group0_client&);
service::strong_consistency::coordinator&);
future<> stop_remote(); future<> stop_remote();
data_dictionary::database db() { data_dictionary::database db() {
@@ -179,9 +174,6 @@ public:
return _proxy; return _proxy;
} }
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
acquire_strongly_consistent_coordinator();
cql_stats& get_cql_stats() { cql_stats& get_cql_stats() {
return _cql_stats; return _cql_stats;
} }
@@ -330,7 +322,6 @@ public:
* page_size - maximum page size * page_size - maximum page size
* f - a function to be run on each row of the query result, * f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop * if the function returns stop_iteration::yes the iteration will stop
* qs - optional query state (default: std::nullopt)
* *
* \note This function is optimized for convenience, not performance. * \note This function is optimized for convenience, not performance.
*/ */
@@ -339,8 +330,7 @@ public:
db::consistency_level cl, db::consistency_level cl,
const data_value_list& values, const data_value_list& values,
int32_t page_size, int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f, noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
std::optional<service::query_state> qs = std::nullopt);
/* /*
* \brief iterate over all cql results using paging * \brief iterate over all cql results using paging
@@ -509,8 +499,7 @@ private:
const sstring& query_string, const sstring& query_string,
db::consistency_level, db::consistency_level,
const data_value_list& values, const data_value_list& values,
int32_t page_size, int32_t page_size);
std::optional<service::query_state> qs = std::nullopt);
/*! /*!
* \brief run a query using paging * \brief run a query using paging

View File

@@ -46,13 +46,6 @@ void metadata::add_non_serialized_column(lw_shared_ptr<column_specification> nam
_column_info->_names.emplace_back(std::move(name)); _column_info->_names.emplace_back(std::move(name));
} }
void metadata::hide_last_column() {
if (_column_info->_column_count == 0) {
utils::on_internal_error("Trying to hide a column when there are no columns visible.");
}
_column_info->_column_count--;
}
void metadata::set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state) { void metadata::set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state) {
_flags.set<flag::HAS_MORE_PAGES>(); _flags.set<flag::HAS_MORE_PAGES>();
_paging_state = std::move(paging_state); _paging_state = std::move(paging_state);

View File

@@ -73,7 +73,6 @@ public:
uint32_t value_count() const; uint32_t value_count() const;
void add_non_serialized_column(lw_shared_ptr<column_specification> name); void add_non_serialized_column(lw_shared_ptr<column_specification> name);
void hide_last_column();
public: public:
void set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state); void set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state);

View File

@@ -225,9 +225,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// The second hyphen is not really true because currently topological changes can // The second hyphen is not really true because currently topological changes can
// disturb it (see scylladb/scylladb#23345), but we ignore that. // disturb it (see scylladb/scylladb#23345), but we ignore that.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs); locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::invalid_argument& e) { } catch (const std::exception& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) { if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) {
// wrap the exception manually here in a type that can be passed to the user. // There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what()); throw exceptions::invalid_request_exception(e.what());
} else { } else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd // Even when RF-rack-validity is not enforced for the keyspace, we'd

View File

@@ -123,9 +123,10 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
// We hold a group0_guard, so it's correct to check this here. // We hold a group0_guard, so it's correct to check this here.
// The topology or schema cannot change while we're performing this query. // The topology or schema cannot change while we're performing this query.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs); locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::invalid_argument& e) { } catch (const std::exception& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(cfg, *ksm)) { if (replica::database::enforce_rf_rack_validity_for_keyspace(cfg, *ksm)) {
// wrap the exception in a type that can be passed to the user. // There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what()); throw exceptions::invalid_request_exception(e.what());
} else { } else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd // Even when RF-rack-validity is not enforced for the keyspace, we'd

View File

@@ -31,6 +31,8 @@
#include "db/config.hh" #include "db/config.hh"
#include "compaction/time_window_compaction_strategy.hh" #include "compaction/time_window_compaction_strategy.hh"
bool is_internal_keyspace(std::string_view name);
namespace cql3 { namespace cql3 {
namespace statements { namespace statements {
@@ -122,6 +124,10 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE); addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
#endif #endif
if (!_properties->get_compression_options() && !is_internal_keyspace(keyspace())) {
builder.set_compressor_params(db.get_config().sstable_compression_user_table_options());
}
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true); _properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
} }

View File

@@ -98,7 +98,6 @@ static locator::replication_strategy_config_options prepare_options(
const sstring& strategy_class, const sstring& strategy_class,
const locator::token_metadata& tm, const locator::token_metadata& tm,
bool rf_rack_valid_keyspaces, bool rf_rack_valid_keyspaces,
bool enforce_rack_list,
locator::replication_strategy_config_options options, locator::replication_strategy_config_options options,
const locator::replication_strategy_config_options& old_options, const locator::replication_strategy_config_options& old_options,
bool rack_list_enabled, bool rack_list_enabled,
@@ -108,7 +107,7 @@ static locator::replication_strategy_config_options prepare_options(
auto is_nts = locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) == "org.apache.cassandra.locator.NetworkTopologyStrategy"; auto is_nts = locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) == "org.apache.cassandra.locator.NetworkTopologyStrategy";
auto is_alter = !old_options.empty(); auto is_alter = !old_options.empty();
const auto& all_dcs = tm.get_datacenter_racks_token_owners(); const auto& all_dcs = tm.get_datacenter_racks_token_owners();
auto auto_expand_racks = uses_tablets && rack_list_enabled && (rf_rack_valid_keyspaces || enforce_rack_list); auto auto_expand_racks = uses_tablets && rf_rack_valid_keyspaces && rack_list_enabled;
logger.debug("prepare_options: {}: is_nts={} auto_expand_racks={} rack_list_enabled={} old_options={} new_options={} all_dcs={}", logger.debug("prepare_options: {}: is_nts={} auto_expand_racks={} rack_list_enabled={} old_options={} new_options={} all_dcs={}",
strategy_class, is_nts, auto_expand_racks, rack_list_enabled, old_options, options, all_dcs); strategy_class, is_nts, auto_expand_racks, rack_list_enabled, old_options, options, all_dcs);
@@ -418,7 +417,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets()); auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
bool uses_tablets = initial_tablets.has_value(); bool uses_tablets = initial_tablets.has_value();
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf; bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), cfg.enforce_rack_list(), get_replication_options(), {}, rack_list_enabled, uses_tablets); auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc, return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options()); std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
} }
@@ -435,7 +434,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
auto sc = get_replication_strategy_class(); auto sc = get_replication_strategy_class();
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf; bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
if (sc) { if (sc) {
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), cfg.enforce_rack_list(), get_replication_options(), old_options, rack_list_enabled, uses_tablets); options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
} else { } else {
sc = old->strategy_name(); sc = old->strategy_name();
options = old_options; options = old_options;

View File

@@ -11,7 +11,7 @@
#include "utils/assert.hh" #include "utils/assert.hh"
#include "cql3/cql_statement.hh" #include "cql3/cql_statement.hh"
#include "cql3/statements/modification_statement.hh" #include "cql3/statements/modification_statement.hh"
#include "cql3/statements/broadcast_modification_statement.hh" #include "cql3/statements/strongly_consistent_modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh" #include "cql3/statements/raw/modification_statement.hh"
#include "cql3/statements/prepared_statement.hh" #include "cql3/statements/prepared_statement.hh"
#include "cql3/expr/expr-utils.hh" #include "cql3/expr/expr-utils.hh"
@@ -29,8 +29,6 @@
#include "cql3/query_processor.hh" #include "cql3/query_processor.hh"
#include "service/storage_proxy.hh" #include "service/storage_proxy.hh"
#include "service/broadcast_tables/experimental/lang.hh" #include "service/broadcast_tables/experimental/lang.hh"
#include "cql3/statements/strong_consistency/modification_statement.hh"
#include "cql3/statements/strong_consistency/statement_helpers.hh"
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
@@ -548,7 +546,7 @@ modification_statement::process_where_clause(data_dictionary::database db, expr:
} }
} }
::shared_ptr<broadcast_modification_statement> ::shared_ptr<strongly_consistent_modification_statement>
modification_statement::prepare_for_broadcast_tables() const { modification_statement::prepare_for_broadcast_tables() const {
// FIXME: implement for every type of `modification_statement`. // FIXME: implement for every type of `modification_statement`.
throw service::broadcast_tables::unsupported_operation_error{}; throw service::broadcast_tables::unsupported_operation_error{};
@@ -556,27 +554,24 @@ modification_statement::prepare_for_broadcast_tables() const {
namespace raw { namespace raw {
::shared_ptr<cql_statement_opt_metadata>
modification_statement::prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) {
::shared_ptr<cql3::statements::modification_statement> statement = prepare(db, ctx, stats);
if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
return statement->prepare_for_broadcast_tables();
} else {
return statement;
}
}
std::unique_ptr<prepared_statement> std::unique_ptr<prepared_statement>
modification_statement::prepare(data_dictionary::database db, cql_stats& stats) { modification_statement::prepare(data_dictionary::database db, cql_stats& stats) {
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
auto meta = get_prepare_context(); auto meta = get_prepare_context();
auto statement = prepare_statement(db, meta, stats);
auto statement = std::invoke([&] -> shared_ptr<cql_statement> {
auto result = prepare(db, meta, stats);
if (strong_consistency::is_strongly_consistent(db, schema->ks_name())) {
return ::make_shared<strong_consistency::modification_statement>(std::move(result));
}
if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
return result->prepare_for_broadcast_tables();
}
return result;
});
auto partition_key_bind_indices = meta.get_partition_key_bind_indexes(*schema); auto partition_key_bind_indices = meta.get_partition_key_bind_indexes(*schema);
return std::make_unique<prepared_statement>(audit_info(), std::move(statement), meta, return std::make_unique<prepared_statement>(audit_info(), std::move(statement), meta, std::move(partition_key_bind_indices));
std::move(partition_key_bind_indices));
} }
::shared_ptr<cql3::statements::modification_statement> ::shared_ptr<cql3::statements::modification_statement>

View File

@@ -30,7 +30,7 @@ class operation;
namespace statements { namespace statements {
class broadcast_modification_statement; class strongly_consistent_modification_statement;
namespace raw { class modification_statement; } namespace raw { class modification_statement; }
@@ -113,15 +113,15 @@ public:
virtual void add_update_for_key(mutation& m, const query::clustering_range& range, const update_parameters& params, const json_cache_opt& json_cache) const = 0; virtual void add_update_for_key(mutation& m, const query::clustering_range& range, const update_parameters& params, const json_cache_opt& json_cache) const = 0;
uint32_t get_bound_terms() const override; virtual uint32_t get_bound_terms() const override;
const sstring& keyspace() const; virtual const sstring& keyspace() const;
const sstring& column_family() const; virtual const sstring& column_family() const;
bool is_counter() const; virtual bool is_counter() const;
bool is_view() const; virtual bool is_view() const;
int64_t get_timestamp(int64_t now, const query_options& options) const; int64_t get_timestamp(int64_t now, const query_options& options) const;
@@ -129,12 +129,12 @@ public:
std::optional<gc_clock::duration> get_time_to_live(const query_options& options) const; std::optional<gc_clock::duration> get_time_to_live(const query_options& options) const;
future<> check_access(query_processor& qp, const service::client_state& state) const override; virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
// Validate before execute, using client state and current schema // Validate before execute, using client state and current schema
void validate(query_processor&, const service::client_state& state) const override; void validate(query_processor&, const service::client_state& state) const override;
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override; virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
void add_operation(::shared_ptr<operation> op); void add_operation(::shared_ptr<operation> op);
@@ -256,9 +256,7 @@ public:
virtual json_cache_opt maybe_prepare_json_cache(const query_options& options) const; virtual json_cache_opt maybe_prepare_json_cache(const query_options& options) const;
virtual ::shared_ptr<broadcast_modification_statement> prepare_for_broadcast_tables() const; virtual ::shared_ptr<strongly_consistent_modification_statement> prepare_for_broadcast_tables() const;
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
protected: protected:
/** /**
@@ -266,7 +264,9 @@ protected:
* processed to check that they are compatible. * processed to check that they are compatible.
* @throws InvalidRequestException * @throws InvalidRequestException
*/ */
void validate_where_clause_for_conditions() const; virtual void validate_where_clause_for_conditions() const;
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
friend class raw::modification_statement; friend class raw::modification_statement;
}; };

View File

@@ -50,8 +50,8 @@ public:
protected: protected:
virtual audit::statement_category category() const override; virtual audit::statement_category category() const override;
virtual audit::audit_info_ptr audit_info() const override { virtual audit::audit_info_ptr audit_info() const override {
constexpr bool batch = true; // We don't audit batch statements. Instead we audit statements that are inside the batch.
return audit::audit::create_audit_info(category(), sstring(), sstring(), batch); return audit::audit::create_no_audit_info();
} }
}; };

View File

@@ -40,6 +40,7 @@ protected:
public: public:
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override; virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
::shared_ptr<cql_statement_opt_metadata> prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats);
::shared_ptr<cql3::statements::modification_statement> prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const; ::shared_ptr<cql3::statements::modification_statement> prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const;
void add_raw(sstring&& raw) { _raw_cql = std::move(raw); } void add_raw(sstring&& raw) { _raw_cql = std::move(raw); }
const sstring& get_raw_cql() const { return _raw_cql; } const sstring& get_raw_cql() const { return _raw_cql; }

View File

@@ -131,6 +131,8 @@ private:
void verify_ordering_is_valid(const prepared_orderings_type&, const schema&, const restrictions::statement_restrictions& restrictions) const; void verify_ordering_is_valid(const prepared_orderings_type&, const schema&, const restrictions::statement_restrictions& restrictions) const;
prepared_ann_ordering_type prepare_ann_ordering(const schema& schema, prepare_context& ctx, data_dictionary::database db) const;
// Checks whether this ordering reverses all results. // Checks whether this ordering reverses all results.
// We only allow leaving select results unchanged or reversing them. // We only allow leaving select results unchanged or reversing them.
bool is_ordering_reversed(const prepared_orderings_type&) const; bool is_ordering_reversed(const prepared_orderings_type&) const;

View File

@@ -8,8 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/ */
#include "cql3/statements/strong_consistency/select_statement.hh"
#include "cql3/statements/strong_consistency/statement_helpers.hh"
#include "cql3/statements/select_statement.hh" #include "cql3/statements/select_statement.hh"
#include "cql3/expr/expression.hh" #include "cql3/expr/expression.hh"
#include "cql3/expr/evaluate.hh" #include "cql3/expr/evaluate.hh"
@@ -18,7 +16,7 @@
#include "cql3/statements/raw/select_statement.hh" #include "cql3/statements/raw/select_statement.hh"
#include "cql3/query_processor.hh" #include "cql3/query_processor.hh"
#include "cql3/statements/prune_materialized_view_statement.hh" #include "cql3/statements/prune_materialized_view_statement.hh"
#include "cql3/statements/broadcast_select_statement.hh" #include "cql3/statements/strongly_consistent_select_statement.hh"
#include "exceptions/exceptions.hh" #include "exceptions/exceptions.hh"
#include <seastar/core/future.hh> #include <seastar/core/future.hh>
@@ -27,14 +25,12 @@
#include "service/broadcast_tables/experimental/lang.hh" #include "service/broadcast_tables/experimental/lang.hh"
#include "service/qos/qos_common.hh" #include "service/qos/qos_common.hh"
#include "transport/messages/result_message.hh" #include "transport/messages/result_message.hh"
#include "cql3/functions/functions.hh"
#include "cql3/functions/as_json_function.hh" #include "cql3/functions/as_json_function.hh"
#include "cql3/selection/selection.hh" #include "cql3/selection/selection.hh"
#include "cql3/util.hh" #include "cql3/util.hh"
#include "cql3/restrictions/statement_restrictions.hh" #include "cql3/restrictions/statement_restrictions.hh"
#include "index/secondary_index.hh" #include "index/secondary_index.hh"
#include "types/vector.hh" #include "types/vector.hh"
#include "vector_search/filter.hh"
#include "validation.hh" #include "validation.hh"
#include "exceptions/unrecognized_entity_exception.hh" #include "exceptions/unrecognized_entity_exception.hh"
#include <optional> #include <optional>
@@ -372,9 +368,8 @@ uint64_t select_statement::get_inner_loop_limit(uint64_t limit, bool is_aggregat
} }
bool select_statement::needs_post_query_ordering() const { bool select_statement::needs_post_query_ordering() const {
// We need post-query ordering for queries with IN on the partition key and an ORDER BY // We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
// and ANN index queries with rescoring. return _restrictions->key_is_in_relation() && !_parameters->orderings().empty();
return static_cast<bool>(_ordering_comparator);
} }
struct select_statement_executor { struct select_statement_executor {
@@ -1963,46 +1958,14 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
})); }));
} }
struct ann_ordering_info { ::shared_ptr<cql3::statements::select_statement> vector_indexed_table_select_statement::prepare(data_dictionary::database db, schema_ptr schema,
secondary_index::index _index; uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
raw::select_statement::prepared_ann_ordering_type _prepared_ann_ordering; ::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
bool is_rescoring_enabled; ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
}; std::optional<expr::expression> per_partition_limit, cql_stats& stats, std::unique_ptr<attributes> attrs) {
static std::optional<ann_ordering_info> get_ann_ordering_info(
data_dictionary::database db,
schema_ptr schema,
lw_shared_ptr<const raw::select_statement::parameters> parameters,
prepare_context& ctx) {
if (parameters->orderings().empty()) {
return std::nullopt;
}
auto [column_id, ordering] = parameters->orderings().front();
const auto& ann_vector = std::get_if<raw::select_statement::ann_vector>(&ordering);
if (!ann_vector) {
return std::nullopt;
}
::shared_ptr<column_identifier> column = column_id->prepare_column_identifier(*schema);
const column_definition* def = schema->get_column_definition(column->name());
if (!def) {
throw exceptions::invalid_request_exception(
fmt::format("Undefined column name {}", column->text()));
}
if (!def->type->is_vector() || static_cast<const vector_type_impl*>(def->type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("ANN ordering is only supported on float vector indexes");
}
auto e = expr::prepare_expression(*ann_vector, db, schema->ks_name(), nullptr, def->column_specification);
expr::fill_prepare_context(e, ctx);
raw::select_statement::prepared_ann_ordering_type prepared_ann_ordering = std::make_pair(std::move(def), std::move(e));
auto cf = db.find_column_family(schema); auto cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager(); auto& sim = cf.get_index_manager();
auto [index_opt, _] = restrictions->find_idx(sim);
auto indexes = sim.list_indexes(); auto indexes = sim.list_indexes();
auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) { auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) {
@@ -2015,89 +1978,26 @@ static std::optional<ann_ordering_info> get_ann_ordering_info(
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'"); throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
} }
return ann_ordering_info{ index_opt = *it;
*it,
std::move(prepared_ann_ordering),
secondary_index::vector_index::is_rescoring_enabled(it->metadata().options())
};
}
static uint32_t add_similarity_function_to_selectors( if (!index_opt) {
std::vector<selection::prepared_selector>& prepared_selectors, throw std::runtime_error("No index found.");
const ann_ordering_info& ann_ordering_info,
data_dictionary::database db,
schema_ptr schema) {
auto similarity_function_name = secondary_index::vector_index::get_cql_similarity_function_name(ann_ordering_info._index.metadata().options());
// Create the function name
auto func_name = functions::function_name::native_function(sstring(similarity_function_name));
// Create the function arguments
std::vector<expr::expression> args;
args.push_back(expr::column_value(ann_ordering_info._prepared_ann_ordering.first));
args.push_back(ann_ordering_info._prepared_ann_ordering.second);
// Get the function object
std::vector<shared_ptr<assignment_testable>> provided_args;
provided_args.push_back(expr::as_assignment_testable(args[0], expr::type_of(args[0])));
provided_args.push_back(expr::as_assignment_testable(args[1], expr::type_of(args[1])));
auto func = cql3::functions::instance().get(db, schema->ks_name(), func_name, provided_args, schema->ks_name(), schema->cf_name(), nullptr);
// Create the function call expression
expr::function_call similarity_func_call{
.func = func,
.args = std::move(args),
};
// Add the similarity function as a prepared selector (last)
prepared_selectors.push_back(selection::prepared_selector{
.expr = std::move(similarity_func_call),
.alias = nullptr,
});
return prepared_selectors.size() - 1;
}
static select_statement::ordering_comparator_type get_similarity_ordering_comparator(std::vector<selection::prepared_selector>& prepared_selectors, uint32_t similarity_column_index) {
auto type = expr::type_of(prepared_selectors[similarity_column_index].expr);
if (type->get_kind() != abstract_type::kind::float_kind) {
seastar::on_internal_error(logger, "Similarity function must return float type.");
} }
return [similarity_column_index, type] (const raw::select_statement::result_row_type& r1, const raw::select_statement::result_row_type& r2) {
auto& c1 = r1[similarity_column_index];
auto& c2 = r2[similarity_column_index];
auto f1 = c1 ? value_cast<float>(type->deserialize(*c1)) : std::numeric_limits<float>::quiet_NaN();
auto f2 = c2 ? value_cast<float>(type->deserialize(*c2)) : std::numeric_limits<float>::quiet_NaN();
if (std::isfinite(f1) && std::isfinite(f2)) {
return f1 > f2;
}
return std::isfinite(f1);
};
}
::shared_ptr<cql3::statements::select_statement> vector_indexed_table_select_statement::prepare(data_dictionary::database db, schema_ptr schema,
uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<attributes> attrs) {
auto prepared_filter = vector_search::prepare_filter(*restrictions, parameters->allow_filtering());
return ::make_shared<cql3::statements::vector_indexed_table_select_statement>(schema, bound_terms, parameters, std::move(selection), std::move(restrictions), return ::make_shared<cql3::statements::vector_indexed_table_select_statement>(schema, bound_terms, parameters, std::move(selection), std::move(restrictions),
std::move(group_by_cell_indices), is_reversed, std::move(ordering_comparator), std::move(prepared_ann_ordering), std::move(limit), std::move(group_by_cell_indices), is_reversed, std::move(ordering_comparator), std::move(prepared_ann_ordering), std::move(limit),
std::move(per_partition_limit), stats, index, std::move(prepared_filter), std::move(attrs)); std::move(per_partition_limit), stats, *index_opt, std::move(attrs));
} }
vector_indexed_table_select_statement::vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, vector_indexed_table_select_statement::vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions, ::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator,
prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<attributes> attrs)
vector_search::prepared_filter prepared_filter, std::unique_ptr<attributes> attrs)
: select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, limit, : select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, limit,
per_partition_limit, stats, std::move(attrs)} per_partition_limit, stats, std::move(attrs)}
, _index{index} , _index{index}
, _prepared_ann_ordering(std::move(prepared_ann_ordering)) , _prepared_ann_ordering(std::move(prepared_ann_ordering)) {
, _prepared_filter(std::move(prepared_filter)) {
if (!limit.has_value()) { if (!limit.has_value()) {
throw exceptions::invalid_request_exception("Vector ANN queries must have a limit specified"); throw exceptions::invalid_request_exception("Vector ANN queries must have a limit specified");
@@ -2132,19 +2032,13 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options); auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
auto aoe = abort_on_expiry(timeout); auto aoe = abort_on_expiry(timeout);
auto filter_json = _prepared_filter.to_json(options);
uint64_t fetch = static_cast<uint64_t>(std::ceil(limit * secondary_index::vector_index::get_oversampling(_index.metadata().options())));
auto pkeys = co_await qp.vector_store_client().ann( auto pkeys = co_await qp.vector_store_client().ann(
_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), fetch, filter_json, aoe.abort_source()); _schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, aoe.abort_source());
if (!pkeys.has_value()) { if (!pkeys.has_value()) {
co_await coroutine::return_exception( co_await coroutine::return_exception(
exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error()))); exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error())));
} }
if (pkeys->size() > limit && !secondary_index::vector_index::is_rescoring_enabled(_index.metadata().options())) {
pkeys->erase(pkeys->begin() + limit, pkeys->end());
}
co_return co_await query_base_table(qp, state, options, pkeys.value(), timeout); co_return co_await query_base_table(qp, state, options, pkeys.value(), timeout);
}); });
@@ -2161,11 +2055,11 @@ void vector_indexed_table_select_statement::update_stats() const {
} }
lw_shared_ptr<query::read_command> vector_indexed_table_select_statement::prepare_command_for_base_query( lw_shared_ptr<query::read_command> vector_indexed_table_select_statement::prepare_command_for_base_query(
query_processor& qp, service::query_state& state, const query_options& options, uint64_t fetch_limit) const { query_processor& qp, service::query_state& state, const query_options& options) const {
auto slice = make_partition_slice(options); auto slice = make_partition_slice(options);
return ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(slice), qp.proxy().get_max_result_size(slice), return ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(slice), qp.proxy().get_max_result_size(slice),
query::tombstone_limit(qp.proxy().get_tombstone_limit()), query::tombstone_limit(qp.proxy().get_tombstone_limit()),
query::row_limit(get_inner_loop_limit(fetch_limit, _selection->is_aggregate())), query::partition_limit(query::max_partitions), query::row_limit(get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate())), query::partition_limit(query::max_partitions),
_query_start_time_point, tracing::make_trace_info(state.get_trace_state()), query_id::create_null_id(), query::is_first_page::no, _query_start_time_point, tracing::make_trace_info(state.get_trace_state()), query_id::create_null_id(), query::is_first_page::no,
options.get_timestamp(state)); options.get_timestamp(state));
} }
@@ -2183,7 +2077,7 @@ std::vector<float> vector_indexed_table_select_statement::get_ann_ordering_vecto
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(query_processor& qp, future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(query_processor& qp,
service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys, service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys,
lowres_clock::time_point timeout) const { lowres_clock::time_point timeout) const {
auto command = prepare_command_for_base_query(qp, state, options, pkeys.size()); auto command = prepare_command_for_base_query(qp, state, options);
// For tables without clustering columns, we can optimize by querying // For tables without clustering columns, we can optimize by querying
// partition ranges instead of individual primary keys, since the // partition ranges instead of individual primary keys, since the
@@ -2222,7 +2116,6 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
query::result_merger{command->get_row_limit(), query::max_partitions}); query::result_merger{command->get_row_limit(), query::max_partitions});
co_return co_await wrap_result_to_error_message([this, &command, &options](auto result) { co_return co_await wrap_result_to_error_message([this, &command, &options](auto result) {
command->set_row_limit(get_limit(options, _limit));
return process_results(std::move(result), command, options, _query_start_time_point); return process_results(std::move(result), command, options, _query_start_time_point);
})(std::move(result)); })(std::move(result));
} }
@@ -2236,7 +2129,6 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only}, {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
std::nullopt) std::nullopt)
.then(wrap_result_to_error_message([this, &options, command](service::storage_proxy::coordinator_query_result qr) { .then(wrap_result_to_error_message([this, &options, command](service::storage_proxy::coordinator_query_result qr) {
command->set_row_limit(get_limit(options, _limit));
return this->process_results(std::move(qr.query_result), command, options, _query_start_time_point); return this->process_results(std::move(qr.query_result), command, options, _query_start_time_point);
})); }));
} }
@@ -2331,41 +2223,32 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
prepared_selectors = maybe_jsonize_select_clause(std::move(prepared_selectors), db, schema); prepared_selectors = maybe_jsonize_select_clause(std::move(prepared_selectors), db, schema);
std::optional<ann_ordering_info> ann_ordering_info_opt = get_ann_ordering_info(db, schema, _parameters, ctx); auto aggregation_depth = 0u;
bool is_ann_query = ann_ordering_info_opt.has_value();
if (prepared_selectors.empty() && (!_group_by_columns.empty() || (is_ann_query && ann_ordering_info_opt->is_rescoring_enabled))) { // Force aggregation if GROUP BY is used. This will wrap every column x as first(x).
// We have a "SELECT * GROUP BY" or "SELECT * ORDER BY ANN" with rescoring enabled. If we leave prepared_selectors if (!_group_by_columns.empty()) {
// empty, below we choose selection::wildcard() for SELECT *, and either: aggregation_depth = std::max(aggregation_depth, 1u);
// - forget to do the "levellize" trick needed for the GROUP BY. See #16531. if (prepared_selectors.empty()) {
// - forget to add the similarity function needed for ORDER BY ANN with rescoring. See below. // We have a "SELECT * GROUP BY". If we leave prepared_selectors
// So we need to set prepared_selectors. // empty, below we choose selection::wildcard() for SELECT *, and
auto all_columns = selection::selection::wildcard_columns(schema); // forget to do the "levellize" trick needed for the GROUP BY.
std::vector<::shared_ptr<selection::raw_selector>> select_all; // So we need to set prepared_selectors. See #16531.
select_all.reserve(all_columns.size()); auto all_columns = selection::selection::wildcard_columns(schema);
for (const column_definition *cdef : all_columns) { std::vector<::shared_ptr<selection::raw_selector>> select_all;
auto name = ::make_shared<cql3::column_identifier::raw>(cdef->name_as_text(), true); select_all.reserve(all_columns.size());
select_all.push_back(::make_shared<selection::raw_selector>( for (const column_definition *cdef : all_columns) {
expr::unresolved_identifier(std::move(name)), nullptr)); auto name = ::make_shared<cql3::column_identifier::raw>(cdef->name_as_text(), true);
select_all.push_back(::make_shared<selection::raw_selector>(
expr::unresolved_identifier(std::move(name)), nullptr));
}
prepared_selectors = selection::raw_selector::to_prepared_selectors(select_all, *schema, db, keyspace());
} }
prepared_selectors = selection::raw_selector::to_prepared_selectors(select_all, *schema, db, keyspace());
} }
for (auto& ps : prepared_selectors) { for (auto& ps : prepared_selectors) {
expr::fill_prepare_context(ps.expr, ctx); expr::fill_prepare_context(ps.expr, ctx);
} }
// Force aggregation if GROUP BY is used. This will wrap every column x as first(x).
auto aggregation_depth = _group_by_columns.empty() ? 0u : 1u;
select_statement::ordering_comparator_type ordering_comparator;
bool hide_last_column = false;
if (is_ann_query && ann_ordering_info_opt->is_rescoring_enabled) {
uint32_t similarity_column_index = add_similarity_function_to_selectors(prepared_selectors, *ann_ordering_info_opt, db, schema);
hide_last_column = true;
ordering_comparator = get_similarity_ordering_comparator(prepared_selectors, similarity_column_index);
}
for (auto& ps : prepared_selectors) { for (auto& ps : prepared_selectors) {
aggregation_depth = std::max(aggregation_depth, expr::aggregation_depth(ps.expr)); aggregation_depth = std::max(aggregation_depth, expr::aggregation_depth(ps.expr));
} }
@@ -2383,11 +2266,6 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
? selection::selection::wildcard(schema) ? selection::selection::wildcard(schema)
: selection::selection::from_selectors(db, schema, keyspace(), levellized_prepared_selectors); : selection::selection::from_selectors(db, schema, keyspace(), levellized_prepared_selectors);
if (is_ann_query && hide_last_column) {
// Hide the similarity selector from the client by reducing column_count
selection->get_result_metadata()->hide_last_column();
}
// Cassandra 5.0.2 disallows PER PARTITION LIMIT with aggregate queries // Cassandra 5.0.2 disallows PER PARTITION LIMIT with aggregate queries
// but only if GROUP BY is not used. // but only if GROUP BY is not used.
// See #9879 for more details. // See #9879 for more details.
@@ -2395,6 +2273,8 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
throw exceptions::invalid_request_exception("PER PARTITION LIMIT is not allowed with aggregate queries."); throw exceptions::invalid_request_exception("PER PARTITION LIMIT is not allowed with aggregate queries.");
} }
bool is_ann_query = !_parameters->orderings().empty() && std::holds_alternative<select_statement::ann_vector>(_parameters->orderings().front().second);
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering() || is_ann_query, auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering() || is_ann_query,
restrictions::check_indexes(!_parameters->is_mutation_fragments())); restrictions::check_indexes(!_parameters->is_mutation_fragments()));
@@ -2402,14 +2282,19 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
validate_distinct_selection(*schema, *selection, *restrictions); validate_distinct_selection(*schema, *selection, *restrictions);
} }
select_statement::ordering_comparator_type ordering_comparator;
bool is_reversed_ = false; bool is_reversed_ = false;
std::optional<prepared_ann_ordering_type> prepared_ann_ordering;
auto orderings = _parameters->orderings(); auto orderings = _parameters->orderings();
if (!orderings.empty() && !is_ann_query) { if (!orderings.empty()) {
std::visit([&](auto&& ordering) { std::visit([&](auto&& ordering) {
using T = std::decay_t<decltype(ordering)>; using T = std::decay_t<decltype(ordering)>;
if constexpr (!std::is_same_v<T, select_statement::ann_vector>) { if constexpr (std::is_same_v<T, select_statement::ann_vector>) {
prepared_ann_ordering = prepare_ann_ordering(*schema, ctx, db);
} else {
SCYLLA_ASSERT(!for_view); SCYLLA_ASSERT(!for_view);
verify_ordering_is_allowed(*_parameters, *restrictions); verify_ordering_is_allowed(*_parameters, *restrictions);
prepared_orderings_type prepared_orderings = prepare_orderings(*schema); prepared_orderings_type prepared_orderings = prepare_orderings(*schema);
@@ -2422,7 +2307,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
} }
std::vector<sstring> warnings; std::vector<sstring> warnings;
if (!is_ann_query) { if (!prepared_ann_ordering.has_value()) {
check_needs_filtering(*restrictions, db.get_config().strict_allow_filtering(), warnings); check_needs_filtering(*restrictions, db.get_config().strict_allow_filtering(), warnings);
ensure_filtering_columns_retrieval(db, *selection, *restrictions); ensure_filtering_columns_retrieval(db, *selection, *restrictions);
} }
@@ -2476,21 +2361,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
&& restrictions->partition_key_restrictions_size() == schema->partition_key_size()); && restrictions->partition_key_restrictions_size() == schema->partition_key_size());
}; };
if (strong_consistency::is_strongly_consistent(db, schema->ks_name())) { if (_parameters->is_prune_materialized_view()) {
stmt = ::make_shared<strong_consistency::select_statement>(
schema,
ctx.bound_variables_size(),
_parameters,
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed_,
std::move(ordering_comparator),
prepare_limit(db, ctx, _limit),
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs));
} else if (_parameters->is_prune_materialized_view()) {
stmt = ::make_shared<cql3::statements::prune_materialized_view_statement>( stmt = ::make_shared<cql3::statements::prune_materialized_view_statement>(
schema, schema,
ctx.bound_variables_size(), ctx.bound_variables_size(),
@@ -2519,10 +2390,10 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
prepare_limit(db, ctx, _per_partition_limit), prepare_limit(db, ctx, _per_partition_limit),
stats, stats,
std::move(prepared_attrs)); std::move(prepared_attrs));
} else if (is_ann_query) { } else if (prepared_ann_ordering) {
stmt = vector_indexed_table_select_statement::prepare(db, schema, ctx.bound_variables_size(), _parameters, std::move(selection), std::move(restrictions), stmt = vector_indexed_table_select_statement::prepare(db, schema, ctx.bound_variables_size(), _parameters, std::move(selection), std::move(restrictions),
std::move(group_by_cell_indices), is_reversed_, std::move(ordering_comparator), std::move(ann_ordering_info_opt->_prepared_ann_ordering), std::move(group_by_cell_indices), is_reversed_, std::move(ordering_comparator), std::move(*prepared_ann_ordering),
prepare_limit(db, ctx, _limit), prepare_limit(db, ctx, _per_partition_limit), stats, ann_ordering_info_opt->_index, std::move(prepared_attrs)); prepare_limit(db, ctx, _limit), prepare_limit(db, ctx, _per_partition_limit), stats, std::move(prepared_attrs));
} else if (restrictions->uses_secondary_indexing()) { } else if (restrictions->uses_secondary_indexing()) {
stmt = view_indexed_table_select_statement::prepare( stmt = view_indexed_table_select_statement::prepare(
db, db,
@@ -2554,7 +2425,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
std::move(prepared_attrs) std::move(prepared_attrs)
); );
} else if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) { } else if (service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family())) {
stmt = ::make_shared<cql3::statements::broadcast_select_statement>( stmt = ::make_shared<cql3::statements::strongly_consistent_select_statement>(
schema, schema,
ctx.bound_variables_size(), ctx.bound_variables_size(),
_parameters, _parameters,
@@ -2744,6 +2615,28 @@ void select_statement::verify_ordering_is_valid(const prepared_orderings_type& o
} }
} }
select_statement::prepared_ann_ordering_type select_statement::prepare_ann_ordering(const schema& schema, prepare_context& ctx, data_dictionary::database db) const {
auto [column_id, ordering] = _parameters->orderings().front();
const auto& ann_vector = std::get_if<select_statement::ann_vector>(&ordering);
SCYLLA_ASSERT(ann_vector);
::shared_ptr<column_identifier> column = column_id->prepare_column_identifier(schema);
const column_definition* def = schema.get_column_definition(column->name());
if (!def) {
throw exceptions::invalid_request_exception(
fmt::format("Undefined column name {}", column->text()));
}
if (!def->type->is_vector() || static_cast<const vector_type_impl*>(def->type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("ANN ordering is only supported on float vector indexes");
}
auto e = expr::prepare_expression(*ann_vector, db, keyspace(), nullptr, def->column_specification);
expr::fill_prepare_context(e, ctx);
return std::make_pair(std::move(def), std::move(e));
}
select_statement::ordering_comparator_type select_statement::get_ordering_comparator(const prepared_orderings_type& orderings, select_statement::ordering_comparator_type select_statement::get_ordering_comparator(const prepared_orderings_type& orderings,
selection::selection& selection, selection::selection& selection,
const restrictions::statement_restrictions& restrictions) { const restrictions::statement_restrictions& restrictions) {

View File

@@ -22,7 +22,6 @@
#include "locator/host_id.hh" #include "locator/host_id.hh"
#include "service/cas_shard.hh" #include "service/cas_shard.hh"
#include "vector_search/vector_store_client.hh" #include "vector_search/vector_store_client.hh"
#include "vector_search/filter.hh"
namespace service { namespace service {
class client_state; class client_state;
@@ -363,7 +362,6 @@ private:
class vector_indexed_table_select_statement : public select_statement { class vector_indexed_table_select_statement : public select_statement {
secondary_index::index _index; secondary_index::index _index;
prepared_ann_ordering_type _prepared_ann_ordering; prepared_ann_ordering_type _prepared_ann_ordering;
vector_search::prepared_filter _prepared_filter;
mutable gc_clock::time_point _query_start_time_point; mutable gc_clock::time_point _query_start_time_point;
public: public:
@@ -373,13 +371,13 @@ public:
lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit, ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<cql3::attributes> attrs); std::optional<expr::expression> per_partition_limit, cql_stats& stats, std::unique_ptr<cql3::attributes> attrs);
vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, vector_indexed_table_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions, ::shared_ptr<selection::selection> selection, ::shared_ptr<const restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator,
prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit, std::optional<expr::expression> per_partition_limit, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit, std::optional<expr::expression> per_partition_limit,
cql_stats& stats, const secondary_index::index& index, vector_search::prepared_filter prepared_filter, std::unique_ptr<cql3::attributes> attrs); cql_stats& stats, const secondary_index::index& index, std::unique_ptr<cql3::attributes> attrs);
private: private:
future<::shared_ptr<cql_transport::messages::result_message>> do_execute( future<::shared_ptr<cql_transport::messages::result_message>> do_execute(
@@ -387,7 +385,7 @@ private:
void update_stats() const; void update_stats() const;
lw_shared_ptr<query::read_command> prepare_command_for_base_query(query_processor& qp, service::query_state& state, const query_options& options, uint64_t fetch_limit) const; lw_shared_ptr<query::read_command> prepare_command_for_base_query(query_processor& qp, service::query_state& state, const query_options& options) const;
std::vector<float> get_ann_ordering_vector(const query_options& options) const; std::vector<float> get_ann_ordering_vector(const query_options& options) const;

View File

@@ -1,82 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "modification_statement.hh"
#include "transport/messages/result_message.hh"
#include "cql3/query_processor.hh"
#include "service/strong_consistency/coordinator.hh"
#include "cql3/statements/strong_consistency/statement_helpers.hh"
namespace cql3::statements::strong_consistency {
static logging::logger logger("sc_modification_statement");
// Constructs a strongly consistent wrapper around a regular modification
// statement. Registers the write timeout, since this statement mutates data.
modification_statement::modification_statement(shared_ptr<base_statement> statement)
: cql_statement_opt_metadata(&timeout_config::write_timeout)
, _statement(std::move(statement))
{
}
using result_message = cql_transport::messages::result_message;
// Public execution entry point: delegates to
// execute_without_checking_exception_message() and then converts an
// exception carried inside the result message back into a failed future.
future<shared_ptr<result_message>> modification_statement::execute(query_processor& qp, service::query_state& qs,
const query_options& options, std::optional<service::group0_guard> guard) const
{
return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<result_message>>);
}
// Executes the wrapped modification through the strongly consistent
// coordinator. Only single-partition statements that do not require a
// read-before-write (data prefetch) are supported. If the coordinator
// replies that the leader is elsewhere, the request is redirected.
future<shared_ptr<result_message>> modification_statement::execute_without_checking_exception_message(
query_processor& qp, service::query_state& qs, const query_options& options,
std::optional<service::group0_guard> guard) const
{
auto json_cache = base_statement::json_cache_opt{};
const auto keys = _statement->build_partition_keys(options, json_cache);
// A strongly consistent mutation must target exactly one partition.
if (keys.size() != 1 || !query::is_single_partition(keys[0])) {
throw exceptions::invalid_request_exception("Strongly consistent queries can only target a single partition");
}
if (_statement->requires_read()) {
throw exceptions::invalid_request_exception("Strongly consistent updates don't support data prefetch");
}
// `holder` keeps the coordinator alive for the duration of the call.
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
// The mutation is built inside the callback so it can use the timestamp
// chosen by the coordinator.
const auto mutate_result = co_await coordinator.get().mutate(_statement->s,
keys[0].start()->value().token(),
[&](api::timestamp_type ts) {
const auto prefetch_data = update_parameters::prefetch_data(_statement->s);
const auto ttl = _statement->get_time_to_live(options);
const auto params = update_parameters(_statement->s, options, ts, ttl, prefetch_data);
const auto ranges = _statement->create_clustering_ranges(options, json_cache);
auto muts = _statement->apply_updates(keys, ranges, params, json_cache);
// Having validated a single partition above, exactly one mutation
// is expected; anything else indicates an internal bug.
if (muts.size() != 1) {
on_internal_error(logger, ::format("statement '{}' has unexpected number of mutations {}",
raw_cql_statement, muts.size()));
}
return std::move(*muts.begin());
});
using namespace service::strong_consistency;
// The coordinator may report that another replica is the leader; in that
// case hand the request over via redirect_statement().
if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
co_return co_await redirect_statement(qp, options, redirect->target);
}
co_return seastar::make_shared<result_message::void_message>();
}
// Access control is delegated to the wrapped modification statement.
future<> modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
return _statement->check_access(qp, state);
}
// Number of bind markers, forwarded from the wrapped statement.
uint32_t modification_statement::get_bound_terms() const {
return _statement->get_bound_terms();
}
// Schema-dependency check, forwarded from the wrapped statement.
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return _statement->depends_on(ks_name, cf_name);
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "cql3/cql_statement.hh"
#include "cql3/expr/expression.hh"
#include "cql3/statements/modification_statement.hh"
namespace cql3::statements::strong_consistency {
// CQL statement wrapper that executes a regular modification statement
// through the strongly consistent coordinator. Restricted to
// single-partition updates without data prefetch; all metadata queries
// (bound terms, access checks, schema dependencies) are forwarded to the
// wrapped statement.
class modification_statement : public cql_statement_opt_metadata {
using result_message = cql_transport::messages::result_message;
using base_statement = cql3::statements::modification_statement;
// The wrapped regular modification statement that builds the mutation.
shared_ptr<base_statement> _statement;
public:
modification_statement(shared_ptr<base_statement> statement);
future<shared_ptr<result_message>> execute(query_processor& qp, service::query_state& state,
const query_options& options, std::optional<service::group0_guard> guard) const override;
future<shared_ptr<result_message>> execute_without_checking_exception_message(query_processor& qp,
service::query_state& qs, const query_options& options,
std::optional<service::group0_guard> guard) const override;
future<> check_access(query_processor& qp, const service::client_state& state) const override;
uint32_t get_bound_terms() const override;
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
};
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "select_statement.hh"
#include "query/query-request.hh"
#include "cql3/query_processor.hh"
#include "service/strong_consistency/coordinator.hh"
#include "cql3/statements/strong_consistency/statement_helpers.hh"
namespace cql3::statements::strong_consistency {
using result_message = cql_transport::messages::result_message;
// Executes a strongly consistent single-partition read through the
// strongly consistent coordinator. Builds a read_command from the
// statement's restrictions/slice and, if the coordinator reports the
// leader is elsewhere, redirects the request; otherwise processes the
// query result into a CQL result message.
future<::shared_ptr<result_message>> select_statement::do_execute(query_processor& qp,
service::query_state& state,
const query_options& options) const
{
const auto key_ranges = _restrictions->get_partition_key_ranges(options);
// Strongly consistent reads must target exactly one partition.
if (key_ranges.size() != 1 || !query::is_single_partition(key_ranges[0])) {
throw exceptions::invalid_request_exception("Strongly consistent queries can only target a single partition");
}
const auto now = gc_clock::now();
auto read_command = make_lw_shared<query::read_command>(
_query_schema->id(),
_query_schema->version(),
make_partition_slice(options),
query::max_result_size(query::result_memory_limiter::maximum_result_size),
query::tombstone_limit(query::tombstone_limit::max),
// Row limit accounts for aggregation (inner-loop limit).
query::row_limit(get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate())),
query::partition_limit(query::max_partitions),
now,
tracing::make_trace_info(state.get_trace_state()),
query_id::create_null_id(),
query::is_first_page::no,
options.get_timestamp(state));
const auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
// `holder` keeps the coordinator alive for the duration of the call.
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
auto query_result = co_await coordinator.get().query(_query_schema, *read_command,
key_ranges, state.get_trace_state(), timeout);
using namespace service::strong_consistency;
// The coordinator may report that another replica is the leader.
if (const auto* redirect = get_if<need_redirect>(&query_result)) {
co_return co_await redirect_statement(qp, options, redirect->target);
}
co_return co_await process_results(get<lw_shared_ptr<query::result>>(std::move(query_result)),
read_command, options, now);
}
}

View File

@@ -1,26 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "cql3/cql_statement.hh"
#include "cql3/statements/select_statement.hh"
namespace cql3::statements::strong_consistency {
// select_statement variant whose reads go through the strongly
// consistent coordinator. Inherits the base class constructors and
// preparation; only execution (do_execute) is overridden.
class select_statement : public cql3::statements::select_statement {
using result_message = cql_transport::messages::result_message;
public:
using cql3::statements::select_statement::select_statement;
future<::shared_ptr<cql_transport::messages::result_message>> do_execute(query_processor& qp,
service::query_state& state, const query_options& options) const override;
};
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "statement_helpers.hh"
#include "transport/messages/result_message_base.hh"
#include "cql3/query_processor.hh"
#include "replica/database.hh"
#include "locator/tablet_replication_strategy.hh"
namespace cql3::statements::strong_consistency {
// Handles a redirect reply from the strongly consistent coordinator.
// Redirecting to a different host is not supported and throws; when the
// leader is another shard of this node, the statement is bounced to that
// shard, carrying along the cached partition-key function-call values
// taken from the options.
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(query_processor& qp,
const query_options& options,
const locator::tablet_replica& target)
{
const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id();
if (target.host != my_host_id) {
throw exceptions::invalid_request_exception(format(
"Strongly consistent writes can be executed only on the leader node, "
"leader id {}, current host id {}",
target.host, my_host_id));
}
// const_cast: take_cached_pk_function_calls() moves state out of the
// options; the cached values are handed to the bounce message so the
// target shard can reuse them.
auto&& func_values_cache = const_cast<cql3::query_options&>(options).take_cached_pk_function_calls();
co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache));
}
// Returns true when the keyspace uses a tablet-aware replication
// strategy whose configured consistency is anything other than eventual.
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name) {
    const auto& strategy = db.find_keyspace(ks_name).get_replication_strategy();
    const auto* tablet_rs = strategy.maybe_as_tablet_aware();
    if (!tablet_rs) {
        // Non-tablet-aware strategies are never strongly consistent.
        return false;
    }
    return tablet_rs->get_consistency() != data_dictionary::consistency_config_option::eventual;
}
}

View File

@@ -1,23 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "cql3/cql_statement.hh"
#include "locator/tablets.hh"
namespace cql3::statements::strong_consistency {
// Bounces the statement to the shard of `target` on this node, or throws
// an invalid_request_exception if `target` is on a different host. Used
// when the strongly consistent coordinator reports that another replica
// is the leader.
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(
query_processor& qp,
const query_options& options,
const locator::tablet_replica& target);
// Returns true if keyspace `ks_name` uses a tablet-aware replication
// strategy whose configured consistency is not eventual.
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name);
}

View File

@@ -9,7 +9,7 @@
*/ */
#include "cql3/statements/broadcast_modification_statement.hh" #include "cql3/statements/strongly_consistent_modification_statement.hh"
#include <optional> #include <optional>
@@ -28,11 +28,11 @@
namespace cql3 { namespace cql3 {
static logging::logger logger("broadcast_modification_statement"); static logging::logger logger("strongly_consistent_modification_statement");
namespace statements { namespace statements {
broadcast_modification_statement::broadcast_modification_statement( strongly_consistent_modification_statement::strongly_consistent_modification_statement(
uint32_t bound_terms, uint32_t bound_terms,
schema_ptr schema, schema_ptr schema,
broadcast_tables::prepared_update query) broadcast_tables::prepared_update query)
@@ -43,7 +43,7 @@ broadcast_modification_statement::broadcast_modification_statement(
{ } { }
future<::shared_ptr<cql_transport::messages::result_message>> future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const { strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
return execute_without_checking_exception_message(qp, qs, options, std::move(guard)) return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>); .then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
} }
@@ -63,7 +63,7 @@ evaluate_prepared(
} }
future<::shared_ptr<cql_transport::messages::result_message>> future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const { strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
if (this_shard_id() != 0) { if (this_shard_id() != 0) {
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{}); co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
} }
@@ -103,11 +103,11 @@ broadcast_modification_statement::execute_without_checking_exception_message(que
), result); ), result);
} }
uint32_t broadcast_modification_statement::get_bound_terms() const { uint32_t strongly_consistent_modification_statement::get_bound_terms() const {
return _bound_terms; return _bound_terms;
} }
future<> broadcast_modification_statement::check_access(query_processor& qp, const service::client_state& state) const { future<> strongly_consistent_modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
auto f = state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::MODIFY); auto f = state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::MODIFY);
if (_query.value_condition.has_value()) { if (_query.value_condition.has_value()) {
f = f.then([this, &state] { f = f.then([this, &state] {
@@ -117,7 +117,7 @@ future<> broadcast_modification_statement::check_access(query_processor& qp, con
return f; return f;
} }
bool broadcast_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const { bool strongly_consistent_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return _schema->ks_name() == ks_name && (!cf_name || _schema->cf_name() == *cf_name); return _schema->ks_name() == ks_name && (!cf_name || _schema->cf_name() == *cf_name);
} }

View File

@@ -27,13 +27,13 @@ struct prepared_update {
} }
class broadcast_modification_statement : public cql_statement_opt_metadata { class strongly_consistent_modification_statement : public cql_statement_opt_metadata {
const uint32_t _bound_terms; const uint32_t _bound_terms;
const schema_ptr _schema; const schema_ptr _schema;
const broadcast_tables::prepared_update _query; const broadcast_tables::prepared_update _query;
public: public:
broadcast_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query); strongly_consistent_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query);
virtual future<::shared_ptr<cql_transport::messages::result_message>> virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override; execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;

View File

@@ -9,7 +9,7 @@
*/ */
#include "cql3/statements/broadcast_select_statement.hh" #include "cql3/statements/strongly_consistent_select_statement.hh"
#include <seastar/core/future.hh> #include <seastar/core/future.hh>
#include <seastar/core/on_internal_error.hh> #include <seastar/core/on_internal_error.hh>
@@ -24,7 +24,7 @@ namespace cql3 {
namespace statements { namespace statements {
static logging::logger logger("broadcast_select_statement"); static logging::logger logger("strongly_consistent_select_statement");
static static
expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) { expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) {
@@ -58,7 +58,7 @@ bool is_selecting_only_value(const cql3::selection::selection& selection) {
selection.get_columns()[0]->name() == "value"; selection.get_columns()[0]->name() == "value";
} }
broadcast_select_statement::broadcast_select_statement(schema_ptr schema, uint32_t bound_terms, strongly_consistent_select_statement::strongly_consistent_select_statement(schema_ptr schema, uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters, lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection, ::shared_ptr<selection::selection> selection,
::shared_ptr<const restrictions::statement_restrictions> restrictions, ::shared_ptr<const restrictions::statement_restrictions> restrictions,
@@ -73,7 +73,7 @@ broadcast_select_statement::broadcast_select_statement(schema_ptr schema, uint32
_query{prepare_query()} _query{prepare_query()}
{ } { }
broadcast_tables::prepared_select broadcast_select_statement::prepare_query() const { broadcast_tables::prepared_select strongly_consistent_select_statement::prepare_query() const {
if (!is_selecting_only_value(*_selection)) { if (!is_selecting_only_value(*_selection)) {
throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed"); throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed");
} }
@@ -94,7 +94,7 @@ evaluate_prepared(
} }
future<::shared_ptr<cql_transport::messages::result_message>> future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const { strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
if (this_shard_id() != 0) { if (this_shard_id() != 0) {
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{}); co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
} }

View File

@@ -25,12 +25,12 @@ struct prepared_select {
} }
class broadcast_select_statement : public select_statement { class strongly_consistent_select_statement : public select_statement {
const broadcast_tables::prepared_select _query; const broadcast_tables::prepared_select _query;
broadcast_tables::prepared_select prepare_query() const; broadcast_tables::prepared_select prepare_query() const;
public: public:
broadcast_select_statement(schema_ptr schema, strongly_consistent_select_statement(schema_ptr schema,
uint32_t bound_terms, uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters, lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection, ::shared_ptr<selection::selection> selection,

View File

@@ -13,7 +13,7 @@
#include "cql3/expr/expression.hh" #include "cql3/expr/expression.hh"
#include "cql3/expr/evaluate.hh" #include "cql3/expr/evaluate.hh"
#include "cql3/expr/expr-utils.hh" #include "cql3/expr/expr-utils.hh"
#include "cql3/statements/broadcast_modification_statement.hh" #include "cql3/statements/strongly_consistent_modification_statement.hh"
#include "service/broadcast_tables/experimental/lang.hh" #include "service/broadcast_tables/experimental/lang.hh"
#include "raw/update_statement.hh" #include "raw/update_statement.hh"
@@ -333,7 +333,7 @@ std::optional<expr::expression> get_value_condition(const expr::expression& the_
return binop->rhs; return binop->rhs;
} }
::shared_ptr<broadcast_modification_statement> ::shared_ptr<strongly_consistent_modification_statement>
update_statement::prepare_for_broadcast_tables() const { update_statement::prepare_for_broadcast_tables() const {
if (attrs) { if (attrs) {
if (attrs->is_time_to_live_set()) { if (attrs->is_time_to_live_set()) {
@@ -359,7 +359,7 @@ update_statement::prepare_for_broadcast_tables() const {
.value_condition = get_value_condition(_condition), .value_condition = get_value_condition(_condition),
}; };
return ::make_shared<broadcast_modification_statement>( return ::make_shared<strongly_consistent_modification_statement>(
get_bound_terms(), get_bound_terms(),
s, s,
query query

View File

@@ -45,7 +45,7 @@ private:
virtual void execute_operations_for_key(mutation& m, const clustering_key_prefix& prefix, const update_parameters& params, const json_cache_opt& json_cache) const; virtual void execute_operations_for_key(mutation& m, const clustering_key_prefix& prefix, const update_parameters& params, const json_cache_opt& json_cache) const;
public: public:
virtual ::shared_ptr<broadcast_modification_statement> prepare_for_broadcast_tables() const override; virtual ::shared_ptr<strongly_consistent_modification_statement> prepare_for_broadcast_tables() const override;
}; };
/* /*

View File

@@ -323,9 +323,6 @@ void cache_mutation_reader::touch_partition() {
inline inline
future<> cache_mutation_reader::fill_buffer() { future<> cache_mutation_reader::fill_buffer() {
if (const auto& ex = get_abort_exception(); ex) {
return make_exception_future<>(ex);
}
if (_state == state::before_static_row) { if (_state == state::before_static_row) {
touch_partition(); touch_partition();
auto after_static_row = [this] { auto after_static_row = [this] {

View File

@@ -621,6 +621,25 @@ db::config::config(std::shared_ptr<db::extensions> exts)
* @GroupDescription: Provides an overview of the group. * @GroupDescription: Provides an overview of the group.
*/ */
/** /**
* @Group Ungrouped properties
*/
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
"true: auto-adjust memtable shares for flush processes")
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
"Set to 0 to disable automatic flushing all tables before major compaction.")
/**
* @Group Initialization properties * @Group Initialization properties
* @GroupDescription The minimal properties needed for configuring a cluster. * @GroupDescription The minimal properties needed for configuring a cluster.
*/ */
@@ -1299,7 +1318,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.") , prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.")
, prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.") , prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.")
, prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.") , prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, false, "If set allows the experimental Prometheus protobuf with native histogram") , prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, true, "Enable Prometheus protobuf with native histogram. Set to false to force text exposition format.")
, abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.") , abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.")
, murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.") , murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.")
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.") , unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
@@ -1322,7 +1341,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts}, , sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
"Server-global user table compression options. If enabled, all user tables" "Server-global user table compression options. If enabled, all user tables"
"will be compressed using the provided options, unless overridden" "will be compressed using the provided options, unless overridden"
"by compression options in the table schema. User tables are all tables in non-system keyspaces. The available options are:\n" "by compression options in the table schema. The available options are:\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n" "* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n" "* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n" "* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
@@ -1375,10 +1394,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.") "Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2, , reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2,
"Admit new reads while there are less than this number of requests that need CPU.") "Admit new reads while there are less than this number of requests that need CPU.")
, reader_concurrency_semaphore_preemptive_abort_factor(this, "reader_concurrency_semaphore_preemptive_abort_factor", liveness::LiveUpdate, value_status::Used, 0.3,
"Admit new reads while their remaining time is more than this factor times their timeout times when arrived to a semaphore. Its vale means\n"
"* <= 0.0 means new reads will never get rejected during admission\n"
"* >= 1.0 means new reads will always get rejected during admission\n")
, view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2, , view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2,
"Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.") "Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4, , view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4,
@@ -1569,14 +1584,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.") , enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false, , rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid " "Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
"keyspaces, attempting to start a node with this option ON will fail. " "keyspaces, attempting to start a node with this option ON will fail.")
"DEPRECATED. Use enforce_rack_list instead.")
, enforce_rack_list(this, "enforce_rack_list", liveness::MustRestart, value_status::Used, false,
"Enforce rack list for tablet keyspaces. "
"When the option is on, CREATE STATEMENT expands numeric rfs to rack lists "
"and ALTER STATEMENT is allowed only when rack lists are used in all DCs."
"Additionally, if there are existing tablet keyspaces with numeric rf in any DC "
"attempting to start a node with this option ON will fail.")
// FIXME: make frequency per table in order to reduce work in each iteration. // FIXME: make frequency per table in order to reduce work in each iteration.
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration. // Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60, , tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
@@ -1587,25 +1595,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.") "Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100, , minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.") "Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
/**
* @Group Ungrouped properties
*/
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
"true: auto-adjust memtable shares for flush processes")
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
"Set to 0 to disable automatic flushing all tables before major compaction.")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages") , default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'") , logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout") , log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")
@@ -1796,21 +1785,6 @@ const db::extensions& db::config::extensions() const {
return *_extensions; return *_extensions;
} }
compression_parameters db::config::get_sstable_compression_user_table_options(bool dicts_feature_enabled) const {
if (sstable_compression_user_table_options.is_set()
|| dicts_feature_enabled
|| !sstable_compression_user_table_options().uses_dictionary_compressor()) {
return sstable_compression_user_table_options();
} else {
// Fall back to non-dict if dictionary compression is not enabled cluster-wide.
auto options = sstable_compression_user_table_options();
auto params = options.get_options();
auto algo = compression_parameters::non_dict_equivalent(options.get_algorithm());
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(algo));
return compression_parameters{params};
}
}
std::map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() { std::map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
// We decided against using the construct-on-first-use idiom here: // We decided against using the construct-on-first-use idiom here:
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807 // https://github.com/scylladb/scylla/pull/5369#discussion_r353614807

View File

@@ -185,6 +185,13 @@ public:
* All values and documentation taken from * All values and documentation taken from
* http://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html * http://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html
*/ */
named_value<double> background_writer_scheduling_quota;
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
named_value<sstring> cluster_name; named_value<sstring> cluster_name;
named_value<sstring> listen_address; named_value<sstring> listen_address;
named_value<sstring> listen_interface; named_value<sstring> listen_interface;
@@ -412,13 +419,7 @@ public:
named_value<bool> enable_sstables_mc_format; named_value<bool> enable_sstables_mc_format;
named_value<bool> enable_sstables_md_format; named_value<bool> enable_sstables_md_format;
named_value<sstring> sstable_format; named_value<sstring> sstable_format;
// NOTE: Do not use this option directly.
// Use get_sstable_compression_user_table_options() instead.
named_value<compression_parameters> sstable_compression_user_table_options; named_value<compression_parameters> sstable_compression_user_table_options;
compression_parameters get_sstable_compression_user_table_options(bool dicts_feature_enabled) const;
named_value<bool> sstable_compression_dictionaries_allow_in_ddl; named_value<bool> sstable_compression_dictionaries_allow_in_ddl;
named_value<bool> sstable_compression_dictionaries_enable_writing; named_value<bool> sstable_compression_dictionaries_enable_writing;
named_value<float> sstable_compression_dictionaries_memory_budget_fraction; named_value<float> sstable_compression_dictionaries_memory_budget_fraction;
@@ -439,7 +440,6 @@ public:
named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier; named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier; named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_cpu_concurrency; named_value<uint32_t> reader_concurrency_semaphore_cpu_concurrency;
named_value<float> reader_concurrency_semaphore_preemptive_abort_factor;
named_value<uint32_t> view_update_reader_concurrency_semaphore_serialize_limit_multiplier; named_value<uint32_t> view_update_reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_kill_limit_multiplier; named_value<uint32_t> view_update_reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_cpu_concurrency; named_value<uint32_t> view_update_reader_concurrency_semaphore_cpu_concurrency;
@@ -599,21 +599,12 @@ public:
named_value<bool> enable_create_table_with_compact_storage; named_value<bool> enable_create_table_with_compact_storage;
named_value<bool> rf_rack_valid_keyspaces; named_value<bool> rf_rack_valid_keyspaces;
named_value<bool> enforce_rack_list;
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds; named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
named_value<bool> force_capacity_based_balancing; named_value<bool> force_capacity_based_balancing;
named_value<float> size_based_balance_threshold_percentage; named_value<float> size_based_balance_threshold_percentage;
named_value<uint64_t> minimal_tablet_size_for_balancing; named_value<uint64_t> minimal_tablet_size_for_balancing;
named_value<double> background_writer_scheduling_quota;
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
static const sstring default_tls_priority; static const sstring default_tls_priority;
private: private:
template<typename T> template<typename T>

View File

@@ -31,23 +31,19 @@ size_t quorum_for(const locator::effective_replication_map& erm) {
return replication_factor ? (replication_factor / 2) + 1 : 0; return replication_factor ? (replication_factor / 2) + 1 : 0;
} }
static size_t get_replication_factor_for_dc(const locator::effective_replication_map& erm, const sstring& dc) { size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
using namespace locator; using namespace locator;
const auto& rs = erm.get_replication_strategy(); const auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) { if (rs.get_type() == replication_strategy_type::network_topology) {
const network_topology_strategy* nts = const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs); static_cast<const network_topology_strategy*>(&rs);
return nts->get_replication_factor(dc); size_t replication_factor = nrs->get_replication_factor(dc);
return replication_factor ? (replication_factor / 2) + 1 : 0;
} }
return erm.get_replication_factor(); return quorum_for(erm);
}
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
auto rf = get_replication_factor_for_dc(erm, dc);
return rf ? (rf / 2) + 1 : 0;
} }
size_t block_for_local_serial(const locator::effective_replication_map& erm) { size_t block_for_local_serial(const locator::effective_replication_map& erm) {
@@ -192,30 +188,18 @@ void assure_sufficient_live_nodes(
return pending <= live ? live - pending : 0; return pending <= live ? live - pending : 0;
}; };
auto make_rf_zero_error_msg = [cl] (const sstring& local_dc) {
return format("Cannot achieve consistency level {} in datacenter '{}' with replication factor 0. "
"Ensure the keyspace is replicated to this datacenter or use a non-local consistency level.", cl, local_dc);
};
const auto& topo = erm.get_topology(); const auto& topo = erm.get_topology();
const sstring& local_dc = topo.get_datacenter();
switch (cl) { switch (cl) {
case consistency_level::ANY: case consistency_level::ANY:
// local hint is acceptable, and local node is always live // local hint is acceptable, and local node is always live
break; break;
case consistency_level::LOCAL_ONE: case consistency_level::LOCAL_ONE:
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, 1, 0);
}
if (topo.count_local_endpoints(live_endpoints) < topo.count_local_endpoints(pending_endpoints) + 1) { if (topo.count_local_endpoints(live_endpoints) < topo.count_local_endpoints(pending_endpoints) + 1) {
throw exceptions::unavailable_exception(cl, 1, 0); throw exceptions::unavailable_exception(cl, 1, 0);
} }
break; break;
case consistency_level::LOCAL_QUORUM: { case consistency_level::LOCAL_QUORUM: {
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, need, 0);
}
size_t local_live = topo.count_local_endpoints(live_endpoints); size_t local_live = topo.count_local_endpoints(live_endpoints);
size_t pending = topo.count_local_endpoints(pending_endpoints); size_t pending = topo.count_local_endpoints(pending_endpoints);
if (local_live < need + pending) { if (local_live < need + pending) {

View File

@@ -158,7 +158,7 @@ void hint_endpoint_manager::cancel_draining() noexcept {
_sender.cancel_draining(); _sender.cancel_draining();
} }
hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, fs::path hint_directory, manager& shard_manager, scheduling_group send_sg) hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, fs::path hint_directory, manager& shard_manager)
: _key(key) : _key(key)
, _shard_manager(shard_manager) , _shard_manager(shard_manager)
, _store_gate("hint_endpoint_manager") , _store_gate("hint_endpoint_manager")
@@ -169,7 +169,7 @@ hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, fs::path hi
// Approximate the position of the last written hint by using the same formula as for segment id calculation in commitlog // Approximate the position of the last written hint by using the same formula as for segment id calculation in commitlog
// TODO: Should this logic be deduplicated with what is in the commitlog? // TODO: Should this logic be deduplicated with what is in the commitlog?
, _last_written_rp(this_shard_id(), std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count()) , _last_written_rp(this_shard_id(), std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count())
, _sender(*this, _shard_manager.local_storage_proxy(), _shard_manager.local_db(), _shard_manager.local_gossiper(), send_sg) , _sender(*this, _shard_manager.local_storage_proxy(), _shard_manager.local_db(), _shard_manager.local_gossiper())
{} {}
hint_endpoint_manager::hint_endpoint_manager(hint_endpoint_manager&& other) hint_endpoint_manager::hint_endpoint_manager(hint_endpoint_manager&& other)

View File

@@ -63,7 +63,7 @@ private:
hint_sender _sender; hint_sender _sender;
public: public:
hint_endpoint_manager(const endpoint_id& key, std::filesystem::path hint_directory, manager& shard_manager, scheduling_group send_sg); hint_endpoint_manager(const endpoint_id& key, std::filesystem::path hint_directory, manager& shard_manager);
hint_endpoint_manager(hint_endpoint_manager&&); hint_endpoint_manager(hint_endpoint_manager&&);
~hint_endpoint_manager(); ~hint_endpoint_manager();

View File

@@ -122,7 +122,7 @@ const column_mapping& hint_sender::get_column_mapping(lw_shared_ptr<send_one_fil
return cm_it->second; return cm_it->second;
} }
hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy,replica::database& local_db, const gms::gossiper& local_gossiper, scheduling_group sg) noexcept hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy,replica::database& local_db, const gms::gossiper& local_gossiper) noexcept
: _stopped(make_ready_future<>()) : _stopped(make_ready_future<>())
, _ep_key(parent.end_point_key()) , _ep_key(parent.end_point_key())
, _ep_manager(parent) , _ep_manager(parent)
@@ -130,7 +130,7 @@ hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy&
, _resource_manager(_shard_manager._resource_manager) , _resource_manager(_shard_manager._resource_manager)
, _proxy(local_storage_proxy) , _proxy(local_storage_proxy)
, _db(local_db) , _db(local_db)
, _hints_cpu_sched_group(sg) , _hints_cpu_sched_group(_db.get_streaming_scheduling_group())
, _gossiper(local_gossiper) , _gossiper(local_gossiper)
, _file_update_mutex(_ep_manager.file_update_mutex()) , _file_update_mutex(_ep_manager.file_update_mutex())
{} {}

View File

@@ -120,7 +120,7 @@ private:
std::multimap<db::replay_position, lw_shared_ptr<std::optional<promise<>>>> _replay_waiters; std::multimap<db::replay_position, lw_shared_ptr<std::optional<promise<>>>> _replay_waiters;
public: public:
hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy, replica::database& local_db, const gms::gossiper& local_gossiper, scheduling_group sg) noexcept; hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy, replica::database& local_db, const gms::gossiper& local_gossiper) noexcept;
~hint_sender(); ~hint_sender();
/// \brief A constructor that should be called from the copy/move-constructor of hint_endpoint_manager. /// \brief A constructor that should be called from the copy/move-constructor of hint_endpoint_manager.

View File

@@ -142,7 +142,7 @@ future<> directory_initializer::ensure_rebalanced() {
} }
manager::manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, manager::manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, int64_t max_hint_window_ms,
resource_manager& res_manager, sharded<replica::database>& db, scheduling_group sg) resource_manager& res_manager, sharded<replica::database>& db)
: _hints_dir(fs::path(hints_directory) / fmt::to_string(this_shard_id())) : _hints_dir(fs::path(hints_directory) / fmt::to_string(this_shard_id()))
, _host_filter(std::move(filter)) , _host_filter(std::move(filter))
, _proxy(proxy) , _proxy(proxy)
@@ -150,7 +150,6 @@ manager::manager(service::storage_proxy& proxy, sstring hints_directory, host_fi
, _local_db(db.local()) , _local_db(db.local())
, _draining_eps_gate(seastar::format("hints::manager::{}", _hints_dir.native())) , _draining_eps_gate(seastar::format("hints::manager::{}", _hints_dir.native()))
, _resource_manager(res_manager) , _resource_manager(res_manager)
, _hints_sending_sched_group(sg)
{ {
if (utils::get_local_injector().enter("decrease_hints_flush_period")) { if (utils::get_local_injector().enter("decrease_hints_flush_period")) {
hints_flush_period = std::chrono::seconds{1}; hints_flush_period = std::chrono::seconds{1};
@@ -416,7 +415,7 @@ hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const
try { try {
std::filesystem::path hint_directory = hints_dir() / (_uses_host_id ? fmt::to_string(host_id) : fmt::to_string(ip)); std::filesystem::path hint_directory = hints_dir() / (_uses_host_id ? fmt::to_string(host_id) : fmt::to_string(ip));
auto [it, _] = _ep_managers.emplace(host_id, hint_endpoint_manager{host_id, std::move(hint_directory), *this, _hints_sending_sched_group}); auto [it, _] = _ep_managers.emplace(host_id, hint_endpoint_manager{host_id, std::move(hint_directory), *this});
hint_endpoint_manager& ep_man = it->second; hint_endpoint_manager& ep_man = it->second;
manager_logger.trace("Created an endpoint manager for {}", host_id); manager_logger.trace("Created an endpoint manager for {}", host_id);

View File

@@ -133,7 +133,6 @@ private:
hint_stats _stats; hint_stats _stats;
seastar::metrics::metric_groups _metrics; seastar::metrics::metric_groups _metrics;
scheduling_group _hints_sending_sched_group;
// We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will // We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will
// still represent IP addresses. But after the migration, they will start representing host IDs. // still represent IP addresses. But after the migration, they will start representing host IDs.
@@ -156,7 +155,7 @@ private:
public: public:
manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter,
int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db, scheduling_group sg); int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db);
manager(const manager&) = delete; manager(const manager&) = delete;
manager& operator=(const manager&) = delete; manager& operator=(const manager&) = delete;

View File

@@ -24,7 +24,7 @@
#include "readers/forwardable.hh" #include "readers/forwardable.hh"
#include "readers/nonforwardable.hh" #include "readers/nonforwardable.hh"
#include "cache_mutation_reader.hh" #include "cache_mutation_reader.hh"
#include "replica/partition_snapshot_reader.hh" #include "partition_snapshot_reader.hh"
#include "keys/clustering_key_filter.hh" #include "keys/clustering_key_filter.hh"
#include "utils/assert.hh" #include "utils/assert.hh"
#include "utils/updateable_value.hh" #include "utils/updateable_value.hh"
@@ -845,12 +845,12 @@ mutation_reader row_cache::make_nonpopulating_reader(schema_ptr schema, reader_p
cache_entry& e = *i; cache_entry& e = *i;
upgrade_entry(e); upgrade_entry(e);
tracing::trace(ts, "Reading partition {} from cache", pos); tracing::trace(ts, "Reading partition {} from cache", pos);
return replica::make_partition_snapshot_reader<false, dummy_accounter>( return make_partition_snapshot_flat_reader<false, dummy_accounter>(
schema, schema,
std::move(permit), std::move(permit),
e.key(), e.key(),
query::clustering_key_filter_ranges(slice.row_ranges(*schema, e.key().key())), query::clustering_key_filter_ranges(slice.row_ranges(*schema, e.key().key())),
e.partition().read(_tracker.region(), _tracker.memtable_cleaner(), &_tracker, phase_of(pos)), e.partition().read(_tracker.region(), _tracker.memtable_cleaner(), nullptr, phase_of(pos)),
false, false,
_tracker.region(), _tracker.region(),
_read_section, _read_section,

View File

@@ -96,16 +96,16 @@ static logging::logger diff_logger("schema_diff");
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db { namespace db {
namespace { namespace {
const auto set_use_schema_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) { const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if (builder.ks_name() == schema_tables::NAME) { if (ks_name == schema_tables::NAME) {
builder.enable_schema_commitlog(); props.enable_schema_commitlog();
} }
}); });
const auto set_group0_table_options = const auto set_group0_table_options =
schema_builder::register_schema_initializer([](schema_builder& builder) { schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if (builder.ks_name() == schema_tables::NAME) { if (ks_name == schema_tables::NAME) {
// all schema tables are group0 tables // all schema tables are group0 tables
builder.set_is_group0_table(true); props.is_group0_table = true;
} }
}); });
} }

View File

@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
}); });
} }
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) { future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
if (tag.empty()) { if (tag.empty()) {
throw std::runtime_error("You must supply a snapshot name."); throw std::runtime_error("You must supply a snapshot name.");
} }
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names)); std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
}; };
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable { return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts); return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
}); });
} }
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) { future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) { co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag); return check_snapshot_not_exist(ks_name, tag);
}); });
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) { co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts); return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
}); });
} }
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) { future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
if (ks_name.empty()) { if (ks_name.empty()) {
throw std::runtime_error("You must supply a keyspace name"); throw std::runtime_error("You must supply a keyspace name");
} }
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
throw std::runtime_error("You must supply a snapshot name."); throw std::runtime_error("You must supply a snapshot name.");
} }
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable { return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts); return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
}); });
} }
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) { future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
co_await check_snapshot_not_exist(ks_name, tag, tables); co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts); co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
} }
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) { future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {

View File

@@ -38,14 +38,10 @@ class backup_task_impl;
} // snapshot namespace } // snapshot namespace
struct snapshot_options {
bool skip_flush = false;
gc_clock::time_point created_at = gc_clock::now();
std::optional<gc_clock::time_point> expires_at;
};
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> { class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
public: public:
using skip_flush = bool_class<class skip_flush_tag>;
struct table_snapshot_details { struct table_snapshot_details {
int64_t total; int64_t total;
int64_t live; int64_t live;
@@ -74,8 +70,8 @@ public:
* *
* @param tag the tag given to the snapshot; may not be null or empty * @param tag the tag given to the snapshot; may not be null or empty
*/ */
future<> take_snapshot(sstring tag, snapshot_options opts = {}) { future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
return take_snapshot(tag, {}, opts); return take_snapshot(tag, {}, sf);
} }
/** /**
@@ -84,7 +80,7 @@ public:
* @param tag the tag given to the snapshot; may not be null or empty * @param tag the tag given to the snapshot; may not be null or empty
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all" * @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
*/ */
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {}); future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
/** /**
* Takes the snapshot of multiple tables. A snapshot name must be specified. * Takes the snapshot of multiple tables. A snapshot name must be specified.
@@ -93,7 +89,7 @@ public:
* @param tables a vector of tables names to snapshot * @param tables a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty * @param tag the tag given to the snapshot; may not be null or empty
*/ */
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {}); future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
/** /**
* Remove the snapshot with the given name from the given keyspaces. * Remove the snapshot with the given name from the given keyspaces.
@@ -131,8 +127,8 @@ private:
friend class snapshot::backup_task_impl; friend class snapshot::backup_task_impl;
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} ); future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {}); future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
}; };
} }

View File

@@ -42,11 +42,11 @@ extern logging::logger cdc_log;
namespace db { namespace db {
namespace { namespace {
const auto set_wait_for_sync_to_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) { const auto set_wait_for_sync_to_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if ((builder.ks_name() == system_distributed_keyspace::NAME_EVERYWHERE && builder.cf_name() == system_distributed_keyspace::CDC_GENERATIONS_V2) || if ((ks_name == system_distributed_keyspace::NAME_EVERYWHERE && cf_name == system_distributed_keyspace::CDC_GENERATIONS_V2) ||
(builder.ks_name() == system_distributed_keyspace::NAME && builder.cf_name() == system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION)) (ks_name == system_distributed_keyspace::NAME && cf_name == system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION))
{ {
builder.set_wait_for_sync_to_commitlog(true); props.wait_for_sync_to_commitlog = true;
} }
}); });
} }

View File

@@ -66,24 +66,24 @@ static thread_local auto sstableinfo_type = user_type_impl::get_instance(
namespace db { namespace db {
namespace { namespace {
const auto set_null_sharder = schema_builder::register_schema_initializer([](schema_builder& builder) { const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
// tables in the "system" keyspace which need to use null sharder // tables in the "system" keyspace which need to use null sharder
static const std::unordered_set<sstring> tables = { static const std::unordered_set<sstring> tables = {
// empty // empty
}; };
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) { if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
builder.set_use_null_sharder(true); props.use_null_sharder = true;
} }
}); });
const auto set_wait_for_sync_to_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) { const auto set_wait_for_sync_to_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
static const std::unordered_set<sstring> tables = { static const std::unordered_set<sstring> tables = {
system_keyspace::PAXOS, system_keyspace::PAXOS,
}; };
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) { if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
builder.set_wait_for_sync_to_commitlog(true); props.wait_for_sync_to_commitlog = true;
} }
}); });
const auto set_use_schema_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) { const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
static const std::unordered_set<sstring> tables = { static const std::unordered_set<sstring> tables = {
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY, schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
system_keyspace::BROADCAST_KV_STORE, system_keyspace::BROADCAST_KV_STORE,
@@ -108,18 +108,18 @@ namespace {
system_keyspace::ROLE_MEMBERS, system_keyspace::ROLE_MEMBERS,
system_keyspace::ROLE_ATTRIBUTES, system_keyspace::ROLE_ATTRIBUTES,
system_keyspace::ROLE_PERMISSIONS, system_keyspace::ROLE_PERMISSIONS,
system_keyspace::CDC_LOCAL, system_keyspace::v3::CDC_LOCAL,
system_keyspace::DICTS, system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS, system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES, system_keyspace::CLIENT_ROUTES,
}; };
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) { if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
builder.enable_schema_commitlog(); props.enable_schema_commitlog();
} }
}); });
const auto set_group0_table_options = const auto set_group0_table_options =
schema_builder::register_schema_initializer([](schema_builder& builder) { schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
static const std::unordered_set<sstring> tables = { static const std::unordered_set<sstring> tables = {
// scylla_local may store a replicated tombstone related to schema // scylla_local may store a replicated tombstone related to schema
// (see `make_group0_schema_version_mutation`), so we include it in the group0 tables list. // (see `make_group0_schema_version_mutation`), so we include it in the group0 tables list.
@@ -142,8 +142,8 @@ namespace {
system_keyspace::CLIENT_ROUTES, system_keyspace::CLIENT_ROUTES,
system_keyspace::REPAIR_TASKS, system_keyspace::REPAIR_TASKS,
}; };
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) { if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
builder.set_is_group0_table(true); props.is_group0_table = true;
} }
}); });
} }
@@ -918,7 +918,7 @@ schema_ptr system_keyspace::corrupt_data() {
return scylla_local; return scylla_local;
} }
schema_ptr system_keyspace::batches() { schema_ptr system_keyspace::v3::batches() {
static thread_local auto schema = [] { static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHES), NAME, BATCHES, schema_builder builder(generate_legacy_id(NAME, BATCHES), NAME, BATCHES,
// partition key // partition key
@@ -946,7 +946,53 @@ schema_ptr system_keyspace::batches() {
return schema; return schema;
} }
schema_ptr system_keyspace::truncated() { schema_ptr system_keyspace::v3::built_indexes() {
// identical to ours, but ours otoh is a mix-in of the 3.x series cassandra one
return db::system_keyspace::built_indexes();
}
schema_ptr system_keyspace::v3::local() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, LOCAL), NAME, LOCAL,
// partition key
{{"key", utf8_type}},
// clustering key
{},
// regular columns
{
{"bootstrapped", utf8_type},
{"broadcast_address", inet_addr_type},
{"cluster_name", utf8_type},
{"cql_version", utf8_type},
{"data_center", utf8_type},
{"gossip_generation", int32_type},
{"host_id", uuid_type},
{"listen_address", inet_addr_type},
{"native_protocol_version", utf8_type},
{"partitioner", utf8_type},
{"rack", utf8_type},
{"release_version", utf8_type},
{"rpc_address", inet_addr_type},
{"schema_version", uuid_type},
{"thrift_version", utf8_type},
{"tokens", set_type_impl::get_instance(utf8_type, true)},
{"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"information about the local node"
);
builder.set_gc_grace_seconds(0);
builder.with_hash_version();
return builder.build(schema_builder::compact_storage::no);
}();
return schema;
}
schema_ptr system_keyspace::v3::truncated() {
static thread_local auto local = [] { static thread_local auto local = [] {
schema_builder builder(generate_legacy_id(NAME, TRUNCATED), NAME, TRUNCATED, schema_builder builder(generate_legacy_id(NAME, TRUNCATED), NAME, TRUNCATED,
// partition key // partition key
@@ -976,7 +1022,7 @@ schema_ptr system_keyspace::truncated() {
thread_local data_type replay_position_type = tuple_type_impl::get_instance({long_type, int32_type}); thread_local data_type replay_position_type = tuple_type_impl::get_instance({long_type, int32_type});
schema_ptr system_keyspace::commitlog_cleanups() { schema_ptr system_keyspace::v3::commitlog_cleanups() {
static thread_local auto local = [] { static thread_local auto local = [] {
schema_builder builder(generate_legacy_id(NAME, COMMITLOG_CLEANUPS), NAME, COMMITLOG_CLEANUPS, schema_builder builder(generate_legacy_id(NAME, COMMITLOG_CLEANUPS), NAME, COMMITLOG_CLEANUPS,
// partition key // partition key
@@ -1003,7 +1049,47 @@ schema_ptr system_keyspace::commitlog_cleanups() {
return local; return local;
} }
schema_ptr system_keyspace::available_ranges() { schema_ptr system_keyspace::v3::peers() {
// identical
return db::system_keyspace::peers();
}
schema_ptr system_keyspace::v3::peer_events() {
// identical
return db::system_keyspace::peer_events();
}
schema_ptr system_keyspace::v3::range_xfers() {
// identical
return db::system_keyspace::range_xfers();
}
schema_ptr system_keyspace::v3::compaction_history() {
// identical
return db::system_keyspace::compaction_history();
}
schema_ptr system_keyspace::v3::sstable_activity() {
// identical
return db::system_keyspace::sstable_activity();
}
schema_ptr system_keyspace::v3::size_estimates() {
// identical
return db::system_keyspace::size_estimates();
}
schema_ptr system_keyspace::v3::large_partitions() {
// identical
return db::system_keyspace::large_partitions();
}
schema_ptr system_keyspace::v3::scylla_local() {
// identical
return db::system_keyspace::scylla_local();
}
schema_ptr system_keyspace::v3::available_ranges() {
static thread_local auto schema = [] { static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, AVAILABLE_RANGES), NAME, AVAILABLE_RANGES, schema_builder builder(generate_legacy_id(NAME, AVAILABLE_RANGES), NAME, AVAILABLE_RANGES,
// partition key // partition key
@@ -1026,7 +1112,7 @@ schema_ptr system_keyspace::available_ranges() {
return schema; return schema;
} }
schema_ptr system_keyspace::views_builds_in_progress() { schema_ptr system_keyspace::v3::views_builds_in_progress() {
static thread_local auto schema = [] { static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, VIEWS_BUILDS_IN_PROGRESS), NAME, VIEWS_BUILDS_IN_PROGRESS, schema_builder builder(generate_legacy_id(NAME, VIEWS_BUILDS_IN_PROGRESS), NAME, VIEWS_BUILDS_IN_PROGRESS,
// partition key // partition key
@@ -1049,7 +1135,7 @@ schema_ptr system_keyspace::views_builds_in_progress() {
return schema; return schema;
} }
schema_ptr system_keyspace::built_views() { schema_ptr system_keyspace::v3::built_views() {
static thread_local auto schema = [] { static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_VIEWS), NAME, BUILT_VIEWS, schema_builder builder(generate_legacy_id(NAME, BUILT_VIEWS), NAME, BUILT_VIEWS,
// partition key // partition key
@@ -1072,7 +1158,7 @@ schema_ptr system_keyspace::built_views() {
return schema; return schema;
} }
schema_ptr system_keyspace::scylla_views_builds_in_progress() { schema_ptr system_keyspace::v3::scylla_views_builds_in_progress() {
static thread_local auto schema = [] { static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS); auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, std::make_optional(id)) return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, std::make_optional(id))
@@ -1088,7 +1174,7 @@ schema_ptr system_keyspace::scylla_views_builds_in_progress() {
return schema; return schema;
} }
/*static*/ schema_ptr system_keyspace::cdc_local() { /*static*/ schema_ptr system_keyspace::v3::cdc_local() {
static thread_local auto cdc_local = [] { static thread_local auto cdc_local = [] {
schema_builder builder(generate_legacy_id(NAME, CDC_LOCAL), NAME, CDC_LOCAL, schema_builder builder(generate_legacy_id(NAME, CDC_LOCAL), NAME, CDC_LOCAL,
// partition key // partition key
@@ -2094,21 +2180,21 @@ future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) {
co_await std::visit(make_visitor( co_await std::visit(make_visitor(
[this] (cdc::generation_id_v1 id) -> future<> { [this] (cdc::generation_id_v1 id) -> future<> {
co_await execute_cql( co_await execute_cql(
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", CDC_LOCAL), format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
sstring(CDC_LOCAL), id.ts); sstring(v3::CDC_LOCAL), id.ts);
}, },
[this] (cdc::generation_id_v2 id) -> future<> { [this] (cdc::generation_id_v2 id) -> future<> {
co_await execute_cql( co_await execute_cql(
format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", CDC_LOCAL), format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", v3::CDC_LOCAL),
sstring(CDC_LOCAL), id.ts, id.id); sstring(v3::CDC_LOCAL), id.ts, id.id);
} }
), gen_id); ), gen_id);
} }
future<std::optional<cdc::generation_id>> system_keyspace::get_cdc_generation_id() { future<std::optional<cdc::generation_id>> system_keyspace::get_cdc_generation_id() {
auto msg = co_await execute_cql( auto msg = co_await execute_cql(
format("SELECT streams_timestamp, uuid FROM system.{} WHERE key = ?", CDC_LOCAL), format("SELECT streams_timestamp, uuid FROM system.{} WHERE key = ?", v3::CDC_LOCAL),
sstring(CDC_LOCAL)); sstring(v3::CDC_LOCAL));
if (msg->empty()) { if (msg->empty()) {
co_return std::nullopt; co_return std::nullopt;
@@ -2134,19 +2220,19 @@ static const sstring CDC_REWRITTEN_KEY = "rewritten";
future<> system_keyspace::cdc_set_rewritten(std::optional<cdc::generation_id_v1> gen_id) { future<> system_keyspace::cdc_set_rewritten(std::optional<cdc::generation_id_v1> gen_id) {
if (gen_id) { if (gen_id) {
return execute_cql( return execute_cql(
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", CDC_LOCAL), format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
CDC_REWRITTEN_KEY, gen_id->ts).discard_result(); CDC_REWRITTEN_KEY, gen_id->ts).discard_result();
} else { } else {
// Insert just the row marker. // Insert just the row marker.
return execute_cql( return execute_cql(
format("INSERT INTO system.{} (key) VALUES (?)", CDC_LOCAL), format("INSERT INTO system.{} (key) VALUES (?)", v3::CDC_LOCAL),
CDC_REWRITTEN_KEY).discard_result(); CDC_REWRITTEN_KEY).discard_result();
} }
} }
future<bool> system_keyspace::cdc_is_rewritten() { future<bool> system_keyspace::cdc_is_rewritten() {
// We don't care about the actual timestamp; it's additional information for debugging purposes. // We don't care about the actual timestamp; it's additional information for debugging purposes.
return execute_cql(format("SELECT key FROM system.{} WHERE key = ?", CDC_LOCAL), CDC_REWRITTEN_KEY) return execute_cql(format("SELECT key FROM system.{} WHERE key = ?", v3::CDC_LOCAL), CDC_REWRITTEN_KEY)
.then([] (::shared_ptr<cql3::untyped_result_set> msg) { .then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return !msg->empty(); return !msg->empty();
}); });
@@ -2290,11 +2376,11 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
scylla_local(), db::schema_tables::scylla_table_schema_history(), scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(), repair_history(),
repair_tasks(), repair_tasks(),
views_builds_in_progress(), built_views(), v3::views_builds_in_progress(), v3::built_views(),
scylla_views_builds_in_progress(), v3::scylla_views_builds_in_progress(),
truncated(), v3::truncated(),
commitlog_cleanups(), v3::commitlog_cleanups(),
cdc_local(), v3::cdc_local(),
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(), raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(), topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history() dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
@@ -2317,7 +2403,7 @@ static bool maybe_write_in_user_memory(schema_ptr s) {
return (s.get() == system_keyspace::batchlog().get()) return (s.get() == system_keyspace::batchlog().get())
|| (s.get() == system_keyspace::batchlog_v2().get()) || (s.get() == system_keyspace::batchlog_v2().get())
|| (s.get() == system_keyspace::paxos().get()) || (s.get() == system_keyspace::paxos().get())
|| s == system_keyspace::scylla_views_builds_in_progress(); || s == system_keyspace::v3::scylla_views_builds_in_progress();
} }
future<> system_keyspace::make( future<> system_keyspace::make(
@@ -2603,7 +2689,7 @@ mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::v
future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) { future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)", sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
SCYLLA_VIEWS_BUILDS_IN_PROGRESS); v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
return execute_cql( return execute_cql(
std::move(req), std::move(req),
std::move(ks_name), std::move(ks_name),
@@ -2619,7 +2705,7 @@ future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_n
// before all shards are registered. // before all shards are registered.
// if another shard has already registered, this won't overwrite its status. if it hasn't registered, we insert // if another shard has already registered, this won't overwrite its status. if it hasn't registered, we insert
// a status with first_token=null and next_token=null, indicating it hasn't made progress. // a status with first_token=null and next_token=null, indicating it hasn't made progress.
auto&& schema = db::system_keyspace::scylla_views_builds_in_progress(); auto&& schema = db::system_keyspace::v3::scylla_views_builds_in_progress();
auto timestamp = api::new_timestamp(); auto timestamp = api::new_timestamp();
mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks_name))}; mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks_name))};
@@ -2637,7 +2723,7 @@ future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_n
future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) { future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)", sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
SCYLLA_VIEWS_BUILDS_IN_PROGRESS); v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
return execute_cql( return execute_cql(
std::move(req), std::move(req),
std::move(ks_name), std::move(ks_name),
@@ -2648,14 +2734,14 @@ future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring vi
future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) { future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
return execute_cql( return execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS), format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
std::move(ks_name), std::move(ks_name),
std::move(view_name)).discard_result(); std::move(view_name)).discard_result();
} }
future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) { future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) {
return execute_cql( return execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS), format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
std::move(ks_name), std::move(ks_name),
std::move(view_name), std::move(view_name),
int32_t(this_shard_id())).discard_result(); int32_t(this_shard_id())).discard_result();
@@ -2663,20 +2749,20 @@ future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring vi
future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) { future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) {
return execute_cql( return execute_cql(
format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", BUILT_VIEWS), format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
std::move(ks_name), std::move(ks_name),
std::move(view_name)).discard_result(); std::move(view_name)).discard_result();
} }
future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) { future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) {
return execute_cql( return execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", BUILT_VIEWS), format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
std::move(ks_name), std::move(ks_name),
std::move(view_name)).discard_result(); std::move(view_name)).discard_result();
} }
future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_views() { future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_views() {
return execute_cql(format("SELECT * FROM system.{}", BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) { return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return *cql_result return *cql_result
| std::views::transform([] (const cql3::untyped_result_set::row& row) { | std::views::transform([] (const cql3::untyped_result_set::row& row) {
auto ks_name = row.get_as<sstring>("keyspace_name"); auto ks_name = row.get_as<sstring>("keyspace_name");
@@ -2688,7 +2774,7 @@ future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_view
future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_view_build_progress() { future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_view_build_progress() {
return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}", return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) { v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
std::vector<view_build_progress> progress; std::vector<view_build_progress> progress;
for (auto& row : *cql_result) { for (auto& row : *cql_result) {
auto ks_name = row.get_as<sstring>("keyspace_name"); auto ks_name = row.get_as<sstring>("keyspace_name");
@@ -3141,8 +3227,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
co_return ret; co_return ret;
} }
const bool strongly_consistent_tables = _db.features().strongly_consistent_tables;
for (auto& row : *rs) { for (auto& row : *rs) {
if (!row.has("host_id")) { if (!row.has("host_id")) {
// There are no clustering rows, only the static row. // There are no clustering rows, only the static row.
@@ -3379,9 +3463,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.session = service::session_id(some_row.get_as<utils::UUID>("session")); ret.session = service::session_id(some_row.get_as<utils::UUID>("session"));
} }
if (strongly_consistent_tables) { if (some_row.has("tablet_balancing_enabled")) {
ret.tablet_balancing_enabled = false;
} else if (some_row.has("tablet_balancing_enabled")) {
ret.tablet_balancing_enabled = some_row.get_as<bool>("tablet_balancing_enabled"); ret.tablet_balancing_enabled = some_row.get_as<bool>("tablet_balancing_enabled");
} else { } else {
ret.tablet_balancing_enabled = true; ret.tablet_balancing_enabled = true;

View File

@@ -127,8 +127,6 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
static schema_ptr raft_snapshot_config(); static schema_ptr raft_snapshot_config();
static schema_ptr local(); static schema_ptr local();
static schema_ptr truncated();
static schema_ptr commitlog_cleanups();
static schema_ptr peers(); static schema_ptr peers();
static schema_ptr peer_events(); static schema_ptr peer_events();
static schema_ptr range_xfers(); static schema_ptr range_xfers();
@@ -139,10 +137,7 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
static schema_ptr large_rows(); static schema_ptr large_rows();
static schema_ptr large_cells(); static schema_ptr large_cells();
static schema_ptr corrupt_data(); static schema_ptr corrupt_data();
static schema_ptr batches(); static schema_ptr scylla_local();
static schema_ptr available_ranges();
static schema_ptr built_views();
static schema_ptr cdc_local();
future<> force_blocking_flush(sstring cfname); future<> force_blocking_flush(sstring cfname);
// This function is called when the system.peers table is read, // This function is called when the system.peers table is read,
// and it fixes some types of inconsistencies that can occur // and it fixes some types of inconsistencies that can occur
@@ -209,12 +204,6 @@ public:
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks"; static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
static constexpr auto CLIENT_ROUTES = "client_routes"; static constexpr auto CLIENT_ROUTES = "client_routes";
static constexpr auto VERSIONS = "versions"; static constexpr auto VERSIONS = "versions";
static constexpr auto BATCHES = "batches";
static constexpr auto AVAILABLE_RANGES = "available_ranges";
static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
static constexpr auto BUILT_VIEWS = "built_views";
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
static constexpr auto CDC_LOCAL = "cdc_local";
// auth // auth
static constexpr auto ROLES = "roles"; static constexpr auto ROLES = "roles";
@@ -222,6 +211,42 @@ public:
static constexpr auto ROLE_ATTRIBUTES = "role_attributes"; static constexpr auto ROLE_ATTRIBUTES = "role_attributes";
static constexpr auto ROLE_PERMISSIONS = "role_permissions"; static constexpr auto ROLE_PERMISSIONS = "role_permissions";
struct v3 {
static constexpr auto BATCHES = "batches";
static constexpr auto PAXOS = "paxos";
static constexpr auto BUILT_INDEXES = "IndexInfo";
static constexpr auto LOCAL = "local";
static constexpr auto PEERS = "peers";
static constexpr auto PEER_EVENTS = "peer_events";
static constexpr auto RANGE_XFERS = "range_xfers";
static constexpr auto COMPACTION_HISTORY = "compaction_history";
static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
static constexpr auto SIZE_ESTIMATES = "size_estimates";
static constexpr auto AVAILABLE_RANGES = "available_ranges";
static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
static constexpr auto BUILT_VIEWS = "built_views";
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
static constexpr auto CDC_LOCAL = "cdc_local";
static schema_ptr batches();
static schema_ptr built_indexes();
static schema_ptr local();
static schema_ptr truncated();
static schema_ptr commitlog_cleanups();
static schema_ptr peers();
static schema_ptr peer_events();
static schema_ptr range_xfers();
static schema_ptr compaction_history();
static schema_ptr sstable_activity();
static schema_ptr size_estimates();
static schema_ptr large_partitions();
static schema_ptr scylla_local();
static schema_ptr available_ranges();
static schema_ptr views_builds_in_progress();
static schema_ptr built_views();
static schema_ptr scylla_views_builds_in_progress();
static schema_ptr cdc_local();
};
// Partition estimates for a given range of tokens. // Partition estimates for a given range of tokens.
struct range_estimates { struct range_estimates {
schema_ptr schema; schema_ptr schema;
@@ -239,7 +264,6 @@ public:
static schema_ptr batchlog_v2(); static schema_ptr batchlog_v2();
static schema_ptr paxos(); static schema_ptr paxos();
static schema_ptr built_indexes(); // TODO (from Cassandra): make private static schema_ptr built_indexes(); // TODO (from Cassandra): make private
static schema_ptr scylla_local();
static schema_ptr raft(); static schema_ptr raft();
static schema_ptr raft_snapshots(); static schema_ptr raft_snapshots();
static schema_ptr repair_history(); static schema_ptr repair_history();
@@ -259,8 +283,6 @@ public:
static schema_ptr dicts(); static schema_ptr dicts();
static schema_ptr view_building_tasks(); static schema_ptr view_building_tasks();
static schema_ptr client_routes(); static schema_ptr client_routes();
static schema_ptr views_builds_in_progress();
static schema_ptr scylla_views_builds_in_progress();
// auth // auth
static schema_ptr roles(); static schema_ptr roles();

View File

@@ -195,7 +195,7 @@ public:
return mutation_reader(std::make_unique<build_progress_reader>( return mutation_reader(std::make_unique<build_progress_reader>(
s, s,
std::move(permit), std::move(permit),
_db.find_column_family(s->ks_name(), system_keyspace::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
range, range,
slice, slice,
std::move(trace_state), std::move(trace_state),

View File

@@ -23,7 +23,6 @@
#include <seastar/core/future-util.hh> #include <seastar/core/future-util.hh>
#include <seastar/core/coroutine.hh> #include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/maybe_yield.hh> #include <seastar/coroutine/maybe_yield.hh>
#include <flat_map> #include <flat_map>
@@ -66,7 +65,6 @@
#include "mutation/timestamp.hh" #include "mutation/timestamp.hh"
#include "utils/assert.hh" #include "utils/assert.hh"
#include "utils/small_vector.hh" #include "utils/small_vector.hh"
#include "view_builder.hh"
#include "view_info.hh" #include "view_info.hh"
#include "view_update_checks.hh" #include "view_update_checks.hh"
#include "types/list.hh" #include "types/list.hh"
@@ -2240,20 +2238,12 @@ void view_builder::setup_metrics() {
} }
future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) { future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
auto step_fiber = make_ready_future<>();
try { try {
view_builder_init_state vbi; view_builder_init_state vbi;
auto fail = defer([&barrier] mutable { barrier.abort(); }); auto fail = defer([&barrier] mutable { barrier.abort(); });
// Semaphore usage invariants: // Guard the whole startup routine with a semaphore,
// - One unit of _sem serializes all per-shard bookkeeping that mutates view-builder state // so that it's not intercepted by `on_drop_view`, `on_create_view`
// (_base_to_build_step, _built_views, build_status, reader resets). // or `on_update_view` events.
// - The unit is held for the whole operation, including the async chain, until the state
// is stable for the next operation on that shard.
// - Cross-shard operations acquire _sem on shard 0 for the duration of the broadcast.
// Other shards acquire their own _sem only around their local handling; shard 0 skips
// the local acquire because it already holds the unit from the dispatcher.
// Guard the whole startup routine with a semaphore so that it's not intercepted by
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
auto units = co_await get_units(_sem, view_builder_semaphore_units); auto units = co_await get_units(_sem, view_builder_semaphore_units);
// Wait for schema agreement even if we're a seed node. // Wait for schema agreement even if we're a seed node.
co_await mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as); co_await mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as);
@@ -2274,10 +2264,8 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
_mnotifier.register_listener(this); _mnotifier.register_listener(this);
co_await calculate_shard_build_step(vbi); co_await calculate_shard_build_step(vbi);
_current_step = _base_to_build_step.begin(); _current_step = _base_to_build_step.begin();
// Waited on indirectly in stop().
// If preparation above fails, run_in_background() is not invoked, just (void)_build_step.trigger();
// the start_in_background() emits a warning into logs and resolves
step_fiber = run_in_background();
} catch (...) { } catch (...) {
auto ex = std::current_exception(); auto ex = std::current_exception();
auto ll = log_level::error; auto ll = log_level::error;
@@ -2292,12 +2280,10 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
} }
vlogger.log(ll, "start aborted: {}", ex); vlogger.log(ll, "start aborted: {}", ex);
} }
co_await std::move(step_fiber);
} }
future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) { future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
_step_fiber = start_in_background(mm, std::move(barrier)); _started = start_in_background(mm, std::move(barrier));
return make_ready_future<>(); return make_ready_future<>();
} }
@@ -2307,12 +2293,12 @@ future<> view_builder::drain() {
} }
vlogger.info("Draining view builder"); vlogger.info("Draining view builder");
_as.request_abort(); _as.request_abort();
co_await std::move(_started);
co_await _mnotifier.unregister_listener(this); co_await _mnotifier.unregister_listener(this);
co_await _vug.drain(); co_await _vug.drain();
co_await _sem.wait(); co_await _sem.wait();
_sem.broken(); _sem.broken();
_build_step.broken(); co_await _build_step.join();
co_await std::move(_step_fiber);
co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair<const table_id, build_step>& p) { co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair<const table_id, build_step>& p) {
return p.second.reader.close(); return p.second.reader.close();
}); });
@@ -2681,59 +2667,63 @@ static bool should_ignore_tablet_keyspace(const replica::database& db, const sst
return db.features().view_building_coordinator && db.has_keyspace(ks_name) && db.find_keyspace(ks_name).uses_tablets(); return db.features().view_building_coordinator && db.has_keyspace(ks_name) && db.find_keyspace(ks_name).uses_tablets();
} }
future<view_builder::view_builder_units> view_builder::get_or_adopt_view_builder_lock(view_builder_units_opt units) {
co_return units ? std::move(*units) : co_await get_units(_sem, view_builder_semaphore_units);
}
future<> view_builder::dispatch_create_view(sstring ks_name, sstring view_name) { future<> view_builder::dispatch_create_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) { if (should_ignore_tablet_keyspace(_db, ks_name)) {
co_return; return make_ready_future<>();
} }
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
auto units = co_await get_or_adopt_view_builder_lock(std::nullopt); // This runs on shard 0 only; seed the global rows before broadcasting.
co_await handle_seed_view_build_progress(ks_name, view_name); return handle_seed_view_build_progress(ks_name, view_name).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return container().invoke_on_all([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable {
co_await coroutine::all( return vb.handle_create_view_local(std::move(ks_name), std::move(view_name));
[this, ks_name, view_name, units = std::move(units)] mutable -> future<> { });
co_await handle_create_view_local(ks_name, view_name, std::move(units)); }, });
[this, ks_name, view_name] mutable -> future<> { });
co_await container().invoke_on_others([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable -> future<> {
return vb.handle_create_view_local(ks_name, view_name, std::nullopt); }); });
} }
future<> view_builder::handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name) { future<> view_builder::handle_seed_view_build_progress(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id()); auto& step = get_or_create_build_step(view->view_info()->base_id());
return _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token()); return _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
} }
future<> view_builder::handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units) { future<> view_builder::handle_create_view_local(sstring ks_name, sstring view_name){
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::move(units)); if (this_shard_id() == 0) {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}
future<> view_builder::handle_create_view_local_impl(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id()); auto& step = get_or_create_build_step(view->view_info()->base_id());
try { return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
co_await coroutine::all( return flush_base(step.base, _as);
[&step] -> future<> { }).then([this, view, &step] () {
co_await step.base->await_pending_writes(); },
[&step] -> future<> {
co_await step.base->await_pending_streams(); });
co_await flush_base(step.base, _as);
// This resets the build step to the current token. It may result in views currently // This resets the build step to the current token. It may result in views currently
// being built to receive duplicate updates, but it simplifies things as we don't have // being built to receive duplicate updates, but it simplifies things as we don't have
// to keep around a list of new views to build the next time the reader crosses a token // to keep around a list of new views to build the next time the reader crosses a token
// threshold. // threshold.
co_await initialize_reader_at_current_token(step); return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
co_await add_new_view(view, step); return add_new_view(view, step);
} catch (abort_requested_exception&) { }).then_wrapped([this, view] (future<>&& f) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name()); try {
} catch (raft::request_aborted&) { f.get();
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name()); } catch (abort_requested_exception&) {
} catch (...) { vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception()); } catch (raft::request_aborted&) {
} vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (...) {
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}
_build_step.signal(); // Waited on indirectly in stop().
static_cast<void>(_build_step.trigger());
});
});
} }
void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
@@ -2770,55 +2760,62 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) { future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) { if (should_ignore_tablet_keyspace(_db, ks_name)) {
co_return; return make_ready_future<>();
} }
auto units = co_await get_or_adopt_view_builder_lock(std::nullopt); return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
// This runs on shard 0 only; broadcast local cleanup before global cleanup.
co_await coroutine::all( return container().invoke_on_all([ks_name, view_name] (view_builder& vb) mutable {
[this, ks_name, view_name, units = std::move(units)] mutable -> future<> { return vb.handle_drop_view_local(std::move(ks_name), std::move(view_name));
co_await handle_drop_view_local(ks_name, view_name, std::move(units)); }, }).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
[this, ks_name, view_name] mutable -> future<> { return handle_drop_view_global_cleanup(std::move(ks_name), std::move(view_name));
co_await container().invoke_on_others([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable -> future<> { });
return vb.handle_drop_view_local(ks_name, view_name, std::nullopt); });}); });
co_await handle_drop_view_global_cleanup(ks_name, view_name);
} }
future<> view_builder::handle_drop_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units) { future<> view_builder::handle_drop_view_local(sstring ks_name, sstring view_name) {
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::move(units)); if (this_shard_id() == 0) {
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name); return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}
for (auto& [_, step] : _base_to_build_step) { future<> view_builder::handle_drop_view_local_impl(sstring ks_name, sstring view_name) {
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) { vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
continue; // The view is absent from the database at this point, so find it by brute force.
} ([&, this] {
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) { for (auto& [_, step] : _base_to_build_step) {
if (it->view->cf_name() == view_name) { if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
_built_views.erase(it->view->id()); continue;
step.build_status.erase(it); }
co_return; for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
if (it->view->cf_name() == view_name) {
_built_views.erase(it->view->id());
step.build_status.erase(it);
return;
}
} }
} }
} })();
return make_ready_future<>();
} }
future<> view_builder::handle_drop_view_global_cleanup(const sstring& ks_name, const sstring& view_name) { future<> view_builder::handle_drop_view_global_cleanup(sstring ks_name, sstring view_name) {
if (this_shard_id() != 0) { if (this_shard_id() != 0) {
co_return; return make_ready_future<>();
} }
vlogger.info0("Starting view global cleanup {}.{}", ks_name, view_name); vlogger.info0("Starting view global cleanup {}.{}", ks_name, view_name);
return when_all_succeed(
try { _sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name),
co_await coroutine::all( _sys_ks.remove_built_view(ks_name, view_name),
[this, &ks_name, &view_name] -> future<> { remove_view_build_status(ks_name, view_name))
co_await _sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name); }, .discard_result()
[this, &ks_name, &view_name] -> future<> { .handle_exception([ks_name, view_name] (std::exception_ptr ep) {
co_await _sys_ks.remove_built_view(ks_name, view_name); }, vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
[this, &ks_name, &view_name] -> future<> { });
co_await remove_view_build_status(ks_name, view_name); });
} catch (...) {
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, std::current_exception());
}
} }
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) { void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
@@ -2832,15 +2829,14 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
})); }));
} }
future<> view_builder::run_in_background() { future<> view_builder::do_build_step() {
return seastar::async([this] { // Run the view building in the streaming scheduling group
// so that it doesn't impact other tasks with higher priority.
seastar::thread_attributes attr;
attr.sched_group = _db.get_streaming_scheduling_group();
return seastar::async(std::move(attr), [this] {
exponential_backoff_retry r(1s, 1min); exponential_backoff_retry r(1s, 1min);
while (!_as.abort_requested()) { while (!_base_to_build_step.empty() && !_as.abort_requested()) {
try {
_build_step.wait([this] { return !_base_to_build_step.empty(); }).get();
} catch (const seastar::broken_condition_variable&) {
return;
}
auto units = get_units(_sem, view_builder_semaphore_units).get(); auto units = get_units(_sem, view_builder_semaphore_units).get();
++_stats.steps_performed; ++_stats.steps_performed;
try { try {
@@ -3711,7 +3707,7 @@ void validate_view_keyspace(const data_dictionary::database& db, std::string_vie
try { try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs); locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs);
} catch (const std::invalid_argument& e) { } catch (const std::exception& e) {
throw std::logic_error(fmt::format( throw std::logic_error(fmt::format(
"Materialized views and secondary indexes are not supported on the keyspace '{}': {}", "Materialized views and secondary indexes are not supported on the keyspace '{}': {}",
keyspace_name, e.what())); keyspace_name, e.what()));

View File

@@ -11,13 +11,13 @@
#include "query/query-request.hh" #include "query/query-request.hh"
#include "service/migration_listener.hh" #include "service/migration_listener.hh"
#include "service/raft/raft_group0_client.hh" #include "service/raft/raft_group0_client.hh"
#include "utils/serialized_action.hh"
#include "utils/cross-shard-barrier.hh" #include "utils/cross-shard-barrier.hh"
#include "replica/database.hh" #include "replica/database.hh"
#include <seastar/core/abort_source.hh> #include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh> #include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh> #include <seastar/core/semaphore.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh> #include <seastar/core/sharded.hh>
#include <seastar/core/shared_future.hh> #include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh> #include <seastar/core/shared_ptr.hh>
@@ -104,12 +104,6 @@ class view_update_generator;
* redo the missing step, for simplicity. * redo the missing step, for simplicity.
*/ */
class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> { class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> {
//aliasing for semaphore units that will be used throughout the class
using view_builder_units = semaphore_units<named_semaphore_exception_factory>;
//aliasing for optional semaphore units that will be used throughout the class
using view_builder_units_opt = std::optional<view_builder_units>;
/** /**
* Keeps track of the build progress for a particular view. * Keeps track of the build progress for a particular view.
* When the view is built, next_token == first_token. * When the view is built, next_token == first_token.
@@ -174,24 +168,14 @@ class view_builder final : public service::migration_listener::only_view_notific
reader_permit _permit; reader_permit _permit;
base_to_build_step_type _base_to_build_step; base_to_build_step_type _base_to_build_step;
base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
condition_variable _build_step; serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
static constexpr size_t view_builder_semaphore_units = 1; static constexpr size_t view_builder_semaphore_units = 1;
// Ensures bookkeeping operations are serialized, meaning that while we execute // Ensures bookkeeping operations are serialized, meaning that while we execute
// a build step we don't consider newly added or removed views. This simplifies // a build step we don't consider newly added or removed views. This simplifies
// the algorithms. Also synchronizes an operation wrt. a call to stop(). // the algorithms. Also synchronizes an operation wrt. a call to stop().
// Semaphore usage invariants:
// - One unit of _sem serializes all per-shard bookkeeping that mutates view-builder state
// (_base_to_build_step, _built_views, build_status, reader resets).
// - The unit is held for the whole operation, including the async chain, until the state
// is stable for the next operation on that shard.
// - Cross-shard operations acquire _sem on shard 0 for the duration of the broadcast.
// Other shards acquire their own _sem only around their local handling; shard 0 skips
// the local acquire because it already holds the unit from the dispatcher.
// Guard the whole startup routine with a semaphore so that it's not intercepted by
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}}; seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::abort_source _as; seastar::abort_source _as;
future<> _step_fiber = make_ready_future<>(); future<> _started = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view. // Used to coordinate between shards the conclusion of the build process for a particular view.
std::unordered_set<table_id> _built_views; std::unordered_set<table_id> _built_views;
// Used for testing. // Used for testing.
@@ -278,18 +262,19 @@ private:
void setup_shard_build_step(view_builder_init_state& vbi, std::vector<system_keyspace_view_name>, std::vector<system_keyspace_view_build_progress>); void setup_shard_build_step(view_builder_init_state& vbi, std::vector<system_keyspace_view_name>, std::vector<system_keyspace_view_build_progress>);
future<> calculate_shard_build_step(view_builder_init_state& vbi); future<> calculate_shard_build_step(view_builder_init_state& vbi);
future<> add_new_view(view_ptr, build_step&); future<> add_new_view(view_ptr, build_step&);
future<> run_in_background(); future<> do_build_step();
void execute(build_step&, exponential_backoff_retry); void execute(build_step&, exponential_backoff_retry);
future<> maybe_mark_view_as_built(view_ptr, dht::token); future<> maybe_mark_view_as_built(view_ptr, dht::token);
future<> mark_as_built(view_ptr); future<> mark_as_built(view_ptr);
void setup_metrics(); void setup_metrics();
future<> dispatch_create_view(sstring ks_name, sstring view_name); future<> dispatch_create_view(sstring ks_name, sstring view_name);
future<> dispatch_drop_view(sstring ks_name, sstring view_name); future<> dispatch_drop_view(sstring ks_name, sstring view_name);
future<> handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name); future<> handle_seed_view_build_progress(sstring ks_name, sstring view_name);
future<> handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units); future<> handle_create_view_local(sstring ks_name, sstring view_name);
future<> handle_drop_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units); future<> handle_drop_view_local(sstring ks_name, sstring view_name);
future<> handle_drop_view_global_cleanup(const sstring& ks_name, const sstring& view_name); future<> handle_create_view_local_impl(sstring ks_name, sstring view_name);
future<view_builder_units> get_or_adopt_view_builder_lock(view_builder_units_opt units); future<> handle_drop_view_local_impl(sstring ks_name, sstring view_name);
future<> handle_drop_view_global_cleanup(sstring ks_name, sstring view_name);
template <typename Func1, typename Func2> template <typename Func1, typename Func2>
future<> write_view_build_status(Func1&& fn_group0, Func2&& fn_sys_dist) { future<> write_view_build_status(Func1&& fn_group0, Func2&& fn_sys_dist) {

View File

@@ -242,7 +242,7 @@ future<> view_building_worker::create_staging_sstable_tasks() {
utils::UUID_gen::get_time_UUID(), view_building_task::task_type::process_staging, false, utils::UUID_gen::get_time_UUID(), view_building_task::task_type::process_staging, false,
table_id, ::table_id{}, {my_host_id, sst_info.shard}, sst_info.last_token table_id, ::table_id{}, {my_host_id, sst_info.shard}, sst_info.last_token
}; };
auto mut = co_await _sys_ks.make_view_building_task_mutation(guard.write_timestamp(), task); auto mut = co_await _group0.client().sys_ks().make_view_building_task_mutation(guard.write_timestamp(), task);
cmuts.emplace_back(std::move(mut)); cmuts.emplace_back(std::move(mut));
} }
} }
@@ -386,6 +386,7 @@ future<> view_building_worker::update_built_views() {
auto schema = _db.find_schema(table_id); auto schema = _db.find_schema(table_id);
return std::make_pair(schema->ks_name(), schema->cf_name()); return std::make_pair(schema->ks_name(), schema->cf_name());
}; };
auto& sys_ks = _group0.client().sys_ks();
std::set<std::pair<sstring, sstring>> built_views; std::set<std::pair<sstring, sstring>> built_views;
for (auto& [id, statuses]: _vb_state_machine.views_state.status_map) { for (auto& [id, statuses]: _vb_state_machine.views_state.status_map) {
@@ -394,22 +395,22 @@ future<> view_building_worker::update_built_views() {
} }
} }
auto local_built = co_await _sys_ks.load_built_views() | std::views::filter([&] (auto& v) { auto local_built = co_await sys_ks.load_built_views() | std::views::filter([&] (auto& v) {
return !_db.has_keyspace(v.first) || _db.find_keyspace(v.first).uses_tablets(); return !_db.has_keyspace(v.first) || _db.find_keyspace(v.first).uses_tablets();
}) | std::ranges::to<std::set>(); }) | std::ranges::to<std::set>();
// Remove dead entries // Remove dead entries
for (auto& view: local_built) { for (auto& view: local_built) {
if (!built_views.contains(view)) { if (!built_views.contains(view)) {
co_await _sys_ks.remove_built_view(view.first, view.second); co_await sys_ks.remove_built_view(view.first, view.second);
} }
} }
// Add new entries // Add new entries
for (auto& view: built_views) { for (auto& view: built_views) {
if (!local_built.contains(view)) { if (!local_built.contains(view)) {
co_await _sys_ks.mark_view_as_built(view.first, view.second); co_await sys_ks.mark_view_as_built(view.first, view.second);
co_await _sys_ks.remove_view_build_progress_across_all_shards(view.first, view.second); co_await sys_ks.remove_view_build_progress_across_all_shards(view.first, view.second);
} }
} }
} }

View File

@@ -68,8 +68,6 @@ public:
.with_column("peer", inet_addr_type, column_kind::partition_key) .with_column("peer", inet_addr_type, column_kind::partition_key)
.with_column("dc", utf8_type) .with_column("dc", utf8_type)
.with_column("up", boolean_type) .with_column("up", boolean_type)
.with_column("draining", boolean_type)
.with_column("excluded", boolean_type)
.with_column("status", utf8_type) .with_column("status", utf8_type)
.with_column("load", utf8_type) .with_column("load", utf8_type)
.with_column("tokens", int32_type) .with_column("tokens", int32_type)
@@ -109,11 +107,8 @@ public:
if (tm.get_topology().has_node(hostid)) { if (tm.get_topology().has_node(hostid)) {
// Not all entries in gossiper are present in the topology // Not all entries in gossiper are present in the topology
auto& node = tm.get_topology().get_node(hostid); sstring dc = tm.get_topology().get_location(hostid).dc;
sstring dc = node.dc_rack().dc;
set_cell(cr, "dc", dc); set_cell(cr, "dc", dc);
set_cell(cr, "draining", node.is_draining());
set_cell(cr, "excluded", node.is_excluded());
} }
if (ownership.contains(eps.get_ip())) { if (ownership.contains(eps.get_ip())) {
@@ -1139,8 +1134,6 @@ public:
set_cell(r.cells(), "dc", node.dc()); set_cell(r.cells(), "dc", node.dc());
set_cell(r.cells(), "rack", node.rack()); set_cell(r.cells(), "rack", node.rack());
set_cell(r.cells(), "up", _gossiper.local().is_alive(host)); set_cell(r.cells(), "up", _gossiper.local().is_alive(host));
set_cell(r.cells(), "draining", node.is_draining());
set_cell(r.cells(), "excluded", node.is_excluded());
if (auto ip = _gossiper.local().get_address_map().find(host)) { if (auto ip = _gossiper.local().get_address_map().find(host)) {
set_cell(r.cells(), "ip", data_value(inet_address(*ip))); set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
} }
@@ -1151,9 +1144,6 @@ public:
if (stats && stats->capacity.contains(host)) { if (stats && stats->capacity.contains(host)) {
auto capacity = stats->capacity.at(host); auto capacity = stats->capacity.at(host);
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity))); set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
if (auto ts_iter = stats->tablet_stats.find(host); ts_iter != stats->tablet_stats.end()) {
set_cell(r.cells(), "effective_capacity", data_value(int64_t(ts_iter->second.effective_capacity)));
}
if (auto utilization = load.get_allocated_utilization(host)) { if (auto utilization = load.get_allocated_utilization(host)) {
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization))); set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
@@ -1178,12 +1168,9 @@ private:
.with_column("rack", utf8_type) .with_column("rack", utf8_type)
.with_column("ip", inet_addr_type) .with_column("ip", inet_addr_type)
.with_column("up", boolean_type) .with_column("up", boolean_type)
.with_column("draining", boolean_type)
.with_column("excluded", boolean_type)
.with_column("tablets_allocated", long_type) .with_column("tablets_allocated", long_type)
.with_column("tablets_allocated_per_shard", double_type) .with_column("tablets_allocated_per_shard", double_type)
.with_column("storage_capacity", long_type) .with_column("storage_capacity", long_type)
.with_column("effective_capacity", long_type)
.with_column("storage_allocated_load", long_type) .with_column("storage_allocated_load", long_type)
.with_column("storage_allocated_utilization", double_type) .with_column("storage_allocated_utilization", double_type)
.with_column("storage_load", long_type) .with_column("storage_load", long_type)
@@ -1497,7 +1484,7 @@ future<> initialize_virtual_tables(
co_await add_table(std::make_unique<cdc_streams_table>(db, ss)); co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local()))); db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); db.find_column_family(system_keyspace::v3::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db))); db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
} }

View File

@@ -11,6 +11,5 @@
namespace debug { namespace debug {
seastar::sharded<replica::database>* volatile the_database = nullptr; seastar::sharded<replica::database>* volatile the_database = nullptr;
seastar::scheduling_group streaming_scheduling_group;
} }

View File

@@ -17,7 +17,7 @@ class database;
namespace debug { namespace debug {
extern seastar::sharded<replica::database>* volatile the_database; extern seastar::sharded<replica::database>* volatile the_database;
extern seastar::scheduling_group streaming_scheduling_group;
} }

View File

@@ -114,10 +114,6 @@ WantedBy=local-fs.target scylla-server.service
'''[1:-1] '''[1:-1]
with open('/etc/systemd/system/var-lib-systemd-coredump.mount', 'w') as f: with open('/etc/systemd/system/var-lib-systemd-coredump.mount', 'w') as f:
f.write(dot_mount) f.write(dot_mount)
# in case we have old mounts in deleted state hanging around from older installation
# systemd doesn't seem to be able to deal with those properly, and assume they are still active
# and doesn't do anything about them
run('umount /var/lib/systemd/coredump', shell=True, check=False)
os.makedirs('/var/lib/scylla/coredump', exist_ok=True) os.makedirs('/var/lib/scylla/coredump', exist_ok=True)
systemd_unit.reload() systemd_unit.reload()
systemd_unit('var-lib-systemd-coredump.mount').enable() systemd_unit('var-lib-systemd-coredump.mount').enable()

View File

@@ -1,14 +1,6 @@
### a dictionary of redirections ### a dictionary of redirections
#old path: new path #old path: new path
# Remove an outdated KB
/stable/kb/perftune-modes-sync.html: /stable/kb/index.html
# Remove the troubleshooting page relevant for Open Source only
/stable/troubleshooting/missing-dotmount-files.html: /troubleshooting/index.html
# Move the diver information to another project # Move the diver information to another project
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html /stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html

View File

@@ -190,7 +190,7 @@ then every rack in every datacenter receives a replica, except for racks compris
of only :doc:`zero-token nodes </architecture/zero-token-nodes>`. Racks added after of only :doc:`zero-token nodes </architecture/zero-token-nodes>`. Racks added after
the keyspace creation do not receive replicas. the keyspace creation do not receive replicas.
When ``enforce_rack_list`` (or (deprecated) ``rf_rack_valid_keyspaces``) is enabled in the config and the keyspace is tablet-based, When ``rf_rack_valid_keyspaces``` is enabled in the config and the keyspace is tablet-based,
the numeric replication factor is automatically expanded into a rack list when the statement is the numeric replication factor is automatically expanded into a rack list when the statement is
executed, which can be observed in the DESCRIBE output afterwards. If the numeric RF is smaller than executed, which can be observed in the DESCRIBE output afterwards. If the numeric RF is smaller than
the number of racks in a DC, a subset of racks is chosen arbitrarily. the number of racks in a DC, a subset of racks is chosen arbitrarily.
@@ -1026,29 +1026,7 @@ You can enable the after-repair tombstone GC by setting the ``repair`` mode usin
ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair'} ; ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair'} ;
To support writes arriving out-of-order -- either due to natural delays, or user provided timestamps -- the repair mode has a propagation delay. The following modes are available:
Out-of-order writes present a problem for repair mode tombstone gc. Consider the following example sequence of events:
1) Write ``DELETE FROM table WHERE key = K1`` arrives at the node.
2) Repair is run.
3) Compaction runs and garbage collects the tombstone for ``key = K1``.
4) Write ``INSERT INTO table (key, ...) VALUES (K1, ...)`` arrives at the node with timestamp smaller than that of the delete. The tombstone for ``key = K1`` should apply to this write, but it is already garbage collected, so this data is resurrected.
Propagation delay solves this problem by establishing a window before repair, where tombstones are not yet garbage collectible: a tombstone is garbage collectible if it was written before the last repair by at least the propagation delay.
The value of the propagation delay can be set via the ``propagation_delay_in_seconds`` parameter:
.. code-block:: cql
CREATE TABLE ks.cf (key blob PRIMARY KEY, val blob) WITH tombstone_gc = {'mode':'repair', 'propagation_delay_in_seconds': 120};
.. code-block:: cql
ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair', 'propagation_delay_in_seconds': 120};
The default value of the propagation delay is 1 hour. This parameter should only be changed if your application uses user provided timestamps and writes and deletes can arrive out-of-order by more than the default 1 hour.
The following tombstone gc modes are available:
.. list-table:: .. list-table::
:widths: 20 80 :widths: 20 80

View File

@@ -241,8 +241,8 @@ Currently, the possible orderings are limited by the :ref:`clustering order <clu
.. _vector-queries: .. _vector-queries:
Vector queries :label-note:`ScyllaDB Cloud` Vector queries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
The ``ORDER BY`` clause can also be used with vector columns to perform the approximate nearest neighbor (ANN) search. The ``ORDER BY`` clause can also be used with vector columns to perform the approximate nearest neighbor (ANN) search.
When using vector columns, the syntax is as follows: When using vector columns, the syntax is as follows:
@@ -280,25 +280,11 @@ For example::
FROM ImageEmbeddings FROM ImageEmbeddings
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5; ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
.. warning::
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key. Currently, vector queries do not support filtering with ``WHERE`` clause,
grouping with ``GROUP BY`` and paging. This will be added in the future releases.
For example::
SELECT image_id FROM ImageEmbeddings
WHERE user_id = 'user123'
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
Other filtering scenarios are currently not supported.
.. note::
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
Vector indexes do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information
about Vector Search is available in the
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
.. _limit-clause: .. _limit-clause:

View File

@@ -129,15 +129,17 @@ More on :doc:`Local Secondary Indexes </features/local-secondary-indexes>`
.. _create-vector-index-statement: .. _create-vector-index-statement:
Vector Index :label-note:`ScyllaDB Cloud` Vector Index :label-caution:`Experimental` :label-note:`ScyllaDB Cloud`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note:: .. note::
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled. Vector indexes are supported in ScyllaDB Cloud only in the clusters that have the vector search feature enabled.
Vector indexes do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information Moreover, vector indexes are an experimental feature that:
about Vector Search is available in the
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_. * is not suitable for production use,
* does not guarantee backward compatibility between ScyllaDB versions,
* does not support all the features of ScyllaDB (e.g., tracing, filtering, TTL).
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
similarity search on vector data. similarity search on vector data.
@@ -175,26 +177,6 @@ The following options are supported for vector indexes. All of them are optional
| | as ``efSearch``. Higher values lead to better recall (i.e., more relevant results are found) | | | | as ``efSearch``. Higher values lead to better recall (i.e., more relevant results are found) | |
| | but increase query latency. Supported values are integers between 1 and 4096. | | | | but increase query latency. Supported values are integers between 1 and 4096. | |
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+ +------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
| ``quantization`` | The quantization method to use for compressing vectors in Vector Index. Vectors in base table | ``f32`` |
| | are never compressed. Supported values (case-insensitive) are: | |
| | | |
| | * ``f32``: 32-bit single-precision IEEE 754 floating-point. | |
| | * ``f16``: 16-bit standard half-precision floating-point (IEEE 754). | |
| | * ``bf16``: 16-bit "Brain" floating-point (optimized for ML workloads). | |
| | * ``i8``: 8-bit signed integer. | |
| | * ``b1``: 1-bit binary value (packed 8 per byte). | |
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
| ``oversampling`` | A multiplier for the candidate set size during the search phase. For example, if a query asks for 10 | ``1.0`` |
| | similar vectors (``LIMIT 10``) and ``oversampling`` is 2.0, the search will initially retrieve 20 | |
| | candidates. This can improve accuracy at the cost of latency. Supported values are | |
| | floating-point numbers between 1.0 (no oversampling) and 100.0. | |
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
| ``rescoring`` | Flag enabling recalculation of similarity scores with full precision and re-ranking of the candidate set.| ``false`` |
| | Valid only for quantization below ``f32``. Supported values are: | |
| | | |
| | * ``true``: Enable rescoring. | |
| | * ``false``: Disable rescoring. | |
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
.. _drop-index-statement: .. _drop-index-statement:

View File

@@ -665,14 +665,6 @@ it is not possible to update only some elements of a vector (without updating th
Types stored in a vector are not implicitly frozen, so if you want to store a frozen collection or Types stored in a vector are not implicitly frozen, so if you want to store a frozen collection or
frozen UDT in a vector, you need to explicitly wrap them using `frozen` keyword. frozen UDT in a vector, you need to explicitly wrap them using `frozen` keyword.
.. note::
The main application of vectors is to support vector search capabilities, which
are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
Note that Vector Search clusters do not support all ScyllaDB features (e.g., tracing, TTL, paging, and grouping). More information
about Vector Search is available in the
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
.. .. _custom-types: .. .. _custom-types:
.. Custom Types .. Custom Types

View File

@@ -221,87 +221,6 @@ scylla-bucket/prefix/
``` ```
See the API [documentation](#copying-sstables-on-s3-backup) for more details about the actual backup request. See the API [documentation](#copying-sstables-on-s3-backup) for more details about the actual backup request.
### The snapshot manifest
Each table snapshot directory contains a manifest.json file that lists the contents of the snapshot and some metadata.
The json structure is as follows:
```
{
"manifest": {
"version": "1.0",
"scope": "node"
},
"node": {
"host_id": "<UUID>",
"datacenter": "mydc",
"rack": "myrack"
},
"snapshot": {
"name": "snapshot name",
"created_at": seconds_since_epoch,
"expires_at": seconds_since_epoch | null,
},
"table": {
"keyspace_name": "my_keyspace",
"table_name": "my_table",
"table_id": "<UUID>",
"tablets_type": "none|powof2",
"tablet_count": N
},
"sstables": [
{
"id": "67e35000-d8c6-11f0-9599-060de9f3bd1b",
"toc_name": "me-3gw7_0ndy_3wlq829wcsddgwha1n-big-TOC.txt",
"data_size": 75,
"index_size": 8,
"first_token": -8629266958227979430,
"last_token": 9168982884335614769,
},
{
"id": "67e35000-d8c6-11f0-85dc-0625e9f3bd1b",
"toc_name": "me-3gw7_0ndy_3wlq821a6cqlbmxrtn-big-TOC.txt",
"data_size": 73,
"index_size": 8,
"first_token": 221146791717891383,
"last_token": 7354559975791427036,
},
...
],
"files": [ ... ]
}
The `manifest` member contains the following attributes:
- `version` - representing the version of the manifest itself. It is incremented when members are added or removed from the manifest.
- `scope` - the scope of metadata stored in this manifest file. The following scopes are supported:
- `node` - the manifest describes all SSTables owned by this node in this snapshot.
The `node` member contains metadata about this node that enables datacenter- or rack-aware restore.
- `host_id` - is the node's unique host_id (a UUID).
- `datacenter` - is the node's datacenter.
- `rack` - is the node's rack.
The `snapshot` member contains metadata about the snapshot.
- `name` - is the snapshot name (a.k.a. `tag`)
- `created_at` - is the time when the snapshot was created.
- `expires_at` - is an optional time when the snapshot expires and can be dropped, if a TTL was set for the snapshot. If there is no TTL, `expires_at` may be omitted, set to null, or set to 0.
The `table` member contains metadata about the table being snapshot.
- `keyspace_name` and `table_name` - are self-explanatory.
- `table_id` - a universally unique identifier (UUID) of the table set when the table is created.
- `tablets_type`:
- `none` - if the keyspace uses vnodes replication
- `powof2` - if the keyspace uses tablets replication, and the tablet token ranges are based on powers of 2.
- `tablet_count` - Optional. If `tablets_type` is not `none`, contains the number of tablets allocated in the table. If `tablets_type` is `powof2`, tablet_count would be a power of 2.
The `sstables` member is a list containing metadata about the SSTables in the snapshot.
- `id` - is the SSTable's unique id (a UUID). It is carried over with the SSTable when it's streamed as part of tablet migration, even if it gets a new generation.
- `toc_name` - is the name of the SSTable Table Of Contents (TOC) component.
- `data_size` and `index_size` - are the sizes of the SSTable's data and index components, respectively. They can be used to estimate how much disk space is needed for restore.
- `first_token` and `last_token` - are the first and last tokens in the SSTable, respectively. They can be used to determine if an SSTable is fully contained in a (tablet) token range to enable efficient file-based streaming of the SSTable.
The optional `files` member may contain a list of non-SSTable files included in the snapshot directory, not including the manifest.json file and schema.cql.
```
3. `CREATE KEYSPACE` with S3/GS storage 3. `CREATE KEYSPACE` with S3/GS storage
When creating a keyspace with S3/GS storage, the data is stored under the bucket passed as argument to the `CREATE KEYSPACE` statement. When creating a keyspace with S3/GS storage, the data is stored under the bucket passed as argument to the `CREATE KEYSPACE` statement.

View File

@@ -78,7 +78,6 @@ Permits are in one of the following states:
* `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shards, other permits can be admitted while the permit is in this state, pending resource availability; * `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shards, other permits can be admitted while the permit is in this state, pending resource availability;
* `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed; * `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed;
* `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume; * `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume;
* `preemptive_aborted` - the permit timed out or was rejected during admission as it was detected the read might time out later during execution;
Note that some older releases will have different names for some of these states or lack some of the states altogether: Note that some older releases will have different names for some of these states or lack some of the states altogether:

View File

@@ -6,10 +6,10 @@ same amount of disk space. This means that the number of tablets located on a no
proportional to the gross disk capacity of that node. Because the used disk space of proportional to the gross disk capacity of that node. Because the used disk space of
different tablets can vary greatly, this could create imbalance in disk utilization. different tablets can vary greatly, this could create imbalance in disk utilization.
Size based load balancing aims to achieve better disk utilization across nodes in a Size based load balancing aims to achieve better disk utilization across nodes in a
cluster. The load balancer will continuously gather information about available disk cluster. The load balancer will continuously gather information about available disk
space and tablet sizes from all the nodes. It then incrementally computes tablet space and tablet sizes from all the nodes. It then incrementally computes tablet
migration plans which equalize disk utilization across the cluster. migration plans which equalize disk utilization across the cluster.
# Basic operation # Basic operation
@@ -75,7 +75,7 @@ migrations), and will wait for correct tablet sizes to arrive after the next ``l
refresh by the topology coordinator. refresh by the topology coordinator.
One exception to this are nodes which have been excluded from the cluster. These nodes One exception to this are nodes which have been excluded from the cluster. These nodes
are down and therefore are not able to send fresh ``load_stats``. But they have to be drained are down and therefore are not able to send fresh ``load_stats``. But they have to be drained
of their tablets (via tablet rebuild), and the balancer must do this even with incomplete of their tablets (via tablet rebuild), and the balancer must do this even with incomplete
tablet data. So, only excluded nodes are allowed to have missing tablet sizes. tablet data. So, only excluded nodes are allowed to have missing tablet sizes.

View File

@@ -357,7 +357,6 @@ Schema:
CREATE TABLE system.load_per_node ( CREATE TABLE system.load_per_node (
node uuid PRIMARY KEY, node uuid PRIMARY KEY,
dc text, dc text,
effective_capacity bigint,
rack text, rack text,
storage_allocated_load bigint, storage_allocated_load bigint,
storage_allocated_utilization double, storage_allocated_utilization double,
@@ -373,7 +372,6 @@ Columns:
* `storage_allocated_load` - Disk space allocated for tablets, assuming each tablet has a fixed size (target_tablet_size). * `storage_allocated_load` - Disk space allocated for tablets, assuming each tablet has a fixed size (target_tablet_size).
* `storage_allocated_utilization` - Fraction of node's disk capacity taken for `storage_allocated_load`, where 1.0 means full utilization. * `storage_allocated_utilization` - Fraction of node's disk capacity taken for `storage_allocated_load`, where 1.0 means full utilization.
* `storage_capacity` - Total disk capacity in bytes. Used to compute `storage_allocated_utilization`. By default equal to file system's capacity. * `storage_capacity` - Total disk capacity in bytes. Used to compute `storage_allocated_utilization`. By default equal to file system's capacity.
* `effective_capacity` - Sum of available disk space and tablet sizes on a node. Used to compute load on a node for size based balancing.
* `storage_load` - Disk space allocated for tablets, computed with actual tablet sizes. Can be null if some of the tablet sizes are not known. * `storage_load` - Disk space allocated for tablets, computed with actual tablet sizes. Can be null if some of the tablet sizes are not known.
* `storage_utilization` - Fraction of node's disk capacity taken for `storage_load` (with actual tablet sizes), where 1.0 means full utilization. Can be null if some of the tablet sizes are not known. * `storage_utilization` - Fraction of node's disk capacity taken for `storage_load` (with actual tablet sizes), where 1.0 means full utilization. Can be null if some of the tablet sizes are not known.
* `tablets_allocated` - Number of tablet replicas on the node. Migrating tablets are accounted as if migration already finished. * `tablets_allocated` - Number of tablet replicas on the node. Migrating tablets are accounted as if migration already finished.

View File

@@ -124,7 +124,6 @@ There are several test directories that are excluded from orchestration by `test
- test/cql - test/cql
- test/cqlpy - test/cqlpy
- test/rest_api - test/rest_api
- test/scylla_gdb
This means that `test.py` will not run tests directly, but will delegate all work to `pytest`. This means that `test.py` will not run tests directly, but will delegate all work to `pytest`.
That's why all these directories do not have `suite.yaml` files. That's why all these directories do not have `suite.yaml` files.

View File

@@ -46,11 +46,14 @@ stateDiagram-v2
state replacing { state replacing {
rp_join_group0 : join_group0 rp_join_group0 : join_group0
rp_left_token_ring : left_token_ring rp_left_token_ring : left_token_ring
rp_tablet_draining : tablet_draining
rp_write_both_read_old : write_both_read_old rp_write_both_read_old : write_both_read_old
rp_write_both_read_new : write_both_read_new rp_write_both_read_new : write_both_read_new
[*] --> rp_join_group0 [*] --> rp_join_group0
rp_join_group0 --> rp_left_token_ring: rollback rp_join_group0 --> rp_left_token_ring: rollback
rp_join_group0 --> rp_write_both_read_old rp_join_group0 --> rp_tablet_draining
rp_tablet_draining --> rp_write_both_read_old
rp_tablet_draining --> rp_left_token_ring: rollback
rp_join_group0 --> [*]: rejected rp_join_group0 --> [*]: rejected
rp_write_both_read_old --> rp_write_both_read_new: streaming completed rp_write_both_read_old --> rp_write_both_read_new: streaming completed
rp_write_both_read_old --> rp_left_token_ring: rollback rp_write_both_read_old --> rp_left_token_ring: rollback
@@ -66,34 +69,34 @@ stateDiagram-v2
normal --> decommissioning: leave normal --> decommissioning: leave
normal --> removing: remove normal --> removing: remove
state decommissioning { state decommissioning {
de_tablet_migration0 : tablet_migration (draining)
de_tablet_migration1 : tablet_migration
[*] --> de_tablet_migration0
de_tablet_migration0 --> [*]: aborted
de_left_token_ring : left_token_ring de_left_token_ring : left_token_ring
de_tablet_draining : tablet_draining
de_tablet_migration : tablet_migration
de_write_both_read_old: write_both_read_old de_write_both_read_old: write_both_read_old
de_write_both_read_new : write_both_read_new de_write_both_read_new : write_both_read_new
de_rollback_to_normal : rollback_to_normal de_rollback_to_normal : rollback_to_normal
de_rollback_to_normal --> de_tablet_migration1 [*] --> de_tablet_draining
de_tablet_migration0 --> de_write_both_read_old de_tablet_draining --> de_rollback_to_normal: rollback
de_tablet_migration1 --> [*] de_rollback_to_normal --> de_tablet_migration
de_tablet_draining --> de_write_both_read_old
de_tablet_migration --> [*]
de_write_both_read_old --> de_write_both_read_new: streaming completed de_write_both_read_old --> de_write_both_read_new: streaming completed
de_write_both_read_old --> de_rollback_to_normal: rollback de_write_both_read_old --> de_rollback_to_normal: rollback
de_write_both_read_new --> de_left_token_ring de_write_both_read_new --> de_left_token_ring
de_left_token_ring --> [*] de_left_token_ring --> [*]
} }
state removing { state removing {
re_tablet_migration0 : tablet_migration (draining)
re_tablet_migration1 : tablet_migration
[*] --> re_tablet_migration0
re_tablet_migration0 --> [*]: aborted
re_left_token_ring : left_token_ring re_left_token_ring : left_token_ring
re_tablet_draining : tablet_draining
re_tablet_migration : tablet_migration
re_write_both_read_old : write_both_read_old re_write_both_read_old : write_both_read_old
re_write_both_read_new : write_both_read_new re_write_both_read_new : write_both_read_new
re_rollback_to_normal : rollback_to_normal re_rollback_to_normal : rollback_to_normal
re_rollback_to_normal --> re_tablet_migration1 [*] --> re_tablet_draining
re_tablet_migration1 --> [*] re_tablet_draining --> re_rollback_to_normal: rollback
re_tablet_migration0 --> re_write_both_read_old re_rollback_to_normal --> re_tablet_migration
re_tablet_migration --> [*]
re_tablet_draining --> re_write_both_read_old
re_write_both_read_old --> re_write_both_read_new: streaming completed re_write_both_read_old --> re_write_both_read_new: streaming completed
re_write_both_read_old --> re_rollback_to_normal: rollback re_write_both_read_old --> re_rollback_to_normal: rollback
re_write_both_read_new --> re_left_token_ring re_write_both_read_new --> re_left_token_ring
@@ -178,27 +181,6 @@ are the currently supported global topology operations:
contain replicas of the table being truncated. It uses [sessions](#Topology guards) contain replicas of the table being truncated. It uses [sessions](#Topology guards)
to make sure that no stale RPCs are executed outside of the scope of the request. to make sure that no stale RPCs are executed outside of the scope of the request.
## Tablet draining
Presence of node requests of type `leave` (decommission) and `remove` will cause the load
balancer to start migrating tablets away from those nodes. Until they are drained
of tablets, the requests are in paused state and are not picked by topology
coordinator for execution.
Paused requests are also pending, the "topology_request" field is engaged.
The node's state is still `normal` when request is paused.
Canceling the requests will stop the draining process, because absence of a request
lifts the draining state on the node.
When tablet scheduler is done with migrating all tablets away from a draining node,
the associated request will be unpaused, and can be picked by topology coordinator
for execution. This time it will enter the `write_both_read_old` transition and proceed
with the vnode part.
This allows multiple requests to drain tablets in parallel, and other requests to not be blocked
by long `leave` and `remove` requests.
## Zero-token nodes ## Zero-token nodes
Zero-token nodes (the nodes started with `join_ring=false`) never own tokens or become Zero-token nodes (the nodes started with `join_ring=false`) never own tokens or become
@@ -239,6 +221,12 @@ that there are no tablet transitions in the system.
Tablets are migrated in parallel and independently. Tablets are migrated in parallel and independently.
There is a variant of tablet migration track called tablet draining track, which is invoked
as a step of certain topology operations (e.g. decommission, removenode). Its goal is to readjust tablet replicas
so that a given topology change can proceed. For example, when decommissioning a node, we
need to migrate tablet replicas away from the node being decommissioned.
Tablet draining happens before making changes to vnode-based replication.
## Node replace with tablets ## Node replace with tablets
Tablet replicas on the replaced node are rebuilt after the replacing node is already in the normal state and Tablet replicas on the replaced node are rebuilt after the replacing node is already in the normal state and

View File

@@ -1,23 +0,0 @@
.. _automatic-repair:
Automatic Repair
================
Traditionally, launching `repairs </operating-scylla/procedures/maintenance/repair>`_ in a ScyllaDB cluster is left to an external process, typically done via `Scylla Manager <https://manager.docs.scylladb.com/stable/repair/index.html>`_.
Automatic repair offers built-in scheduling in ScyllaDB itself. If the time since the last repair is greater than the configured repair interval, ScyllaDB will start a repair for the `tablet </architecture/tablets>`_ automatically.
Repairs are spread over time and among nodes and shards, to avoid load spikes or any adverse effects on user workloads.
To enable automatic repair, add this to the configuration (``scylla.yaml``):
.. code-block:: yaml
auto_repair_enabled_default: true
auto_repair_threshold_default_in_seconds: 86400
This will enable automatic repair for all tables with a repair period of 1 day. This configuration has to be set on each node, to an identical value.
More featureful configuration methods will be implemented in the future.
To disable, set ``auto_repair_enabled_default: false``.
Automatic repair relies on `Incremental Repair </features/incremental-repair>`_ and as such it only works with `tablet </architecture/tablets>`_ tables.

View File

@@ -3,7 +3,7 @@
Incremental Repair Incremental Repair
================== ==================
ScyllaDB's standard `repair </operating-scylla/procedures/maintenance/repair>`_ process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency. ScyllaDB's standard repair process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
The core idea of incremental repair is to repair only the data that has been written or changed since the last repair was run. It intelligently skips data that has already been verified, dramatically reducing the time, I/O, and CPU resources required for the repair operation. The core idea of incremental repair is to repair only the data that has been written or changed since the last repair was run. It intelligently skips data that has already been verified, dramatically reducing the time, I/O, and CPU resources required for the repair operation.
@@ -28,7 +28,8 @@ Incremental Repair is only supported for tables that use the tablets architectur
Incremental Repair Modes Incremental Repair Modes
------------------------ ------------------------
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation. Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
The available modes are: The available modes are:
@@ -37,12 +38,7 @@ The available modes are:
* ``disabled``: Completely disables the incremental repair logic for the current operation. The repair behaves like a classic, non-incremental repair, and it does not read or update any incremental repair status markers. * ``disabled``: Completely disables the incremental repair logic for the current operation. The repair behaves like a classic, non-incremental repair, and it does not read or update any incremental repair status markers.
The incremental_mode parameter can be specified using nodetool cluster repair, e.g., nodetool cluster repair --incremental-mode incremental. The incremental_mode parameter can be specified using nodetool cluster repair, e.g., nodetool cluster repair --incremental-mode incremental. It can also be specified with the REST API, e.g., curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
It can also be specified with the REST API, e.g.:
.. code::
curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
Benefits of Incremental Repair Benefits of Incremental Repair
------------------------------ ------------------------------
@@ -51,8 +47,6 @@ Benefits of Incremental Repair
* **Reduced Resource Usage:** Consumes significantly less CPU, I/O, and network bandwidth compared to a full repair. * **Reduced Resource Usage:** Consumes significantly less CPU, I/O, and network bandwidth compared to a full repair.
* **More Frequent Repairs:** The efficiency of incremental repair allows you to run it more frequently, ensuring a higher level of data consistency across your cluster at all times. * **More Frequent Repairs:** The efficiency of incremental repair allows you to run it more frequently, ensuring a higher level of data consistency across your cluster at all times.
Tables using Incremental Repair can schedule repairs in ScyllaDB itself, with `Automatic Repair </features/automatic-repair>`_.
Notes Notes
----- -----

View File

@@ -17,7 +17,6 @@ This document highlights ScyllaDB's key data modeling features.
Workload Prioritization </features/workload-prioritization> Workload Prioritization </features/workload-prioritization>
Backup and Restore </features/backup-and-restore> Backup and Restore </features/backup-and-restore>
Incremental Repair </features/incremental-repair/> Incremental Repair </features/incremental-repair/>
Automatic Repair </features/automatic-repair/>
Vector Search </features/vector-search/> Vector Search </features/vector-search/>
.. panel-box:: .. panel-box::
@@ -45,7 +44,5 @@ This document highlights ScyllaDB's key data modeling features.
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more * :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
efficient and lightweight approach to maintaining data consistency by efficient and lightweight approach to maintaining data consistency by
repairing only the data that has changed since the last repair. repairing only the data that has changed since the last repair.
* :doc:`Automatic Repair </features/automatic-repair/>` schedules and runs repairs
directly in ScyllaDB, without external schedulers.
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables * :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
similarity-based queries on vector embeddings. similarity-based queries on vector embeddings.

View File

@@ -86,7 +86,7 @@ Compaction Strategies with Materialized Views
Materialized views, just like regular tables, use one of the available :doc:`compaction strategies </architecture/compaction/compaction-strategies>`. Materialized views, just like regular tables, use one of the available :doc:`compaction strategies </architecture/compaction/compaction-strategies>`.
When a materialized view is created, it does not inherit its base table compaction strategy settings, because the data model When a materialized view is created, it does not inherit its base table compaction strategy settings, because the data model
of a view does not necessarily have the same characteristics as the one from its base table. of a view does not necessarily have the same characteristics as the one from its base table.
Instead, the default compaction strategy (IncrementalCompactionStrategy) is used. Instead, the default compaction strategy (SizeTieredCompactionStrategy) is used.
A compaction strategy for a new materialized view can be explicitly set during its creation, using the following command: A compaction strategy for a new materialized view can be explicitly set during its creation, using the following command:

View File

@@ -24,9 +24,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:id: "getting-started" :id: "getting-started"
:class: my-panel :class: my-panel
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>` * :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>` * :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>` * :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
.. panel-box:: .. panel-box::
@@ -35,7 +35,7 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:class: my-panel :class: my-panel
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>` * :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>` * :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>` * :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>` * :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>` * :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`

View File

@@ -4,9 +4,9 @@
.. |RHEL_EPEL_8| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm .. |RHEL_EPEL_8| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
.. |RHEL_EPEL_9| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm .. |RHEL_EPEL_9| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
======================================================== ======================================
Install ScyllaDB |CURRENT_VERSION| Linux Packages Install ScyllaDB Linux Packages
======================================================== ======================================
We recommend installing ScyllaDB using :doc:`ScyllaDB Web Installer for Linux </getting-started/installation-common/scylla-web-installer/>`, We recommend installing ScyllaDB using :doc:`ScyllaDB Web Installer for Linux </getting-started/installation-common/scylla-web-installer/>`,
a platform-agnostic installation script, to install ScyllaDB on any supported Linux platform. a platform-agnostic installation script, to install ScyllaDB on any supported Linux platform.
@@ -46,8 +46,8 @@ Install ScyllaDB
.. code-block:: console .. code-block:: console
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys c503c686b007f39e sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys a43e06657bac99e3
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor c503c686b007f39e | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor a43e06657bac99e3 | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg
.. code-block:: console .. code-block:: console
:substitutions: :substitutions:

View File

@@ -1,6 +1,6 @@
=============================================== ==========================
Launch ScyllaDB |CURRENT_VERSION| on AWS Launch ScyllaDB on AWS
=============================================== ==========================
This article will guide you through self-managed ScyllaDB deployment on AWS. For a fully-managed deployment of ScyllaDB This article will guide you through self-managed ScyllaDB deployment on AWS. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_. as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -1,6 +1,6 @@
=============================================== ==========================
Launch ScyllaDB |CURRENT_VERSION| on Azure Launch ScyllaDB on Azure
=============================================== ==========================
This article will guide you through self-managed ScyllaDB deployment on Azure. For a fully-managed deployment of ScyllaDB This article will guide you through self-managed ScyllaDB deployment on Azure. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_. as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -1,6 +1,6 @@
============================================= ==========================
Launch ScyllaDB |CURRENT_VERSION| on GCP Launch ScyllaDB on GCP
============================================= ==========================
This article will guide you through self-managed ScyllaDB deployment on GCP. For a fully-managed deployment of ScyllaDB This article will guide you through self-managed ScyllaDB deployment on GCP. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_. as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -57,6 +57,7 @@ Knowledge Base
* :doc:`Customizing CPUSET </kb/customizing-cpuset>` * :doc:`Customizing CPUSET </kb/customizing-cpuset>`
* :doc:`Recreate RAID devices </kb/raid-device>` - How to recreate your RAID devices without running scylla-setup * :doc:`Recreate RAID devices </kb/raid-device>` - How to recreate your RAID devices without running scylla-setup
* :doc:`Configure ScyllaDB Networking with Multiple NIC/IP Combinations </kb/yaml-address>` - examples for setting the different IP addresses in scylla.yaml * :doc:`Configure ScyllaDB Networking with Multiple NIC/IP Combinations </kb/yaml-address>` - examples for setting the different IP addresses in scylla.yaml
* :doc:`Updating the Mode in perftune.yaml After a ScyllaDB Upgrade </kb/perftune-modes-sync>`
* :doc:`Kafka Sink Connector Quickstart </using-scylla/integrations/kafka-connector>` * :doc:`Kafka Sink Connector Quickstart </using-scylla/integrations/kafka-connector>`
* :doc:`Kafka Sink Connector Configuration </using-scylla/integrations/sink-config>` * :doc:`Kafka Sink Connector Configuration </using-scylla/integrations/sink-config>`

View File

@@ -0,0 +1,48 @@
==============================================================
Updating the Mode in perftune.yaml After a ScyllaDB Upgrade
==============================================================
We improved ScyllaDB's performance by `removing the rx_queues_count from the mode
condition <https://github.com/scylladb/seastar/pull/949>`_. As a result, ScyllaDB operates in
the ``sq_split`` mode instead of the ``mq`` mode (see :doc:`Seastar Perftune </operating-scylla/admin-tools/perftune>` for information about the modes).
If you upgrade from an earlier version of ScyllaDB, your cluster's existing nodes may use the ``mq`` mode,
while new nodes will use the ``sq_split`` mode. As using different modes across one cluster is not recommended,
you should change the configuration to ensure that the ``sq_split`` mode is used on all nodes.
This section describes how to update the ``perftune.yaml`` file to configure the ``sq_split`` mode on all nodes.
Procedure
------------
The examples below assume that you are using the default locations for storing data and the ``scylla.yaml`` file,
and that your NIC is ``eth5``.
#. Back up your old configuration.
.. code-block:: console
sudo mv /etc/scylla.d/cpuset.conf /etc/scylla.d/cpuset.conf.old
sudo mv /etc/scylla.d/perftune.yaml /etc/scylla.d/perftune.yaml.old
#. Create a new configuration.
.. code-block:: console
sudo scylla_sysconfig_setup --nic eth5 --homedir /var/lib/scylla --confdir /etc/scylla
A new ``/etc/scylla.d/cpuset.conf`` will be generated on the output.
#. Compare the contents of the newly generated ``/etc/scylla.d/cpuset.conf`` with ``/etc/scylla.d/cpuset.conf.old`` you created in step 1.
- If they are exactly the same, rename ``/etc/scylla.d/perftune.yaml.old`` you created in step 1 back to ``/etc/scylla.d/perftune.yaml`` and continue to the next node.
- If they are different, move on to the next steps.
#. Restart the ``scylla-server`` service.
.. code-block:: console
nodetool drain
sudo systemctl restart scylla-server
#. Wait for the service to become up and running (similarly to how it is done during a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart>`). It may take a considerable amount of time before the node is in the UN state due to resharding.
#. Continue to the next node.

View File

@@ -25,8 +25,4 @@ Port Description Protocol
19142 Native shard-aware transport port (ssl) TCP 19142 Native shard-aware transport port (ssl) TCP
====== ============================================ ======== ====== ============================================ ========
If you're using ScyllaDB Alternator, ensure that the ports configured
for Alternator with the ``alternator_port`` or ``alternator_https_port`` parameter
are open. See :doc:`ScyllaDB Alternator </alternator/alternator>` for details.
.. note:: For ScyllaDB Manager ports, see the `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_ documentation. .. note:: For ScyllaDB Manager ports, see the `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_ documentation.

View File

@@ -601,7 +601,11 @@ Scrub has several modes:
* **segregate** - Fixes partition/row/mutation-fragment out-of-order errors by segregating the output into as many SStables as required so that the content of each output SStable is properly ordered. * **segregate** - Fixes partition/row/mutation-fragment out-of-order errors by segregating the output into as many SStables as required so that the content of each output SStable is properly ordered.
* **validate** - Validates the content of the SStable, reporting any corruptions found. Writes no output SStables. In this mode, scrub has the same outcome as the `validate operation <scylla-sstable-validate-operation_>`_ - and the validate operation is recommended over scrub. * **validate** - Validates the content of the SStable, reporting any corruptions found. Writes no output SStables. In this mode, scrub has the same outcome as the `validate operation <scylla-sstable-validate-operation_>`_ - and the validate operation is recommended over scrub.
Output SStables are written to the directory specified via ``--output-dir``. They will be written with the ``BIG`` format and the highest supported SStable format, with random generation. Output SStables are written to the directory specified via ``--output-directory``. They will be written with the ``BIG`` format and the highest supported SStable format, with generations chosen by scylla-sstable. Generations are chosen such
that they are unique among the SStables written by the current scrub.
The output directory must be empty; otherwise, scylla-sstable will abort scrub. You can allow writing to a non-empty directory by setting the ``--unsafe-accept-nonempty-output-dir`` command line flag.
Note that scrub will be aborted if an SStable cannot be written because its generation clashes with a pre-existing SStable in the output directory.
validate-checksums validate-checksums
^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
@@ -866,7 +870,7 @@ The SSTable version to be used can be overridden with the ``--version`` flag, al
SSTables which are already on the designated version are skipped. To force rewriting *all* SSTables, use the ``--all`` flag. SSTables which are already on the designated version are skipped. To force rewriting *all* SSTables, use the ``--all`` flag.
Output SSTables are written to the path provided by the ``--output-dir`` flag, or to the current directory if not specified. Output SSTables are written to the path provided by the ``--output-dir`` flag, or to the current directory if not specified.
This directory is expected to exist. This directory is expected to exist and be empty. If not empty the tool will refuse to run. This can be overridden with the ``--unsafe-accept-nonempty-output-dir`` flag.
It is strongly recommended to use the system schema tables as the schema source for this command, see the :ref:`schema options <scylla-sstable-schema>` for more details. It is strongly recommended to use the system schema tables as the schema source for this command, see the :ref:`schema options <scylla-sstable-schema>` for more details.
A schema which is good enough to read the SSTable and dump its content, may not be good enough to write its content back verbatim. A schema which is good enough to read the SSTable and dump its content, may not be good enough to write its content back verbatim.
@@ -878,25 +882,6 @@ But even an altered schema which changed only the table options can lead to data
The mapping of input SSTables to output SSTables is printed to ``stdout``. The mapping of input SSTables to output SSTables is printed to ``stdout``.
filter
^^^^^^
Filter the SSTable(s), including/excluding specified partitions.
Similar to ``scylla sstable dump-data --partition|--partition-file``, with some notable differences:
* Instead of dumping the content to stdout, the filtered content is written back to SSTable(s) on disk.
* Also supports negative filters (keep all partitions except those specified).
The partition list can be provided either via the ``--partition`` command line argument, or via a file path passed to the ``--partitions-file`` argument. The file should contain one partition key per line.
Partition keys should be provided in the hex format, as produced by `scylla types serialize </operating-scylla/admin-tools/scylla-types/>`_.
With ``--include``, only the specified partitions are kept from the input SSTable(s). With ``--exclude``, the specified partitions are discarded and won't be written to the output SSTable(s).
It is possible that certain input SSTable(s) won't have any content left after the filtering. These input SSTable(s) will not have a matching output SSTable.
By default, each input sstable is filtered individually. Use ``--merge`` to filter the combined content of all input sstables, producing a single output SSTable.
Output sstables use the latest supported sstable format (can be changed with ``--sstable-version``).
Examples Examples
-------- --------

View File

@@ -93,25 +93,6 @@ API calls
Cluster tasks are not unregistered from task manager with API calls. Cluster tasks are not unregistered from task manager with API calls.
Node operations module
----------------------
There is a module named ``node_ops``, which allows tracking node operations: decommission, removenode, bootstrap, replace, rebuild.
The ``type`` field designates the operation, and is one of:
- ``decommission``
- ``remove node``
- ``bootstrap``
- ``replace``
- ``rebuild``
The ``scope`` and ``kind`` fields are set to ``cluster``.
The ``entity`` field holds the host id of the node which is being operated on, as long as the request
is not finished. In case of the ``replace`` operation, it will hold the host id of the replacing node.
``decommission`` and ``remove node`` tasks are abortable, but only before they finish tablet migration.
Tasks API Tasks API
--------- ---------

View File

@@ -55,7 +55,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988 nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental. - ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
For example: For example:

View File

@@ -29,13 +29,5 @@ Before you run ``nodetool decommission``:
request may fail. request may fail.
In such a case, ALTER the keyspace to reduce the RF before running ``nodetool decommission``. In such a case, ALTER the keyspace to reduce the RF before running ``nodetool decommission``.
It's allowed to invoke ``nodetool decommission`` on multiple nodes in parallel. This will be faster than doing
it sequentially if there is a significant amount of data in tablet-based keyspaces, because
tablets are migrated from nodes in parallel. The decommission process first migrates tablets away, and this
part is done in parallel for all nodes being decommissioned. Then it does the vnode-based decommission, and
this part is serialized with other vnode-based operations, including those from other decommission operations.
Decommission which is still in the tablet draining phase can be canceled using Task Manager API.
See :doc:`Task manager </operating-scylla/admin-tools/task-manager>`.
.. include:: nodetool-index.rst .. include:: nodetool-index.rst

View File

@@ -56,17 +56,6 @@ To only mark the node as permanently down without doing actual removal, use :doc
.. _removenode-ignore-dead-nodes: .. _removenode-ignore-dead-nodes:
It's allowed to invoke ``nodetool removenode`` on multiple nodes in parallel. This will be faster than doing
it sequentially if there is a significant amount of data in tablet-based keyspaces, because
tablets which belong to removed nodes will be rebuilt in parallel. Node removal first migrates tablets to new
replicas, in parallel for all nodes being removed. Then it does the part which executes
removal for the vnode-based keyspaces, and this is serialized with other vnode-based operations, including
those from other removenode operations.
Removenode which is still in the tablet rebuild phase can be canceled using Task Manager API.
Tablets which are already rebuilt will remain on their new replicas.
See :doc:`Task manager </operating-scylla/admin-tools/task-manager>`.
Ignoring Dead Nodes Ignoring Dead Nodes
--------------------- ---------------------

Some files were not shown because too many files have changed in this diff Show More