Compare commits
42 Commits
debug_form
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83babc20e3 | ||
|
|
04b9e98ef8 | ||
|
|
de8c2a8196 | ||
|
|
dd2e8a2105 | ||
|
|
90fd618967 | ||
|
|
c191c31682 | ||
|
|
a4fd7019e3 | ||
|
|
e18072d4b8 | ||
|
|
7353aa5aa5 | ||
|
|
ec0b31b193 | ||
|
|
b5c3e2465f | ||
|
|
3cae4a21ab | ||
|
|
5c6335e029 | ||
|
|
de4975d181 | ||
|
|
1f73e18eaf | ||
|
|
931f9ca3db | ||
|
|
3775e8e49a | ||
|
|
f4d9513e0f | ||
|
|
5eeb1e3e76 | ||
|
|
989aa0b237 | ||
|
|
eba0a2cf72 | ||
|
|
3a9eb9b65f | ||
|
|
f75541b7b3 | ||
|
|
879db5855d | ||
|
|
22d3ee5670 | ||
|
|
2bdf792f8e | ||
|
|
2e2d1f17bb | ||
|
|
e9aba62cc5 | ||
|
|
a7d0cf6dd0 | ||
|
|
6e94c075e3 | ||
|
|
f90ca413a0 | ||
|
|
5e0f5f4b44 | ||
|
|
5d32fef3ae | ||
|
|
1b5c46a796 | ||
|
|
f2c5874fa9 | ||
|
|
664cdd3d99 | ||
|
|
4ea6c51fb1 | ||
|
|
eb9babfd4a | ||
|
|
558f460517 | ||
|
|
a9f4024c1b | ||
|
|
6969918d31 | ||
|
|
d69edfcd34 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.4.0-dev
|
||||
VERSION=2025.4.0-rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -3636,16 +3636,16 @@ future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schem
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
|
||||
uint64_t& rcu_half_units) {
|
||||
noncopyable_function<void(uint64_t)> item_callback) {
|
||||
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
||||
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
||||
auto result_set = builder.build();
|
||||
std::vector<rjson::value> ret;
|
||||
for (auto& result_row : result_set->rows()) {
|
||||
rjson::value item = rjson::empty_object();
|
||||
rcu_consumed_capacity_counter consumed_capacity;
|
||||
describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes);
|
||||
rcu_half_units += consumed_capacity.get_half_units();
|
||||
uint64_t item_length_in_bytes = 0;
|
||||
describe_single_item(*selection, result_row, *attrs_to_get, item, &item_length_in_bytes);
|
||||
item_callback(item_length_in_bytes);
|
||||
ret.push_back(std::move(item));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
@@ -4584,7 +4584,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
std::vector<std::vector<uint64_t>> responses_sizes;
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
@@ -4612,11 +4611,10 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
responses_sizes.resize(requests.size());
|
||||
size_t responses_sizes_pos = 0;
|
||||
for (const auto& rs : requests) {
|
||||
responses_sizes[responses_sizes_pos].resize(rs.requests.size());
|
||||
size_t pos = 0;
|
||||
std::vector<uint64_t> consumed_rcu_half_units_per_table(requests.size());
|
||||
for (size_t i = 0; i < requests.size(); i++) {
|
||||
const table_requests& rs = requests[i];
|
||||
bool is_quorum = rs.cl == db::consistency_level::LOCAL_QUORUM;
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->api_operations.batch_get_item_histogram.add(rs.requests.size());
|
||||
for (const auto &r : rs.requests) {
|
||||
@@ -4639,16 +4637,17 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
command->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
const auto item_callback = [is_quorum, &rcus_per_table = consumed_rcu_half_units_per_table[i]](uint64_t size) {
|
||||
rcus_per_table += rcu_consumed_capacity_counter::get_half_units(size, is_quorum);
|
||||
};
|
||||
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
|
||||
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
|
||||
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, item_callback = std::move(item_callback)] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
|
||||
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size);
|
||||
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), std::move(item_callback));
|
||||
});
|
||||
pos++;
|
||||
response_futures.push_back(std::move(f));
|
||||
}
|
||||
responses_sizes_pos++;
|
||||
}
|
||||
|
||||
// Wait for all requests to complete, and then return the response.
|
||||
@@ -4660,14 +4659,11 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Responses", rjson::empty_object());
|
||||
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
|
||||
size_t rcu_half_units;
|
||||
auto fut_it = response_futures.begin();
|
||||
responses_sizes_pos = 0;
|
||||
rjson::value consumed_capacity = rjson::empty_array();
|
||||
for (const auto& rs : requests) {
|
||||
for (size_t i = 0; i < requests.size(); i++) {
|
||||
const table_requests& rs = requests[i];
|
||||
std::string table = table_name(*rs.schema);
|
||||
size_t pos = 0;
|
||||
rcu_half_units = 0;
|
||||
for (const auto &r : rs.requests) {
|
||||
auto& pk = r.first;
|
||||
auto& cks = r.second;
|
||||
@@ -4682,7 +4678,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
for (rjson::value& json : results) {
|
||||
rjson::push_back(response["Responses"][table], std::move(json));
|
||||
}
|
||||
rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM);
|
||||
} catch(...) {
|
||||
eptr = std::current_exception();
|
||||
// This read of potentially several rows in one partition,
|
||||
@@ -4706,8 +4701,8 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
|
||||
}
|
||||
}
|
||||
pos++;
|
||||
}
|
||||
uint64_t rcu_half_units = consumed_rcu_half_units_per_table[i];
|
||||
_stats.rcu_half_units_total += rcu_half_units;
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->rcu_half_units_total += rcu_half_units;
|
||||
@@ -4717,7 +4712,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rjson::add(entry, "CapacityUnits", rcu_half_units*0.5);
|
||||
rjson::push_back(consumed_capacity, std::move(entry));
|
||||
}
|
||||
responses_sizes_pos++;
|
||||
}
|
||||
|
||||
if (should_add_rcu) {
|
||||
|
||||
@@ -228,12 +228,15 @@ public:
|
||||
const std::optional<attrs_to_get>&,
|
||||
uint64_t* = nullptr);
|
||||
|
||||
// Converts a multi-row selection result to JSON compatible with DynamoDB.
|
||||
// For each row, this method calls item_callback, which takes the size of
|
||||
// the item as the parameter.
|
||||
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice&& slice,
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
|
||||
uint64_t& rcu_half_units);
|
||||
noncopyable_function<void(uint64_t)> item_callback = {});
|
||||
|
||||
static void describe_single_item(const cql3::selection::selection&,
|
||||
const std::vector<managed_bytes_opt>&,
|
||||
|
||||
@@ -2924,7 +2924,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -233,9 +233,9 @@ future<role_set> ldap_role_manager::query_granted(std::string_view grantee_name,
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map>
|
||||
ldap_role_manager::query_all_directly_granted() {
|
||||
ldap_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
role_to_directly_granted_map result;
|
||||
auto roles = co_await query_all();
|
||||
auto roles = co_await query_all(qs);
|
||||
for (auto& role: roles) {
|
||||
auto granted_set = co_await query_granted(role, recursive_role_query::no);
|
||||
for (auto& granted: granted_set) {
|
||||
@@ -247,8 +247,8 @@ ldap_role_manager::query_all_directly_granted() {
|
||||
co_return result;
|
||||
}
|
||||
|
||||
future<role_set> ldap_role_manager::query_all() {
|
||||
return _std_mgr.query_all();
|
||||
future<role_set> ldap_role_manager::query_all(::service::query_state& qs) {
|
||||
return _std_mgr.query_all(qs);
|
||||
}
|
||||
|
||||
future<> ldap_role_manager::create_role(std::string_view role_name) {
|
||||
@@ -311,12 +311,12 @@ future<bool> ldap_role_manager::can_login(std::string_view role_name) {
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> ldap_role_manager::get_attribute(
|
||||
std::string_view role_name, std::string_view attribute_name) {
|
||||
return _std_mgr.get_attribute(role_name, attribute_name);
|
||||
std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return _std_mgr.get_attribute(role_name, attribute_name, qs);
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name) {
|
||||
return _std_mgr.query_attribute_for_all(attribute_name);
|
||||
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return _std_mgr.query_attribute_for_all(attribute_name, qs);
|
||||
}
|
||||
|
||||
future<> ldap_role_manager::set_attribute(
|
||||
|
||||
@@ -75,9 +75,9 @@ class ldap_role_manager : public role_manager {
|
||||
|
||||
future<role_set> query_granted(std::string_view, recursive_role_query) override;
|
||||
|
||||
future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
future<role_set> query_all() override;
|
||||
future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
future<bool> exists(std::string_view) override;
|
||||
|
||||
@@ -85,9 +85,9 @@ class ldap_role_manager : public role_manager {
|
||||
|
||||
future<bool> can_login(std::string_view) override;
|
||||
|
||||
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view) override;
|
||||
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view, ::service::query_state&) override;
|
||||
|
||||
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view) override;
|
||||
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view, ::service::query_state&) override;
|
||||
|
||||
future<> set_attribute(std::string_view, std::string_view, std::string_view, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -78,11 +78,11 @@ future<role_set> maintenance_socket_role_manager::query_granted(std::string_view
|
||||
return operation_not_supported_exception<role_set>("QUERY GRANTED");
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted() {
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
|
||||
}
|
||||
|
||||
future<role_set> maintenance_socket_role_manager::query_all() {
|
||||
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_set>("QUERY ALL");
|
||||
}
|
||||
|
||||
@@ -98,11 +98,11 @@ future<bool> maintenance_socket_role_manager::can_login(std::string_view role_na
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name) {
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
|
||||
}
|
||||
|
||||
|
||||
@@ -53,9 +53,9 @@ public:
|
||||
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
|
||||
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
virtual future<role_set> query_all() override;
|
||||
virtual future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) override;
|
||||
|
||||
@@ -63,9 +63,9 @@ public:
|
||||
|
||||
virtual future<bool> can_login(std::string_view role_name) override;
|
||||
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -17,12 +17,17 @@
|
||||
#include <seastar/core/format.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "auth/resource.hh"
|
||||
#include "cql3/description.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace service {
|
||||
class query_state;
|
||||
};
|
||||
|
||||
namespace auth {
|
||||
|
||||
struct role_config final {
|
||||
@@ -167,9 +172,9 @@ public:
|
||||
/// (role2, role3)
|
||||
/// }
|
||||
///
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() = 0;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
virtual future<role_set> query_all() = 0;
|
||||
virtual future<role_set> query_all(::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) = 0;
|
||||
|
||||
@@ -186,12 +191,12 @@ public:
|
||||
///
|
||||
/// \returns the value of the named attribute, if one is set.
|
||||
///
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) = 0;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
///
|
||||
/// \returns a mapping of each role's value for the named attribute, if one is set for the role.
|
||||
///
|
||||
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name) = 0;
|
||||
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
|
||||
|
||||
/// Sets `attribute_name` with `attribute_value` for `role_name`.
|
||||
/// \returns an exceptional future with nonexistant_role if the role does not exist.
|
||||
|
||||
@@ -663,21 +663,30 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
|
||||
});
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted() {
|
||||
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{}",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_members_table::name);
|
||||
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
qs,
|
||||
cql3::query_processor::cache_internal::yes);
|
||||
|
||||
role_to_directly_granted_map roles_map;
|
||||
co_await _qp.query_internal(query, [&roles_map] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
|
||||
roles_map.insert({row.get_as<sstring>("member"), row.get_as<sstring>("role")});
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
std::transform(
|
||||
results->begin(),
|
||||
results->end(),
|
||||
std::inserter(roles_map, roles_map.begin()),
|
||||
[] (const cql3::untyped_result_set_row& row) {
|
||||
return std::make_pair(row.get_as<sstring>("member"), row.get_as<sstring>("role")); }
|
||||
);
|
||||
|
||||
co_return roles_map;
|
||||
}
|
||||
|
||||
future<role_set> standard_role_manager::query_all() {
|
||||
future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT {} FROM {}.{}",
|
||||
meta::roles_table::role_col_name,
|
||||
get_auth_ks_name(_qp),
|
||||
@@ -695,7 +704,7 @@ future<role_set> standard_role_manager::query_all() {
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_query_state(),
|
||||
qs,
|
||||
cql3::query_processor::cache_internal::yes);
|
||||
|
||||
role_set roles;
|
||||
@@ -727,11 +736,11 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) {
|
||||
});
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
meta::role_attributes_table::name);
|
||||
const auto result_set = co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
|
||||
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
|
||||
if (!result_set->empty()) {
|
||||
const cql3::untyped_result_set_row &row = result_set->one();
|
||||
co_return std::optional<sstring>(row.get_as<sstring>("value"));
|
||||
@@ -739,11 +748,11 @@ future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_
|
||||
co_return std::optional<sstring>{};
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) {
|
||||
return query_all().then([this, attribute_name] (role_set roles) {
|
||||
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) {
|
||||
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) {
|
||||
return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
|
||||
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name, ::service::query_state& qs) {
|
||||
return query_all(qs).then([this, attribute_name, &qs] (role_set roles) {
|
||||
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles), &qs] (attribute_vals &role_to_att_val) {
|
||||
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name, &qs] (sstring role) {
|
||||
return get_attribute(role, attribute_name, qs).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
|
||||
if (att_val) {
|
||||
role_to_att_val.emplace(std::move(role), std::move(*att_val));
|
||||
}
|
||||
@@ -788,7 +797,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
|
||||
future<std::vector<cql3::description>> standard_role_manager::describe_role_grants() {
|
||||
std::vector<cql3::description> result{};
|
||||
|
||||
const auto grants = co_await query_all_directly_granted();
|
||||
const auto grants = co_await query_all_directly_granted(internal_distributed_query_state());
|
||||
result.reserve(grants.size());
|
||||
|
||||
for (const auto& [grantee_role, granted_role] : grants) {
|
||||
|
||||
@@ -66,9 +66,9 @@ public:
|
||||
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
|
||||
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
|
||||
virtual future<role_set> query_all() override;
|
||||
virtual future<role_set> query_all(::service::query_state&) override;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) override;
|
||||
|
||||
@@ -76,9 +76,9 @@ public:
|
||||
|
||||
virtual future<bool> can_login(std::string_view role_name) override;
|
||||
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
|
||||
|
||||
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "create_index_statement.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "types/types.hh"
|
||||
@@ -92,9 +94,13 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
|
||||
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
|
||||
}
|
||||
|
||||
if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
|
||||
throw exceptions::invalid_request_exception(format("Secondary indexes are not supported on base tables with tablets (keyspace '{}')", keyspace()));
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
}
|
||||
|
||||
validate_for_local_index(*schema);
|
||||
|
||||
std::vector<::shared_ptr<index_target>> targets;
|
||||
|
||||
@@ -113,8 +113,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
|
||||
if (rs->uses_tablets()) {
|
||||
warnings.push_back(
|
||||
"Tables in this keyspace will be replicated using Tablets "
|
||||
"and will not support Materialized Views, Secondary Indexes and counters features. "
|
||||
"To use Materialized Views, Secondary Indexes or counters, drop this keyspace and re-create it "
|
||||
"and will not support counters features. To use counters, drop this keyspace and re-create it "
|
||||
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
|
||||
if (ksm->initial_tablets().value()) {
|
||||
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
|
||||
|
||||
@@ -152,9 +152,13 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
|
||||
|
||||
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
|
||||
|
||||
if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
|
||||
throw exceptions::invalid_request_exception(format("Materialized views are not supported on base tables with tablets"));
|
||||
try {
|
||||
db::view::validate_view_keyspace(db, keyspace());
|
||||
} catch (const std::exception& e) {
|
||||
// The type of the thrown exception is not specified, so we need to wrap it here.
|
||||
throw exceptions::invalid_request_exception(e.what());
|
||||
}
|
||||
|
||||
if (schema->is_counter()) {
|
||||
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
|
||||
}
|
||||
|
||||
@@ -1756,7 +1756,7 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
|
||||
{"broadcast-tables", feature::BROADCAST_TABLES},
|
||||
{"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
|
||||
{"tablets", feature::UNUSED},
|
||||
{"views-with-tablets", feature::VIEWS_WITH_TABLETS}
|
||||
{"views-with-tablets", feature::UNUSED}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -136,8 +136,7 @@ struct experimental_features_t {
|
||||
UDF,
|
||||
ALTERNATOR_STREAMS,
|
||||
BROADCAST_TABLES,
|
||||
KEYSPACE_STORAGE_OPTIONS,
|
||||
VIEWS_WITH_TABLETS
|
||||
KEYSPACE_STORAGE_OPTIONS
|
||||
};
|
||||
static std::map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <flat_map>
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/view/base_info.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "db/view/view_consumer.hh"
|
||||
@@ -3715,5 +3716,22 @@ sstring build_status_to_sstring(build_status status) {
|
||||
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
|
||||
}
|
||||
|
||||
// Checks whether the keyspace `keyspace_name` may hold materialized views
// or secondary indexes, and throws std::logic_error if it may not.
//
// A keyspace whose replication strategy uses tablets is accepted only when
// both the `views_with_tablets` cluster feature and the
// `rf_rack_valid_keyspaces` configuration option are enabled. A keyspace
// that does not use tablets is always accepted.
//
// NOTE(review): the keyspace is looked up with find_keyspace(); presumably
// the caller must ensure it exists -- confirm against the declaration.
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
|
||||
const bool tablet_views_enabled = db.features().views_with_tablets;
|
||||
// Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
|
||||
// sure that all tablet-based keyspaces are RF-rack-valid. We check that
|
||||
// at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
|
||||
const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
|
||||
// Both the cluster feature and the configuration option must be on.
const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
|
||||
|
||||
const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
|
||||
|
||||
// Only tablet-based keyspaces are subject to the requirement above.
if (!required_config && uses_tablets) {
|
||||
throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
|
||||
"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
|
||||
"that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
@@ -309,6 +309,18 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
bool use_tablets_basic_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats);
|
||||
|
||||
/// Verify that the provided keyspace is eligible for storing materialized views.
|
||||
///
|
||||
/// Result:
|
||||
/// * If the keyspace is eligible, no effect.
|
||||
/// * If the keyspace is not eligible, an exception is thrown. Its type is not specified,
|
||||
/// and the user of this function cannot make any assumption about it. The carried exception
|
||||
/// message will be worded in a way that can be directly passed on to the end user.
|
||||
///
|
||||
/// Preconditions:
|
||||
/// * The provided `keyspace_name` must correspond to an existing keyspace.
|
||||
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -127,8 +127,9 @@ view_building_worker::view_building_worker(replica::database& db, db::system_key
|
||||
init_messaging_service();
|
||||
}
|
||||
|
||||
void view_building_worker::start_background_fibers() {
|
||||
future<> view_building_worker::init() {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
co_await discover_existing_staging_sstables();
|
||||
_staging_sstables_registrator = run_staging_sstables_registrator();
|
||||
_view_building_state_observer = run_view_building_state_observer();
|
||||
_mnotifier.register_listener(this);
|
||||
@@ -195,8 +196,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
}
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
co_await discover_existing_staging_sstables();
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
@@ -340,6 +339,7 @@ future<> view_building_worker::run_view_building_state_observer() {
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
_state.some_batch_finished = false;
|
||||
try {
|
||||
vbw_logger.trace("view_building_state_observer() iteration");
|
||||
auto read_apply_mutex_holder = co_await _group0_client.hold_read_apply_mutex(_as);
|
||||
@@ -349,7 +349,12 @@ future<> view_building_worker::run_view_building_state_observer() {
|
||||
_as.check();
|
||||
|
||||
read_apply_mutex_holder.return_all();
|
||||
co_await _vb_state_machine.event.wait();
|
||||
|
||||
// A batch could have finished its work while the worker was
|
||||
// updating the state. In that case we should do another iteration.
|
||||
if (!_state.some_batch_finished) {
|
||||
co_await _vb_state_machine.event.wait();
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
} catch (broken_condition_variable&) {
|
||||
} catch (...) {
|
||||
@@ -657,6 +662,7 @@ future<> view_building_worker::local_state::clear_state() {
|
||||
finished_tasks.clear();
|
||||
aborted_tasks.clear();
|
||||
state_updated_cv.broadcast();
|
||||
some_batch_finished = false;
|
||||
vbw_logger.debug("View building worker state was cleared.");
|
||||
}
|
||||
|
||||
@@ -676,6 +682,7 @@ void view_building_worker::batch::start() {
|
||||
return do_work();
|
||||
}).finally([this] () {
|
||||
state = batch_state::finished;
|
||||
_vbw.local()._state.some_batch_finished = true;
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ class view_building_worker : public seastar::peering_sharded_service<view_buildi
|
||||
std::unordered_set<utils::UUID> finished_tasks;
|
||||
std::unordered_set<utils::UUID> aborted_tasks;
|
||||
|
||||
bool some_batch_finished = false;
|
||||
condition_variable state_updated_cv;
|
||||
|
||||
// Clears completed/aborted tasks and creates batches (without starting them) for started tasks.
|
||||
@@ -166,7 +167,7 @@ public:
|
||||
view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier,
|
||||
service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms,
|
||||
view_building_state_machine& vbsm);
|
||||
void start_background_fibers();
|
||||
future<> init();
|
||||
|
||||
future<> register_staging_sstable_tasks(std::vector<sstables::shared_sstable> ssts, table_id table_id);
|
||||
|
||||
|
||||
24
dist/common/scripts/scylla_io_setup
vendored
24
dist/common/scripts/scylla_io_setup
vendored
@@ -131,6 +131,28 @@ def configure_iotune_open_fd_limit(shards_count):
|
||||
logging.error(f"Required FDs count: {precalculated_fds_count}, default limit: {fd_limits}!")
|
||||
sys.exit(1)
|
||||
|
||||
def force_random_request_size_of_4k():
    """
    Work around a known bug on i4i, i7i, i8g and i8ge instances, where the disk
    controller reports a physical sector size of 512 bytes although the actual
    physical sector size is 4096 bytes.

    The instance type is read from the DMI product name. Returns 4096 when it
    starts with one of the affected prefixes. Returns None otherwise, or when
    the DMI file is missing, in which case IOTune will use the physical sector
    size reported by the disk.
    """
    path = "/sys/devices/virtual/dmi/id/product_name"
    try:
        with open(path, "r") as product_name_file:
            instance_type = product_name_file.read().strip()
    except FileNotFoundError:
        logging.warning(f"Couldn't find {path}. Falling back to IOTune using the physical sector size reported by disk.")
        return None

    # str.startswith accepts a tuple and matches if any of its entries is a prefix.
    affected_prefixes = ("i7i", "i4i", "i8g", "i8ge")
    if instance_type.startswith(affected_prefixes):
        return 4096
    return None
|
||||
|
||||
|
||||
def run_iotune():
|
||||
if "SCYLLA_CONF" in os.environ:
|
||||
conf_dir = os.environ["SCYLLA_CONF"]
|
||||
@@ -173,6 +195,8 @@ def run_iotune():
|
||||
|
||||
configure_iotune_open_fd_limit(cpudata.nr_shards())
|
||||
|
||||
if (reqsize := force_random_request_size_of_4k()):
|
||||
iotune_args += ["--random-write-io-buffer-size", f"{reqsize}"]
|
||||
try:
|
||||
subprocess.check_call([bindir() + "/iotune",
|
||||
"--format", "envfile",
|
||||
|
||||
27
dist/common/scripts/scylla_raid_setup
vendored
27
dist/common/scripts/scylla_raid_setup
vendored
@@ -17,6 +17,7 @@ import stat
|
||||
import logging
|
||||
import pyudev
|
||||
import psutil
|
||||
import platform
|
||||
from pathlib import Path
|
||||
from scylla_util import *
|
||||
from subprocess import run, SubprocessError
|
||||
@@ -102,6 +103,21 @@ def is_selinux_enabled():
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_kernel_version_at_least(major, minor):
    """Check if the Linux kernel version is at least major.minor.

    Only the leading "<major>.<minor>" digits of platform.release() are
    compared, so any trailing text after the minor number (for example
    "5.15.0-56-generic", "6.1-rc3" or "5.12+") does not affect the result.

    Returns False when the release string does not start with
    "<digits>.<digits>": an unparseable version is treated as an older
    kernel for safety.
    """
    # Imported locally so the function does not rely on a module-level import.
    import re

    parsed = re.match(r'(\d+)\.(\d+)', platform.release())
    if parsed is None:
        # If we can't parse the version, assume older kernel for safety
        return False
    kernel_major = int(parsed.group(1))
    kernel_minor = int(parsed.group(2))
    return (kernel_major, kernel_minor) >= (major, minor)
|
||||
|
||||
if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
print('Requires root permission.')
|
||||
@@ -231,8 +247,17 @@ if __name__ == '__main__':
|
||||
# see https://git.kernel.org/pub/scm/fs/xfs/xfsprogs-dev.git/tree/mkfs/xfs_mkfs.c .
|
||||
# and it also cannot be smaller than the sector size.
|
||||
block_size = max(1024, sector_size)
|
||||
|
||||
run('udevadm settle', shell=True, check=True)
|
||||
run(f'mkfs.xfs -b size={block_size} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
|
||||
|
||||
# On Linux 5.12+, sub-block overwrites are supported well, so keep the default block
|
||||
# size, which will play better with the SSD.
|
||||
if is_kernel_version_at_least(5, 12):
|
||||
block_size_opt = ""
|
||||
else:
|
||||
block_size_opt = f"-b size={block_size}"
|
||||
|
||||
run(f'mkfs.xfs {block_size_opt} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
|
||||
run('udevadm settle', shell=True, check=True)
|
||||
|
||||
if is_debian_variant():
|
||||
|
||||
@@ -202,12 +202,9 @@ enabled. If you plan to use any of the features listed below, CREATE your keyspa
|
||||
:ref:`with tablets disabled <tablets-enable-tablets>`.
|
||||
|
||||
* Counters
|
||||
* Materialized Views (MV) ``*``
|
||||
* Secondary indexes (SI, as it depends on MV) ``*``
|
||||
|
||||
``*`` You can enable experimental support for MV and SI using
|
||||
the ``--experimental-features=views-with-tablets`` configuration option.
|
||||
See :ref:`Views with tablets <admin-views-with-tablets>` for details.
|
||||
To enable materialized views and secondary indexes for tablet keyspaces, use
|
||||
the ``--rf-rack-valid-keyspaces`` option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.
|
||||
|
||||
Resharding in keyspaces with tablets enabled has the following limitations:
|
||||
|
||||
|
||||
@@ -341,17 +341,13 @@ credentials and endpoint.
|
||||
Views with Tablets
|
||||
------------------
|
||||
|
||||
By default, Materialized Views (MV) and Secondary Indexes (SI)
|
||||
are disabled in keyspaces that use tablets.
|
||||
|
||||
Support for MV and SI with tablets is experimental and must be explicitly
|
||||
enabled in the ``scylla.yaml`` configuration file by specifying
|
||||
the ``views-with-tablets`` option:
|
||||
Materialized Views (MV) and Secondary Indexes (SI) are enabled in keyspaces that use tablets
|
||||
only when :term:`RF-rack-valid keyspaces <RF-rack-valid keyspace>` are enforced. That can be
|
||||
done in the ``scylla.yaml`` configuration file by specifying:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
experimental_features:
|
||||
- views-with-tablets
|
||||
rf_rack_valid_keyspaces: true
|
||||
|
||||
|
||||
Monitoring
|
||||
|
||||
@@ -53,7 +53,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
|
||||
|
||||
nodetool cluster repair --tablet-tokens 1,10474535988
|
||||
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
|
||||
|
||||
For example:
|
||||
|
||||
|
||||
@@ -38,14 +38,14 @@ Manual Dictionary Training
|
||||
|
||||
You can manually trigger dictionary training using the REST API::
|
||||
|
||||
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&table=mytable"
|
||||
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&cf=mytable"
|
||||
|
||||
Estimating Compression Ratios
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To choose the best compression configuration, you can estimate compression ratios using the REST API::
|
||||
|
||||
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&table=mytable"
|
||||
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&cf=mytable"
|
||||
|
||||
This will return a report with estimated compression ratios for various combinations of compression
|
||||
parameters (algorithm, chunk size, zstd level, dictionary).
|
||||
|
||||
@@ -76,7 +76,7 @@ struct repair_row_level_start_response {
|
||||
|
||||
namespace locator {
|
||||
enum class tablet_repair_incremental_mode : uint8_t {
|
||||
regular,
|
||||
incremental,
|
||||
full,
|
||||
disabled,
|
||||
};
|
||||
|
||||
3
init.cc
3
init.cc
@@ -99,9 +99,6 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
|
||||
disabled.insert("KEYSPACE_STORAGE_OPTIONS"s);
|
||||
}
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::VIEWS_WITH_TABLETS)) {
|
||||
disabled.insert("VIEWS_WITH_TABLETS"s);
|
||||
}
|
||||
if (cfg.force_gossip_topology_changes()) {
|
||||
if (cfg.enable_tablets_by_default()) {
|
||||
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");
|
||||
|
||||
@@ -754,7 +754,7 @@ tablet_task_type tablet_task_type_from_string(const sstring& name) {
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<locator::tablet_repair_incremental_mode, sstring> tablet_repair_incremental_mode_to_name = {
|
||||
{locator::tablet_repair_incremental_mode::disabled, "disabled"},
|
||||
{locator::tablet_repair_incremental_mode::regular, "regular"},
|
||||
{locator::tablet_repair_incremental_mode::incremental, "incremental"},
|
||||
{locator::tablet_repair_incremental_mode::full, "full"},
|
||||
};
|
||||
|
||||
|
||||
@@ -162,11 +162,11 @@ sstring tablet_task_type_to_string(tablet_task_type);
|
||||
tablet_task_type tablet_task_type_from_string(const sstring&);
|
||||
|
||||
|
||||
// - regular (regular incremental repair): The incremental repair logic is enabled.
|
||||
// - incremental (incremental repair): The incremental repair logic is enabled.
|
||||
// Unrepaired sstables will be included for repair. Repaired sstables will be
|
||||
// skipped. The incremental repair states will be updated after repair.
|
||||
|
||||
// - full (full incremental repair): The incremental repair logic is enabled.
|
||||
// - full (full repair): The incremental repair logic is enabled.
|
||||
// Both repaired and unrepaired sstables will be included for repair. The
|
||||
// incremental repair states will be updated after repair.
|
||||
|
||||
@@ -175,12 +175,12 @@ tablet_task_type tablet_task_type_from_string(const sstring&);
|
||||
// sstables_repaired_at in system.tablets table, will not be updated after
|
||||
// repair.
|
||||
enum class tablet_repair_incremental_mode : uint8_t {
|
||||
regular,
|
||||
incremental,
|
||||
full,
|
||||
disabled,
|
||||
};
|
||||
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::regular};
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
|
||||
|
||||
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
|
||||
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
|
||||
|
||||
7
main.cc
7
main.cc
@@ -2208,6 +2208,11 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), token_metadata.local().get());
|
||||
|
||||
// Materialized views and secondary indexes are still restricted and require specific configuration
|
||||
// options to work. Make sure that if there are existing views or indexes, they don't violate
|
||||
// the requirements imposed on them.
|
||||
db.local().validate_tablet_views_indexes();
|
||||
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
@@ -2426,7 +2431,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
|
||||
checkpoint(stop_signal, "starting view building worker's background fibers");
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
view_building_worker.local().start_background_fibers();
|
||||
return view_building_worker.local().init();
|
||||
}).get();
|
||||
auto drain_view_buiding_worker = defer_verbose_shutdown("draining view building worker", [&] {
|
||||
view_building_worker.invoke_on_all(&db::view::view_building_worker::drain).get();
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:103bd12a1f0feb60d814da074b81ebafaa13059d1267ee3612c48a8bc96798b6
|
||||
size 6242980
|
||||
oid sha256:5e35a15a32060d47846c2a5ab29373639e651ac112cc0785306789b8273c63dc
|
||||
size 6299456
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:2cb637e741a2b9badc96f3f175f15db257b9273ea43040289de7d72657b5505a
|
||||
size 6240824
|
||||
oid sha256:71a3e8a3a0e68d35c2e14b553a81e1bc55f6adb73a1988e17e4326923020db2c
|
||||
size 6316028
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <fmt/std.h>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "db/view/view.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
@@ -86,6 +87,7 @@
|
||||
#include "tracing/trace_keyspace_helper.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <flat_set>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace db;
|
||||
@@ -3483,6 +3485,37 @@ void database::check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces
|
||||
}
|
||||
}
|
||||
|
||||
// Runs `db::view::validate_view_keyspace` against the keyspace of every existing
// view and logs the outcome: an info message when all of them pass, otherwise a
// warning listing the offending keyspaces. Nothing is thrown on validation failure.
void database::validate_tablet_views_indexes() const {
    dblog.info("Verifying that all existing materialized views are valid");

    const data_dictionary::database& dict = this->as_data_dictionary();

    // Names of the keyspaces that failed validation. A set, so that a keyspace
    // holding several views is reported only once.
    std::flat_set<std::string_view> offending_keyspaces;
    for (const view_ptr& v : get_views()) {
        const auto& ks_name = v->ks_name();
        try {
            db::view::validate_view_keyspace(dict, ks_name);
        } catch (...) {
            // Any exception from the validator marks the keyspace as invalid.
            offending_keyspaces.emplace(ks_name);
        }
    }

    if (!offending_keyspaces.empty()) {
        // `std::flat_set` guarantees iteration in the increasing order.
        const std::string ks_list = offending_keyspaces
                | std::views::join_with(std::string_view(", "))
                | std::ranges::to<std::string>();

        dblog.warn("Some of the existing keyspaces violate the requirements "
                "for using materialized views or secondary indexes. Those features require enabling "
                "the configuration option `rf_rack_valid_keyspaces` and the cluster feature "
                "`VIEWS_WITH_TABLETS`. The keyspaces that violate that condition: {}", ks_list);
        return;
    }

    dblog.info("All existing materialized views are valid");
}
|
||||
|
||||
utils::chunked_vector<uint64_t> compute_random_sorted_ints(uint64_t max_value, uint64_t n_values) {
|
||||
static thread_local std::minstd_rand rng{std::random_device{}()};
|
||||
std::uniform_int_distribution<uint64_t> dist(0, max_value);
|
||||
|
||||
@@ -2091,6 +2091,20 @@ public:
|
||||
// * the `locator::topology` instance corresponding to the passed `locator::token_metadata_ptr`
|
||||
// must contain a complete list of racks and data centers in the cluster.
|
||||
void check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces, const locator::token_metadata_ptr) const;
|
||||
|
||||
/// Verify that all existing materialized views are valid.
|
||||
///
|
||||
/// We consider a materialized view valid if one of the following
|
||||
/// conditions is satisfied:
|
||||
/// * it resides in a vnode-based keyspace,
|
||||
/// * it resides in a tablet-based keyspace, the cluster feature `VIEWS_WITH_TABLETS`
|
||||
/// is enabled, and the configuration option `rf_rack_valid_keyspaces` is enabled.
|
||||
///
|
||||
/// Result:
|
||||
/// * Depending on whether there are invalid materialized views, the function will
|
||||
/// log that either everything's OK, or that there are some keyspaces that violate
|
||||
/// the requirement.
|
||||
void validate_tablet_views_indexes() const;
|
||||
private:
|
||||
// SSTable sampling might require considerable amounts of memory,
|
||||
// so we want to limit the number of concurrent sampling operations.
|
||||
|
||||
@@ -319,7 +319,7 @@ future<> service_level_controller::update_service_levels_cache(qos::query_contex
|
||||
});
|
||||
}
|
||||
|
||||
future<> service_level_controller::auth_integration::reload_cache() {
|
||||
future<> service_level_controller::auth_integration::reload_cache(qos::query_context ctx) {
|
||||
SCYLLA_ASSERT(this_shard_id() == global_controller);
|
||||
const auto _ = _stop_gate.hold();
|
||||
|
||||
@@ -336,11 +336,12 @@ future<> service_level_controller::auth_integration::reload_cache() {
|
||||
}
|
||||
auto units = co_await get_units(_sl_controller._global_controller_db->notifications_serializer, 1);
|
||||
|
||||
auto& qs = qos_query_state(ctx);
|
||||
auto& role_manager = _auth_service.underlying_role_manager();
|
||||
const auto all_roles = co_await role_manager.query_all();
|
||||
const auto hierarchy = co_await role_manager.query_all_directly_granted();
|
||||
const auto all_roles = co_await role_manager.query_all(qs);
|
||||
const auto hierarchy = co_await role_manager.query_all_directly_granted(qs);
|
||||
// includes only roles with attached service level
|
||||
const auto attributes = co_await role_manager.query_attribute_for_all("service_level");
|
||||
const auto attributes = co_await role_manager.query_attribute_for_all("service_level", qs);
|
||||
|
||||
std::map<sstring, service_level_options> effective_sl_map;
|
||||
|
||||
@@ -403,7 +404,7 @@ future<> service_level_controller::update_cache(update_both_cache_levels update_
|
||||
}
|
||||
|
||||
if (_auth_integration) {
|
||||
co_await _auth_integration->reload_cache();
|
||||
co_await _auth_integration->reload_cache(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -173,7 +173,7 @@ public:
|
||||
future<std::vector<cql3::description>> describe_attached_service_levels();
|
||||
|
||||
/// Must be executed on shard 0.
|
||||
future<> reload_cache();
|
||||
future<> reload_cache(qos::query_context ctx);
|
||||
|
||||
void clear_cache();
|
||||
};
|
||||
|
||||
@@ -497,7 +497,15 @@ future<> group0_voter_handler::update_nodes(
|
||||
};
|
||||
|
||||
// Helper for adding a single node to the nodes list
|
||||
auto add_node = [&nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
|
||||
auto add_node = [this, &nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
|
||||
// Some topology members may not belong to the new group 0 in the Raft-based recovery procedure.
|
||||
if (!group0_config.contains(id)) {
|
||||
if (!_gossiper.get_recovery_leader()) {
|
||||
rvlogger.warn("node {} in state {} is not a part of the group 0 configuration {}, ignoring",
|
||||
id, rs.state, group0_config);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const auto is_voter = group0_config.can_vote(id);
|
||||
const auto is_leader = (id == leader_id);
|
||||
nodes.emplace(id, group0_voter_calculator::node_descriptor{
|
||||
|
||||
@@ -3642,7 +3642,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
auto cached_partitions_file = caching == use_caching::yes
|
||||
? _cached_partitions_file
|
||||
: seastar::make_shared<cached_file>(
|
||||
_partitions_file,
|
||||
uncached_partitions_file(),
|
||||
_manager.get_cache_tracker().get_index_cached_file_stats(),
|
||||
_manager.get_cache_tracker().get_lru(),
|
||||
_manager.get_cache_tracker().region(),
|
||||
@@ -3652,7 +3652,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
auto cached_rows_file = caching == use_caching::yes
|
||||
? _cached_rows_file
|
||||
: seastar::make_shared<cached_file>(
|
||||
_rows_file,
|
||||
uncached_rows_file(),
|
||||
_manager.get_cache_tracker().get_index_cached_file_stats(),
|
||||
_manager.get_cache_tracker().get_lru(),
|
||||
_manager.get_cache_tracker().region(),
|
||||
|
||||
@@ -335,6 +335,85 @@ def test_simple_batch_get_items(test_table_sb):
|
||||
assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
|
||||
assert 2 == response['ConsumedCapacity'][0]['CapacityUnits']
|
||||
|
||||
# This test reproduces a bug where the consumed capacity was divided by 16 MB,
|
||||
# instead of 4 KB. The general formula for RCU per item is the same as for
|
||||
# GetItem, namely:
|
||||
#
|
||||
# CEIL(ItemSizeInBytes / 4096) * (1 if strong consistency, 0.5 if eventual
|
||||
# consistency)
|
||||
#
|
||||
# The RCU is calculated for each item individually, and the results are summed
|
||||
# for the total cost of the BatchGetItem. In this case, the larger item is
|
||||
# rounded up to 68KB, giving 17 RCUs, and the smaller item to 20KB, which
|
||||
# results in 5 RCUs, making the total consumed capacity for this operation
|
||||
# 22 RCUs.
|
||||
def test_batch_get_items_large(test_table_sb):
    # Write one item with a 64 KB attribute and one with a 16 KB attribute,
    # remembering the key of each so both can be fetched in a single batch.
    keys = []
    for payload_kb in (64, 16):
        key = {'p': random_string(), 'c': random_bytes()}
        test_table_sb.put_item(Item={**key, 'a': 'a' * payload_kb * KB})
        keys.append(key)

    reply = test_table_sb.meta.client.batch_get_item(
        RequestItems={test_table_sb.name: {'Keys': keys, 'ConsistentRead': True}},
        ReturnConsumedCapacity='TOTAL')

    assert 'ConsumedCapacity' in reply
    consumed = reply['ConsumedCapacity'][0]
    assert 'TableName' in consumed
    assert consumed['TableName'] == test_table_sb.name
    # Expected total for the two items read with ConsistentRead.
    assert 22 == consumed['CapacityUnits']
|
||||
|
||||
# Helper function to generate item_count items and batch write them to the
|
||||
# table. Returns the list of generated items.
|
||||
def prepare_items(table, item_factory, item_count=10):
    # Builds item_count items by calling item_factory(0), item_factory(1), ...
    # and writes each one through a single table.batch_writer() context as
    # soon as it is created. Returns the items in creation order.
    generated = []
    with table.batch_writer() as batch:
        for index in range(item_count):
            generated.append(item_factory(index))
            batch.put_item(Item=generated[-1])
    return generated
|
||||
|
||||
# This test verifies if querying two tables, each containing multiple ~30 byte
|
||||
# items, reports the RCU correctly. A single item should consume 1 RCU, because
|
||||
# the items' sizes are rounded up separately to 4 KB (ConsistentReads), and
|
||||
# RCU should be reported per table. A variant of test_batch_get_items_large.
|
||||
def test_batch_get_items_many_small(test_table_s, test_table_sb):
    # Each item should be about 30 bytes.
    def make_sb_item(i):
        return {'p': f'item_{i}_' + random_string(), 'c': random_bytes()}

    def make_s_item(i):
        return {'p': f'item_{i}_' + random_string()}

    items_sb = prepare_items(test_table_sb, make_sb_item)
    items_s = prepare_items(test_table_s, make_s_item)

    # The items consist of key attributes only, so they double as the keys.
    request = {
        test_table_sb.name: {'Keys': items_sb, 'ConsistentRead': True},
        test_table_s.name: {'Keys': items_s, 'ConsistentRead': True},
    }
    reply = test_table_sb.meta.client.batch_get_item(RequestItems=request, ReturnConsumedCapacity='TOTAL')

    assert 'ConsumedCapacity' in reply
    per_table = reply['ConsumedCapacity']
    assert len(per_table) == 2

    # Every table must be reported exactly once, each with 10 units.
    pending_tables = {test_table_sb.name, test_table_s.name}
    for entry in per_table:
        assert 'TableName' in entry
        table_name = entry['TableName']
        units = entry['CapacityUnits']
        assert units == 10, f"Table {table_name} reported {units} RCUs, expected 10"
        assert table_name in pending_tables
        pending_tables.remove(table_name)
    assert not pending_tables
|
||||
|
||||
# This test verifies if querying a single partition reports the RCU correctly.
|
||||
# This test is similar to test_batch_get_items_many_small.
|
||||
def test_batch_get_items_many_small_single_partition(test_table_sb):
    # Each item should be about 20 bytes.
    # All items share one partition key and differ only in the sort key.
    shared_pk = random_string()

    def make_item(_):
        return {'p': shared_pk, 'c': random_bytes()}

    items_sb = prepare_items(test_table_sb, make_item)

    request = {test_table_sb.name: {'Keys': items_sb, 'ConsistentRead': True}}
    reply = test_table_sb.meta.client.batch_get_item(RequestItems=request, ReturnConsumedCapacity='TOTAL')

    assert 'ConsumedCapacity' in reply
    consumed = reply['ConsumedCapacity'][0]
    assert 'TableName' in consumed
    assert consumed['TableName'] == test_table_sb.name
    assert 10 == consumed['CapacityUnits']
|
||||
|
||||
# Validate that when getting a batch of requests
|
||||
# From multiple tables we get an RCU for each of the tables
|
||||
# We also validate that eventually consistent reads return half the units
|
||||
|
||||
@@ -25,17 +25,23 @@
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_empty) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_THROW(tools::load_schemas(dbcfg, "").get(), std::exception);
|
||||
BOOST_REQUIRE_THROW(tools::load_schemas(dbcfg, ";").get(), std::exception);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_keyspace_only) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};").get().size(), 0);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_single_table) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf (pk int PRIMARY KEY, v int)").get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf (pk int PRIMARY KEY, v map<int, int>)").get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
|
||||
@@ -43,6 +49,8 @@ SEASTAR_THREAD_TEST_CASE(test_single_table) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_keyspace_replication_strategy) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'mydc1': 1, 'mydc2': 4}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
|
||||
@@ -50,6 +58,8 @@ SEASTAR_THREAD_TEST_CASE(test_keyspace_replication_strategy) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multiple_tables) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf1 (pk int PRIMARY KEY, v int); CREATE TABLE ks.cf2 (pk int PRIMARY KEY, v int)").get().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf1 (pk int PRIMARY KEY, v int); CREATE TABLE ks.cf2 (pk int PRIMARY KEY, v int);").get().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(
|
||||
@@ -70,6 +80,8 @@ SEASTAR_THREAD_TEST_CASE(test_multiple_tables) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_udts) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(
|
||||
dbcfg,
|
||||
"CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; "
|
||||
@@ -107,6 +119,8 @@ SEASTAR_THREAD_TEST_CASE(test_udts) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_dropped_columns) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tools::load_schemas(
|
||||
dbcfg,
|
||||
"CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int); "
|
||||
@@ -177,6 +191,7 @@ void check_views(std::vector<schema_ptr> schemas, std::vector<view_type> views_t
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_materialized_view) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
check_views(
|
||||
tools::load_schemas(
|
||||
@@ -219,6 +234,7 @@ SEASTAR_THREAD_TEST_CASE(test_materialized_view) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_index) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
check_views(
|
||||
tools::load_schemas(
|
||||
@@ -269,6 +285,7 @@ SEASTAR_THREAD_TEST_CASE(test_index) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_mv_index) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
check_views(
|
||||
tools::load_schemas(
|
||||
@@ -308,6 +325,7 @@ void check_schema_columns(const schema& a, const schema& b, bool check_key_colum
|
||||
|
||||
void check_sstable_schema(sstables::test_env& env, std::filesystem::path sst_path, const utils::chunked_vector<mutation>& mutations, bool has_scylla_metadata) {
|
||||
db::config dbcfg;
|
||||
dbcfg.rf_rack_valid_keyspaces(true);
|
||||
|
||||
auto schema = tools::load_schema_from_sstable(dbcfg, sst_path).get();
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "sstables/sstable_compressor_factory.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(sstable_compressor_factory_test)
|
||||
|
||||
@@ -27,7 +28,7 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
|
||||
|
||||
// Create a compressor factory.
|
||||
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
|
||||
tests::require(shard_to_numa_mapping.size() == smp::count);
|
||||
auto config = default_sstable_compressor_factory::config{
|
||||
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
|
||||
};
|
||||
@@ -68,8 +69,8 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
|
||||
// Check that the dictionary used by this shard lies on the same NUMA node.
|
||||
// This is important to avoid cross-node memory accesses on the hot path.
|
||||
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
|
||||
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
|
||||
tests::require_equal(our_numa_node, compressor_numa_node);
|
||||
tests::require_equal(our_numa_node, decompressor_numa_node);
|
||||
|
||||
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
|
||||
@@ -79,22 +80,22 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
auto compressed_size = compressor->compress(
|
||||
reinterpret_cast<const char*>(message.data()), message.size(),
|
||||
reinterpret_cast<char*>(compressed.data()), compressed.size());
|
||||
BOOST_REQUIRE_GE(compressed_size, 0);
|
||||
tests::require_greater_equal(compressed_size, 0);
|
||||
compressed.resize(compressed_size);
|
||||
|
||||
// Validate that the recommended dict was actually used.
|
||||
BOOST_CHECK(compressed.size() < message.size() / 10);
|
||||
tests::require_less(compressed.size(), message.size() / 10);
|
||||
|
||||
auto decompressed = std::vector<char>(message.size());
|
||||
auto decompressed_size = decompressor->uncompress(
|
||||
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
|
||||
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
|
||||
BOOST_REQUIRE_GE(decompressed_size, 0);
|
||||
tests::require_greater_equal(decompressed_size, 0);
|
||||
decompressed.resize(decompressed_size);
|
||||
|
||||
// Validate that the roundtrip through compressor and decompressor
|
||||
// resulted in the original message.
|
||||
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
|
||||
tests::require(std::equal(message.begin(), message.end(), decompressed.begin(), decompressed.end()));
|
||||
})).get();
|
||||
}
|
||||
|
||||
@@ -102,11 +103,11 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
|
||||
// of NUMA nodes.
|
||||
// This isn't that important, but we don't want to duplicate dictionaries
|
||||
// within a NUMA node unnecessarily.
|
||||
BOOST_CHECK_EQUAL(
|
||||
tests::require_equal(
|
||||
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
BOOST_CHECK_EQUAL(
|
||||
tests::require_equal(
|
||||
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
|
||||
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
|
||||
);
|
||||
|
||||
286
test/cluster/dtest/limits_test.py
Normal file
286
test/cluster/dtest/limits_test.py
Normal file
@@ -0,0 +1,286 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import logging
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
from dtest_class import Tester, create_ks
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# These are the ideal values according to the C* specifications
|
||||
# they should pass
|
||||
|
||||
LIMIT_64_K = 64 * 1024
|
||||
LIMIT_32K = 32 * 1024
|
||||
LIMIT_128K = 128 * 1024
|
||||
LIMIT_2GB = 2 * 1024 * 1024 * 1024
|
||||
|
||||
MAX_KEY_SIZE = LIMIT_64_K
|
||||
MAX_BLOB_SIZE = 8388608 # theoretical limit LIMIT_2GB
|
||||
MAX_COLUMNS = LIMIT_128K
|
||||
MAX_TUPLES = LIMIT_32K
|
||||
MAX_BATCH_SIZE = 50 * 1024
|
||||
MAX_CELLS_COLUMNS = LIMIT_32K
|
||||
MAX_CELLS_BATCH_SIZE = 50
|
||||
MAX_CELLS = 16777216
|
||||
|
||||
# These values are used to validate the test code
|
||||
# MAX_KEY_SIZE = 1000
|
||||
# MAX_BLOB_SIZE = 1000
|
||||
# MAX_COLUMNS = 1000
|
||||
# MAX_TUPLES = 1000
|
||||
# MAX_BATCH_SIZE = 1000
|
||||
# MAX_CELLS_COLUMNS = 100
|
||||
# MAX_CELLS_BATCH_SIZE = 100
|
||||
# MAX_CELLS = 1000
|
||||
|
||||
|
||||
@pytest.mark.single_node
class TestLimits(Tester):
    """
    Single-node tests probing size limits: key (column name) length, blob
    value size, tuple arity, batch size and total cell count.

    Each ``test_*`` method starts a one-node cluster, creates keyspace ``ks``
    and then calls the matching ``_do_test_*`` helper for a series of sizes.
    """

    @staticmethod
    def _sizes_below_powers_of_two(limit):
        """
        Yield ``2**k - 1`` for ``k = 1 .. floor(log2(limit))``.

        The exponent bound is computed with integer arithmetic
        (``int.bit_length``) rather than ``int(math.log(limit, 2))``: the
        latter is a floating-point quotient and may round to the wrong side
        of an integer, which would silently change the number of iterations.
        """
        for exponent in range(1, limit.bit_length()):
            yield (1 << exponent) - 1

    def prepare(self):
        """
        Sets up node to test against.
        """
        cluster = self.cluster
        return cluster

    def _do_test_max_key_length(self, session, node, size, expect_failure=False):
        """
        Create table ``test1`` whose primary-key column name is ``size``
        characters long.

        With ``expect_failure`` the CREATE TABLE must raise an error matching
        "Key size too large"; the same pattern is added to
        ``self.ignore_log_patterns``. Otherwise two rows are inserted, the
        node is flushed, both rows are read back and the table is dropped.
        """
        print("Testing max key length for {}.{}".format(size, " Expected failure..." if expect_failure else ""))
        key_name = "k" * size

        c = f"CREATE TABLE test1 ({key_name} int PRIMARY KEY)"
        if expect_failure:
            expected_error = r"Key size too large: \d+ > 65535"
            self.ignore_log_patterns += [expected_error]
            with pytest.raises(Exception, match=expected_error):
                session.execute(c)
            return

        session.execute(c)

        session.execute("insert into ks.test1 (%s) values (1);" % key_name)
        session.execute("insert into ks.test1 (%s) values (2);" % key_name)

        node.flush()
        # Select
        res = session.execute(
            """
            SELECT * FROM ks.test1
            WHERE %s=1
            """
            % key_name
        )

        assert len(res.current_rows) == 1

        res = session.execute(
            """
            SELECT * FROM ks.test1
            WHERE %s=2
            """
            % key_name
        )

        assert len(res.current_rows) == 1
        session.execute("""DROP TABLE test1""")

    def test_max_key_length(self):
        """
        Check key lengths around MAX_KEY_SIZE: the limit itself and
        ``limit - 9`` are expected to fail, ``limit - 10`` and a series of
        smaller sizes are expected to work.
        """
        cluster = self.prepare()
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, "ks", 1)

        # biggest that will currently work in scylla
        # key_name = "k" * 65526
        self._do_test_max_key_length(session, node, MAX_KEY_SIZE, expect_failure=True)
        self._do_test_max_key_length(session, node, MAX_KEY_SIZE - 9, expect_failure=True)

        self._do_test_max_key_length(session, node, MAX_KEY_SIZE - 10)

        # Walk down from half the limit, dividing by 8 each step, until 1.
        size = MAX_KEY_SIZE // 2
        while size >= 1:
            self._do_test_max_key_length(session, node, size)
            size >>= 3

    def _do_test_blob_size(self, session, node, size):
        """
        Insert two rows whose blob payload is ``size`` characters long, flush
        the node, read each row back by key and drop the table.
        """
        print("Testing blob size %i" % size)

        blob_a = "a" * size
        blob_b = "b" * size

        session.execute(
            """
            CREATE TABLE test1 (
                user ascii PRIMARY KEY,
                payload blob,
            )
            """
        )

        session.execute("insert into ks.test1 (user, payload) values ('tintin', textAsBlob('%s'));" % blob_a)
        session.execute("insert into ks.test1 (user, payload) values ('milou', textAsBlob('%s'));" % blob_b)

        node.flush()
        # Select
        res = session.execute(
            """
            SELECT * FROM ks.test1
            WHERE user='tintin'
            """
        )

        assert len(list(res)) == 1

        res = session.execute(
            """
            SELECT * FROM ks.test1
            WHERE user='milou'
            """
        )

        assert len(list(res)) == 1
        session.execute("""DROP TABLE test1""")

    def test_max_column_value_size(self):
        """
        Run the blob round-trip for sizes 1, 3, 7, ... up to MAX_BLOB_SIZE - 1.
        """
        cluster = self.prepare()
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, "ks", 1)

        for size in self._sizes_below_powers_of_two(MAX_BLOB_SIZE):
            self._do_test_blob_size(session, node, size)

    def _do_test_max_tuples(self, session, node, count):
        """
        Create a table with a ``frozen<tuple<...>>`` column of ``count`` int
        fields, insert one row, check that one row is returned and drop the
        table.
        """
        print("Testing max tuples for %i" % count)
        # "int, int, ..." and "1, 1, ..." with `count` entries each.
        t = ", ".join(["int"] * count)
        v = ", ".join(["1"] * count)

        c = (
            """
            CREATE TABLE stuff (
                k int PRIMARY KEY,
                v frozen<tuple<%s>>
            );
            """
            % t
        )
        session.execute(c)

        c = "INSERT INTO stuff (k, v) VALUES(0, (%s));" % v
        session.execute(c)

        c = "SELECT * FROM STUFF;"
        res = session.execute(c)
        assert len(res.current_rows) == 1

        session.execute("""DROP TABLE stuff""")

    def test_max_tuple(self):
        """
        Run the tuple test for arities 1, 3, 7, ... up to MAX_TUPLES - 1.
        """
        cluster = self.prepare()
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, "ks", 1)

        for count in self._sizes_below_powers_of_two(MAX_TUPLES):
            self._do_test_max_tuples(session, node, count)

    def _do_test_max_batch_size(self, session, node, size):
        """
        Execute one unlogged batch of ``size // 1000`` inserts (each carrying
        a 900-character text value) and check that the same number of rows is
        read back, then drop the table.
        """
        print("Testing max batch size for size=%i" % size)
        c = """
            CREATE TABLE stuff (
                k int PRIMARY KEY,
                v text
            );
            """
        session.execute(c)

        row_size = 1000
        overhead = 100
        blob = (row_size - overhead) * "x"
        rows = size // row_size

        statements = ["BEGIN UNLOGGED BATCH\n"]
        statements.extend("INSERT INTO stuff (k, v) VALUES(%i, '%s')\n" % (i, blob) for i in range(rows))
        statements.append("APPLY BATCH;\n")

        session.execute("".join(statements))

        c = "SELECT * FROM STUFF;"
        res = session.execute(c)

        assert len(list(res)) == rows
        session.execute("""DROP TABLE STUFF""")

    def test_max_batch_size(self):
        """
        Run the batch test for sizes 1, 3, 7, ... up to the largest
        ``2**k - 1`` with ``2**k <= MAX_BATCH_SIZE``.
        """
        cluster = self.prepare()
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, "ks", 1)

        for size in self._sizes_below_powers_of_two(MAX_BATCH_SIZE):
            self._do_test_max_batch_size(session, node, size)

    def _do_test_max_cell_count(self, session, cells):
        """
        Create a table with MAX_CELLS_COLUMNS int columns and write
        ``cells // MAX_CELLS_COLUMNS`` rows to it in unlogged batches of at
        most MAX_CELLS_BATCH_SIZE rows, then drop the table.

        Nothing is read back; the check is that every statement succeeds.
        """
        print("Testing max cells count for %i" % cells)
        columns = MAX_CELLS_COLUMNS
        # Both strings keep a trailing ", " because the `blub` key column is
        # appended right after them in the statements below.
        keys = "".join("key" + str(i) + ", " for i in range(columns))
        keys_create = "".join("key" + str(i) + " int, " for i in range(columns))
        values = "1, " * columns

        c = """CREATE TABLE test1 (%s blub int PRIMARY KEY,)""" % keys_create
        session.execute(c)

        batch_size = MAX_CELLS_BATCH_SIZE
        rows = cells // columns
        # Everything but the key value is identical for every row, so the
        # (very long) statement prefix is formatted once, outside the loop.
        insert_prefix = "insert into ks.test1 (%s blub) values (%s " % (keys, values)
        c = "BEGIN UNLOGGED BATCH\n"
        for i in range(rows):
            c += insert_prefix + "%i);\n" % i
            # Flush the batch when it is full or after the last row.
            if i == rows - 1 or (i + 1) % batch_size == 0:
                c += "APPLY BATCH;\n"
                session.execute(c)
                c = "BEGIN UNLOGGED BATCH\n"

        session.execute("""DROP TABLE test1""")

    def test_max_cells(self):
        """
        Run the cell-count test for 1, 3, 7, ... up to MAX_CELLS - 1 cells.
        Skipped in debug mode, where the client times out.
        """
        if self.cluster.scylla_mode == "debug":
            pytest.skip("client times out in debug mode")
        cluster = self.prepare()
        cluster.set_configuration_options(values={"query_tombstone_page_limit": 9999999, "batch_size_warn_threshold_in_kb": 1024 * 1024, "batch_size_fail_threshold_in_kb": 1024 * 1024, "commitlog_segment_size_in_mb": 64})
        cluster.populate(1).start(jvm_args=["--smp", "1", "--memory", "2G", "--logger-log-level", "lsa-timing=debug"])
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, "ks", 1)

        for cells in self._sizes_below_powers_of_two(MAX_CELLS):
            self._do_test_max_cell_count(session, cells)
|
||||
120
test/cluster/mv/test_mv_simple.py
Normal file
120
test/cluster/mv/test_mv_simple.py
Normal file
@@ -0,0 +1,120 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
from cassandra.cluster import Session as CassandraSession
|
||||
from cassandra.protocol import InvalidRequest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.parametrize("schema_kind", ["view", "index"])
# Views no longer depend on the experimental feature `views-with-tablets`,
# but let's keep these test cases to make sure it's really not needed anymore.
@pytest.mark.parametrize("views_with_tablets", [False, True])
@pytest.mark.parametrize("rf_rack_valid_keyspaces", [False, True])
async def test_mv_and_index_restrictions_in_tablet_keyspaces(manager: ManagerClient, schema_kind: str,
                                                             views_with_tablets: bool, rf_rack_valid_keyspaces: bool):
    """
    Verify that creating a materialized view or a secondary index in a tablet-based keyspace
    is only possible when the configuration option `rf_rack_valid_keyspaces` is enabled.

    The experimental feature `views-with-tablets` is toggled as well, but the expected
    outcome depends only on `rf_rack_valid_keyspaces`.
    """
    # Local import: the expected error is a literal message, not a pattern.
    import re

    async def create_mv_or_index(cql: CassandraSession):
        if schema_kind == "view":
            await cql.run_async("CREATE MATERIALIZED VIEW ks.mv "
                                "AS SELECT * FROM ks.t "
                                "WHERE p IS NOT NULL AND v IS NOT NULL "
                                "PRIMARY KEY (v, p)")
        elif schema_kind == "index":
            await cql.run_async("CREATE INDEX myindex ON ks.t(v)")
        else:
            assert False, "Unknown schema kind"

    async def try_pass(cql: CassandraSession):
        # The keyspace is always dropped, so a failure leaves nothing behind.
        try:
            await cql.run_async("CREATE KEYSPACE ks WITH replication = "
                                "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
                                "AND tablets = {'enabled': true}")
            await cql.run_async("CREATE TABLE ks.t (p int PRIMARY KEY, v int)")
            await create_mv_or_index(cql)
        finally:
            await cql.run_async("DROP KEYSPACE IF EXISTS ks")

    async def try_fail(cql: CassandraSession):
        err = "Materialized views and secondary indexes are not supported on base tables with tablets. " \
              "To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and " \
              "make sure that the cluster feature `VIEWS_WITH_TABLETS` is enabled."
        # `match` is applied as a regular expression; escape the message so
        # that it is compared literally.
        with pytest.raises(InvalidRequest, match=re.escape(err)):
            await try_pass(cql)

    feature = ["views-with-tablets"] if views_with_tablets else []
    config = {"experimental_features": feature, "rf_rack_valid_keyspaces": rf_rack_valid_keyspaces}

    srv = await manager.server_add(config=config)

    # Necessary because we're restarting the node multiple times.
    cql, _ = await manager.get_ready_cql([srv])
    logger.debug("Obtained CassandraSession object")

    # We just want to validate the statements. We don't need to wait.
    assert hasattr(cql.cluster, "max_schema_agreement_wait")
    cql.cluster.max_schema_agreement_wait = 0
    logger.debug("Set max_schema_agreement_wait to 0")

    if rf_rack_valid_keyspaces:
        await try_pass(cql)
        logger.debug("try_pass finished successfully")
    else:
        await try_fail(cql)
        logger.debug("try_fail finished successfully")
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.parametrize("view_type", ["view", "index"])
async def test_view_startup(manager: ManagerClient, view_type: str):
    """
    Create a materialized view or a secondary index in a tablet-based keyspace
    while `rf_rack_valid_keyspaces` is enabled, then restart the node with the
    option disabled and wait for the expected message about the violating
    keyspace to show up in the node's log.
    """
    server = await manager.server_add(config={"rf_rack_valid_keyspaces": True})
    session = manager.get_cql()

    create_keyspace = ("CREATE KEYSPACE ks WITH replication = "
                       "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
                       "AND tablets = {'enabled': true}")
    await session.run_async(create_keyspace)
    await session.run_async("CREATE TABLE ks.t (p int PRIMARY KEY, v int)")

    # One CREATE statement per supported kind of view.
    create_view_statements = {
        "view": "CREATE MATERIALIZED VIEW ks.mv "
                "AS SELECT * FROM ks.t "
                "WHERE p IS NOT NULL AND v IS NOT NULL "
                "PRIMARY KEY (v, p)",
        "index": "CREATE INDEX i ON ks.t(v)",
    }
    if view_type not in create_view_statements:
        logger.error(f"Unexpected view type: {view_type}")
        assert False
    await session.run_async(create_view_statements[view_type])

    # Flip the option while the node is down.
    node_id = server.server_id
    await manager.server_stop(node_id)
    await manager.server_update_config(node_id, "rf_rack_valid_keyspaces", False)

    # Only log lines written after this mark are searched.
    node_log = await manager.server_open_log(node_id)
    restart_mark = await node_log.mark()

    # Start the node in the background, watch the log, then join the start task.
    restart = asyncio.create_task(manager.server_start(node_id))
    expected_message = "Some of the existing keyspaces violate the requirements for using materialized " \
                       "views or secondary indexes. Those features require enabling the configuration " \
                       "option `rf_rack_valid_keyspaces` and the cluster feature `VIEWS_WITH_TABLETS`. " \
                       "The keyspaces that violate that condition: ks"
    await node_log.wait_for(expected_message, from_mark=restart_mark)
    await restart
|
||||
@@ -256,6 +256,11 @@ async def test_mv_pairing_during_replace(manager: ManagerClient):
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("delayed_replica", ["base", "mv"])
|
||||
@pytest.mark.parametrize("altered_dc", ["dc1", "dc2"])
|
||||
# FIXME: The test relies on cross-rack tablet migrations. They're forbidden when the configuration option
|
||||
# `rf_rack_valid_keyspaces` is enabled. On the other hand, materialized views in tablet-based keyspaces
|
||||
# require the configuration option to be used.
|
||||
# Hence, we need to rewrite this test.
|
||||
@pytest.mark.skip
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_mv_rf_change(manager: ManagerClient, delayed_replica: str, altered_dc: str):
|
||||
servers = []
|
||||
@@ -331,8 +336,8 @@ async def test_mv_first_replica_in_dc(manager: ManagerClient, delayed_replica: s
|
||||
# If we run the test with more than 1 shard and the tablet for the view table gets allocated on the same shard as the tablet of the base table,
|
||||
# we'll perform an intranode migration of one of these tablets to the other shard. This migration can be confused with the migration to the
|
||||
# new dc in the "first_migration_done()" below. To avoid this, run servers with only 1 shard.
|
||||
servers.append(await manager.server_add(cmdline=['--smp', '1'], config={'rf_rack_valid_keyspaces': False}, property_file={'dc': f'dc1', 'rack': 'myrack1'}))
|
||||
servers.append(await manager.server_add(cmdline=['--smp', '1'], config={'rf_rack_valid_keyspaces': False}, property_file={'dc': f'dc2', 'rack': 'myrack1'}))
|
||||
servers.append(await manager.server_add(cmdline=['--smp', '1'], property_file={'dc': f'dc1', 'rack': 'myrack1'}))
|
||||
servers.append(await manager.server_add(cmdline=['--smp', '1'], property_file={'dc': f'dc2', 'rack': 'myrack1'}))
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE IF NOT EXISTS ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} AND tablets = {'initial': 1}")
|
||||
|
||||
@@ -614,8 +614,15 @@ CLUSTER_EVENTS: tuple[ClusterEventType, ...] = (
|
||||
sleep_for_30_seconds,
|
||||
add_new_table,
|
||||
drop_table,
|
||||
add_index,
|
||||
drop_index,
|
||||
|
||||
# FIXME: We omit creating or dropping indexes because the random_failures
|
||||
# tests still haven't been adjusted to work with `rf_rack_valid_keyspaces`.
|
||||
# That option is a requirement for using materialized views
|
||||
# in tablet-based keyspaces, so let's skip them.
|
||||
#
|
||||
# add_index,
|
||||
# drop_index,
|
||||
|
||||
add_new_keyspace,
|
||||
drop_keyspace,
|
||||
add_cdc,
|
||||
|
||||
@@ -25,6 +25,7 @@ run_first:
|
||||
skip_in_release:
|
||||
- test_raft_cluster_features
|
||||
- test_cluster_features
|
||||
- dtest/limits_test
|
||||
skip_in_debug:
|
||||
- test_shutdown_hang
|
||||
- test_replace
|
||||
|
||||
@@ -14,10 +14,16 @@ from typing import Any
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import ScyllaMetrics
|
||||
|
||||
# main logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
|
||||
return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers])
|
||||
def get_io_read_ops(metrics: list[ScyllaMetrics]) -> int:
|
||||
return int(sum([m.get("scylla_io_queue_total_read_ops") for m in metrics]))
|
||||
|
||||
async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: Any):
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
await asyncio.gather(*[manager.server_update_config(s.server_id, key, value) for s in servers])
|
||||
@@ -98,7 +104,9 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
|
||||
|
||||
async def test_bti_usage_during_reads(should_use_bti: bool, use_cache: bool):
|
||||
select = select_with_cache if use_cache else select_without_cache
|
||||
metrics_before = await get_metrics(manager, servers)
|
||||
select_result = cql.execute(select, (chosen_pk, chosen_ck), trace=True)
|
||||
metrics_after = await get_metrics(manager, servers)
|
||||
row = select_result.one()
|
||||
assert row.pk == chosen_pk
|
||||
assert row.ck == chosen_ck
|
||||
@@ -113,14 +121,25 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
|
||||
seen_partitions = seen_partitions or "Partitions.db" in event.description
|
||||
seen_rows = seen_rows or "Rows.db" in event.description
|
||||
seen_index = seen_index or "Index.db" in event.description
|
||||
if should_use_bti:
|
||||
assert not seen_index, "Index.db was used despite BTI preference"
|
||||
assert seen_partitions, "Partitions.db was not used despite BTI preference"
|
||||
assert seen_rows, "Rows.db was not used despite BTI preference"
|
||||
else:
|
||||
assert seen_index, "Index.db was not used despite BIG preference"
|
||||
assert not seen_partitions, "Partitions.db was used despite BIG preference"
|
||||
assert not seen_rows, "Rows.db was used despite BIG preference"
|
||||
|
||||
if not use_cache:
|
||||
if should_use_bti:
|
||||
assert not seen_index, "Index.db was used despite BTI preference"
|
||||
assert seen_partitions, "Partitions.db was not used despite BTI preference"
|
||||
assert seen_rows, "Rows.db was not used despite BTI preference"
|
||||
else:
|
||||
assert seen_index, "Index.db was not used despite BIG preference"
|
||||
assert not seen_partitions, "Partitions.db was used despite BIG preference"
|
||||
assert not seen_rows, "Rows.db was used despite BIG preference"
|
||||
|
||||
# Test that BYPASS CACHE does force disk reads.
|
||||
io_read_ops = get_io_read_ops(metrics_after) - get_io_read_ops(metrics_before)
|
||||
if should_use_bti:
|
||||
# At least one read for Partitions.db, Rows.db, Data.db
|
||||
assert io_read_ops >= 3
|
||||
else:
|
||||
# At least one read in Index.db (main index), Index.db (promoted index), Data.db
|
||||
assert io_read_ops >= 3
|
||||
|
||||
logger.info("Step 3: Checking for BTI files (should not exist, because cluster feature is suppressed)")
|
||||
await test_files_presence(bti_should_exist=False, big_should_exist=True)
|
||||
@@ -143,7 +162,10 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
|
||||
await asyncio.gather(*[manager.api.keyspace_upgrade_sstables(s.ip_addr, ks_name) for s in servers])
|
||||
logger.info("Step 7: Checking for BTI files (should exist)")
|
||||
await test_files_presence(bti_should_exist=True, big_should_exist=False)
|
||||
await test_bti_usage_during_reads(should_use_bti=True, use_cache=False)
|
||||
await test_bti_usage_during_reads(should_use_bti=True, use_cache=True)
|
||||
|
||||
# Test that BYPASS CACHE does its thing.
|
||||
for _ in range(3):
|
||||
await test_bti_usage_during_reads(should_use_bti=True, use_cache=False)
|
||||
await test_bti_usage_during_reads(should_use_bti=True, use_cache=True)
|
||||
|
||||
manager.driver_close()
|
||||
|
||||
@@ -655,7 +655,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
assert read1 == 0
|
||||
assert skip2 == 0
|
||||
assert read2 > 0
|
||||
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=regular.*', check1)
|
||||
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
|
||||
|
||||
def check2(skip1, read1, skip2, read2):
|
||||
assert skip1 == skip2
|
||||
@@ -665,7 +665,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
def check3(skip1, read1, skip2, read2):
|
||||
assert skip1 < skip2
|
||||
assert read1 == read2
|
||||
await do_repair_and_check('regular', 1, rf'Starting tablet repair by API .* incremental_mode=regular.*', check3)
|
||||
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check3)
|
||||
|
||||
def check4(skip1, read1, skip2, read2):
|
||||
assert skip1 == skip2
|
||||
|
||||
@@ -236,15 +236,20 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
|
||||
for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers}")
|
||||
for idx, srv in enumerate(servers):
|
||||
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
|
||||
for idx, srv in enumerate(servers[:2]):
|
||||
await manager.server_start(srv.server_id)
|
||||
|
||||
# Make sure that the first two nodes restart without group 0 quorum.
|
||||
if idx < 2:
|
||||
with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, "
|
||||
"there is no raft quorum, total voters count 5, "
|
||||
f"alive voters count {idx + 1}"):
|
||||
await read_barrier(manager.api, srv.ip_addr)
|
||||
else:
|
||||
with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, "
|
||||
"there is no raft quorum, total voters count 5, "
|
||||
f"alive voters count {idx + 1}"):
|
||||
await read_barrier(manager.api, srv.ip_addr)
|
||||
|
||||
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
|
||||
# times out.
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
|
||||
for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
|
||||
for srv in servers[2:]:
|
||||
await manager.server_start(srv.server_id)
|
||||
await read_barrier(manager.api, srv.ip_addr)
|
||||
|
||||
@@ -307,12 +307,23 @@ async def test_alter_base_schema_while_build_in_progress(manager: ManagerClient,
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode("release", "error injections are not supported in release mode")
|
||||
async def test_change_rf_while_build_in_progress(manager: ManagerClient, change: str):
|
||||
node_count = 4
|
||||
servers = await manager.servers_add(node_count, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
|
||||
if change == "increase":
|
||||
node_count = 2
|
||||
rack_layout = ["rack1", "rack2"]
|
||||
elif change == "decrease":
|
||||
node_count = 3
|
||||
rack_layout = ["rack1", "rack1", "rack2"]
|
||||
else:
|
||||
assert False
|
||||
|
||||
property_file = [{"dc": "dc1", "rack": rack} for rack in rack_layout]
|
||||
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
|
||||
property_file=property_file)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
rf = 3
|
||||
rf = node_count - 1
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
await populate_base_table(cql, ks, "tab")
|
||||
@@ -326,7 +337,7 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
|
||||
await wait_for_some_view_build_tasks_to_get_stuck(manager, marks)
|
||||
|
||||
new_rf = rf + 1 if change == "increase" else rf - 1
|
||||
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'datacenter1': {new_rf}}}")
|
||||
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {new_rf}}}")
|
||||
|
||||
await unpause_view_building_tasks(manager)
|
||||
|
||||
@@ -337,8 +348,18 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode("release", "error injections are not supported in release mode")
|
||||
async def test_node_operation_during_view_building(manager: ManagerClient, operation: str):
|
||||
node_count = 4 if operation == "remove" or operation == "decommission" else 3
|
||||
servers = await manager.servers_add(node_count, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
|
||||
if operation == "remove" or operation == "decommission":
|
||||
node_count = 4
|
||||
rack_layout = ["rack1", "rack2", "rack3", "rack3"]
|
||||
else:
|
||||
node_count = 3
|
||||
rack_layout = ["rack1", "rack2", "rack3"]
|
||||
|
||||
property_file = [{"dc": "dc1", "rack": rack} for rack in rack_layout]
|
||||
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"},
|
||||
cmdline=cmdline_loggers,
|
||||
property_file=property_file)
|
||||
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
@@ -354,7 +375,8 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
|
||||
await wait_for_some_view_build_tasks_to_get_stuck(manager, marks)
|
||||
|
||||
if operation == "add":
|
||||
await manager.server_add(config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
|
||||
property_file = servers[-1].property_file()
|
||||
await manager.server_add(config={"enable_tablets": "true"}, cmdline=cmdline_loggers, property_file=property_file)
|
||||
node_count = node_count + 1
|
||||
elif operation == "remove":
|
||||
await manager.server_stop_gracefully(servers[-1].server_id)
|
||||
@@ -364,9 +386,11 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
|
||||
await manager.decommission_node(servers[-1].server_id)
|
||||
node_count = node_count - 1
|
||||
elif operation == "replace":
|
||||
property_file = servers[-1].property_file()
|
||||
await manager.server_stop_gracefully(servers[-1].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[-1].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(replace_cfg, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
|
||||
await manager.server_add(replace_cfg, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
|
||||
property_file=property_file)
|
||||
|
||||
await unpause_view_building_tasks(manager)
|
||||
await wait_for_view(cql, 'mv_cf_view', node_count)
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "auth/standard_role_manager.hh"
|
||||
#include "auth/ldap_role_manager.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
@@ -407,7 +408,7 @@ SEASTAR_TEST_CASE(ldap_delegates_query_all) {
|
||||
auto m = make_ldap_manager(env);
|
||||
m->start().get();
|
||||
create_ldap_roles(env, *m);
|
||||
const auto roles = m->query_all().get();
|
||||
const auto roles = m->query_all(auth::internal_distributed_query_state()).get();
|
||||
BOOST_REQUIRE_EQUAL(1, roles.count("role1"));
|
||||
BOOST_REQUIRE_EQUAL(1, roles.count("role2"));
|
||||
BOOST_REQUIRE_EQUAL(1, roles.count("jsmith"));
|
||||
@@ -442,7 +443,7 @@ SEASTAR_TEST_CASE(ldap_delegates_attributes) {
|
||||
do_with_mc(env, [&] (service::group0_batch& b) {
|
||||
m->create("r", auth::role_config{}, b).get();
|
||||
});
|
||||
BOOST_REQUIRE(!m->get_attribute("r", "a").get());
|
||||
BOOST_REQUIRE(!m->get_attribute("r", "a", auth::internal_distributed_query_state()).get());
|
||||
do_with_mc(env, [&] (service::group0_batch& b) {
|
||||
m->set_attribute("r", "a", "3", b).get();
|
||||
});
|
||||
@@ -451,7 +452,7 @@ SEASTAR_TEST_CASE(ldap_delegates_attributes) {
|
||||
do_with_mc(env, [&] (service::group0_batch& b) {
|
||||
m->remove_attribute("r", "a", b).get();
|
||||
});
|
||||
BOOST_REQUIRE(!m->get_attribute("r", "a").get());
|
||||
BOOST_REQUIRE(!m->get_attribute("r", "a", auth::internal_distributed_query_state()).get());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1098,6 +1098,11 @@ private:
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
_db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), _token_metadata.local().get());
|
||||
|
||||
// Materialized views and secondary indexes are still restricted and require specific configuration
|
||||
// options to work. Make sure that if there are existing views or indexes, they don't violate
|
||||
// the requirements imposed on them.
|
||||
_db.local().validate_tablet_views_indexes();
|
||||
|
||||
utils::loading_cache_config perm_cache_config;
|
||||
perm_cache_config.max_size = cfg->permissions_cache_max_entries();
|
||||
perm_cache_config.expiry = std::chrono::milliseconds(cfg->permissions_validity_in_ms());
|
||||
|
||||
@@ -424,7 +424,7 @@ def test_repair_keyspace(nodetool):
|
||||
]},
|
||||
["error processing arguments: nodetool cluster repair repairs only tablet keyspaces. To repair vnode keyspaces use nodetool repair."])
|
||||
|
||||
@pytest.mark.parametrize("mode", ["disabled", "regular", "full"])
|
||||
@pytest.mark.parametrize("mode", ["disabled", "incremental", "full"])
|
||||
def test_repair_incremenatal_repair(nodetool, mode):
|
||||
id1 = "ef1b7a61-66c8-494c-bb03-6f65724e6eee"
|
||||
res = nodetool("cluster", "repair", "--incremental-mode", mode, "ks", "table1", expected_requests=[
|
||||
|
||||
@@ -563,9 +563,9 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
|
||||
|
||||
if (vm.contains("incremental-mode")) {
|
||||
auto mode = vm["incremental-mode"].as<sstring>();
|
||||
const std::unordered_set<sstring> supported_mode{"disabled", "regular", "full"};
|
||||
const std::unordered_set<sstring> supported_mode{"disabled", "incremental", "full"};
|
||||
if (!supported_mode.contains(mode)) {
|
||||
throw std::invalid_argument("nodetool cluster repair --incremental-mode only supports: disabled, regular, full");
|
||||
throw std::invalid_argument("nodetool cluster repair --incremental-mode only supports: disabled, incremental, full");
|
||||
}
|
||||
repair_params["incremental_mode"] = mode;
|
||||
}
|
||||
@@ -3729,7 +3729,7 @@ For more information, see: {}"
|
||||
typed_option<std::vector<sstring>>("in-dc", "Constrain repair to specific datacenter(s)"),
|
||||
typed_option<std::vector<sstring>>("in-hosts", "Constrain repair to the specific host(s)"),
|
||||
typed_option<std::vector<sstring>>("tablet-tokens", "Tokens owned by the tablets to repair."),
|
||||
typed_option<sstring>("incremental-mode", "Specify the incremental repair mode: disabled, regular, full"),
|
||||
typed_option<sstring>("incremental-mode", "Specify the incremental repair mode: disabled, incremental, full"),
|
||||
},
|
||||
{
|
||||
typed_option<sstring>("keyspace", "The keyspace to repair, if missing all keyspaces are repaired", 1),
|
||||
|
||||
@@ -205,7 +205,7 @@ public:
|
||||
}
|
||||
named_value(config_file* file, std::string_view name, liveness liveness_, value_status vs, const T& t = T(), std::string_view desc = {},
|
||||
std::initializer_list<T> allowed_values = {})
|
||||
: named_value(file, name, {}, liveness_, vs, t, desc) {
|
||||
: named_value(file, name, {}, liveness_, vs, t, desc, std::move(allowed_values)) {
|
||||
}
|
||||
named_value(config_file* file, std::string_view name, std::string_view alias, value_status vs, const T& t = T(), std::string_view desc = {},
|
||||
std::initializer_list<T> allowed_values = {})
|
||||
|
||||
@@ -235,6 +235,11 @@ void client::group_client::register_metrics(std::string class_name, std::string
|
||||
sm::description("Total time spend writing data to objects"), {ep_label, sg_label}),
|
||||
sm::make_counter("total_read_prefetch_bytes", [this] { return prefetch_bytes; },
|
||||
sm::description("Total number of bytes requested from object"), {ep_label, sg_label}),
|
||||
sm::make_counter("downloads_blocked_on_memory",
|
||||
[this] { return downloads_blocked_on_memory; },
|
||||
sm::description("Counts the number of times S3 client downloads were delayed due to insufficient memory availability"),
|
||||
{ep_label, sg_label})
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1142,13 +1147,15 @@ class client::chunked_download_source final : public seastar::data_source_impl {
|
||||
s3l.trace("Fiber starts cycle for object '{}'", _object_name);
|
||||
while (!_is_finished) {
|
||||
try {
|
||||
if (_buffers_size >= _max_buffers_size * _buffers_low_watermark) {
|
||||
co_await _bg_fiber_cv.when([this] { return _buffers_size < _max_buffers_size * _buffers_low_watermark; });
|
||||
if (!_is_finished && _buffers_size >= _max_buffers_size * _buffers_low_watermark) {
|
||||
co_await _bg_fiber_cv.when([this] { return _is_finished || (_buffers_size < _max_buffers_size * _buffers_low_watermark); });
|
||||
}
|
||||
|
||||
if (auto units = try_get_units(_client->_memory, _socket_buff_size); !_buffers.empty() && !units) {
|
||||
if (auto units = try_get_units(_client->_memory, _socket_buff_size); !_is_finished && !_buffers.empty() && !units) {
|
||||
auto& gc = _client->find_or_create_client();
|
||||
++gc.downloads_blocked_on_memory;
|
||||
co_await _bg_fiber_cv.when([this] {
|
||||
return _buffers.empty() || try_get_units(_client->_memory, _socket_buff_size);
|
||||
return _is_finished || _buffers.empty() || try_get_units(_client->_memory, _socket_buff_size);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -131,6 +131,7 @@ class client : public enable_shared_from_this<client> {
|
||||
io_stats read_stats;
|
||||
io_stats write_stats;
|
||||
uint64_t prefetch_bytes = 0;
|
||||
uint64_t downloads_blocked_on_memory = 0;
|
||||
seastar::metrics::metric_groups metrics;
|
||||
group_client(std::unique_ptr<http::experimental::connection_factory> f, unsigned max_conn, const aws::retry_strategy& retry_strategy);
|
||||
void register_metrics(std::string class_name, std::string host);
|
||||
|
||||
Reference in New Issue
Block a user