Compare commits
1 Commits
copilot/fi
...
alert-auto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
903024e569 |
2
.github/workflows/docs-pages.yaml
vendored
2
.github/workflows/docs-pages.yaml
vendored
@@ -18,6 +18,8 @@ on:
|
||||
|
||||
jobs:
|
||||
release:
|
||||
permissions:
|
||||
contents: write
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
@@ -169,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -708,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -989,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
ret.emplace_back(r.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -108,7 +107,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -100,8 +100,9 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
const auto route_entries = parse_set_client_array(root);
|
||||
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_await cr.local().set_client_routes(route_entries);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
@@ -131,7 +132,8 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
const auto route_keys = parse_delete_client_array(root);
|
||||
co_await cr.local().delete_client_routes(route_keys);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,9 +10,7 @@
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
|
||||
enum class client_type {
|
||||
@@ -29,20 +27,6 @@ enum class client_connection_stage {
|
||||
ready,
|
||||
};
|
||||
|
||||
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
|
||||
struct options_cache_value_type {};
|
||||
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
|
||||
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
|
||||
using client_options_cache_key_type = client_options_cache_type::key_type;
|
||||
|
||||
// This struct represents a single OPTION key-value pair from the client's connection options.
|
||||
// Both key and value are represented by corresponding "references" to their cached values.
|
||||
// Each "reference" is effectively a lw_shared_ptr value.
|
||||
struct client_option_key_value_cached_entry {
|
||||
client_options_cache_entry_type key;
|
||||
client_options_cache_entry_type value;
|
||||
};
|
||||
|
||||
sstring to_string(client_connection_stage ct);
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
@@ -53,8 +37,8 @@ struct client_data {
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
std::optional<client_options_cache_entry_type> driver_name;
|
||||
std::optional<client_options_cache_entry_type> driver_version;
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
std::optional<int32_t> protocol_version;
|
||||
std::optional<sstring> ssl_cipher_suite;
|
||||
@@ -62,7 +46,6 @@ struct client_data {
|
||||
std::optional<sstring> ssl_protocol;
|
||||
std::optional<sstring> username;
|
||||
std::optional<sstring> scheduling_group_name;
|
||||
std::list<client_option_key_value_cached_entry> client_options;
|
||||
|
||||
sstring stage_str() const { return to_string(connection_stage); }
|
||||
sstring client_type_str() const { return to_string(ct); }
|
||||
|
||||
@@ -125,6 +125,10 @@ if(target_arch)
|
||||
add_compile_options("-march=${target_arch}")
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
|
||||
endif()
|
||||
|
||||
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
|
||||
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
|
||||
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")
|
||||
|
||||
@@ -2251,6 +2251,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
|
||||
if debuginfo and mode_config['can_have_debug_info']:
|
||||
cxxflags += ['-g', '-gz']
|
||||
|
||||
if 'clang' in cxx:
|
||||
# Since AssignmentTracking was enabled by default in clang
|
||||
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
|
||||
# coroutine frame debugging info (`coro_frame_ty`) is broken.
|
||||
#
|
||||
# It seems that we aren't losing much by disabling AssigmentTracking,
|
||||
# so for now we choose to disable it to get `coro_frame_ty` back.
|
||||
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
|
||||
|
||||
return cxxflags
|
||||
|
||||
|
||||
|
||||
@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
|
||||
} catch (...) {
|
||||
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
|
||||
sleep = true;
|
||||
}
|
||||
|
||||
if (sleep) {
|
||||
vbw_logger.debug("Sleeping after exception.");
|
||||
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
|
||||
|
||||
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
|
||||
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
|
||||
auto it = vbw._state._batch->tasks.begin();
|
||||
while (it != vbw._state._batch->tasks.end()) {
|
||||
auto id = it->first;
|
||||
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
|
||||
|
||||
++it; // Advance the iterator before potentially removing the entry from the map.
|
||||
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
|
||||
for (auto& [id, t]: tasks_map) {
|
||||
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
|
||||
if (!task_opt || task_opt->get().aborted) {
|
||||
co_await vbw._state._batch->abort_task(id);
|
||||
}
|
||||
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
|
||||
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
if (processing_base_table != building_state.currently_processed_base_table) {
|
||||
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
|
||||
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
tasks.insert({id, *task_opt});
|
||||
}
|
||||
#ifdef SEASTAR_DEBUG
|
||||
{
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
co_return collect_completed_tasks();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
|
||||
.with_column("ssl_protocol", utf8_type)
|
||||
.with_column("username", utf8_type)
|
||||
.with_column("scheduling_group", utf8_type)
|
||||
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Collect
|
||||
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
|
||||
using client_data_vec = utils::chunked_vector<client_data>;
|
||||
using shard_client_data = std::vector<client_data_vec>;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
|
||||
cd_vec.resize(smp::count);
|
||||
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
|
||||
for (unsigned i = 0; i < smp::count; i++) {
|
||||
for (auto&& ps_cdc : *cd_vec[i]) {
|
||||
for (auto&& cd : ps_cdc) {
|
||||
if (cd_map.contains(cd->ip)) {
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
if (cd_map.contains(cd.ip)) {
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
} else {
|
||||
dht::decorated_key key = make_partition_key(cd->ip);
|
||||
dht::decorated_key key = make_partition_key(cd.ip);
|
||||
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
|
||||
ips.insert(decorated_ip{std::move(key), cd->ip});
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
ips.insert(decorated_ip{std::move(key), cd.ip});
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
|
||||
co_await result.emit_partition_start(dip.key);
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
|
||||
return a.port < b.port || a.client_type_str() < b.client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd->shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd->stage_str());
|
||||
if (cd->driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
|
||||
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd.shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd.stage_str());
|
||||
if (cd.driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", *cd.driver_name);
|
||||
}
|
||||
if (cd->driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
|
||||
if (cd.driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", *cd.driver_version);
|
||||
}
|
||||
if (cd->hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd->hostname);
|
||||
if (cd.hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd.hostname);
|
||||
}
|
||||
if (cd->protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
|
||||
if (cd.protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
|
||||
}
|
||||
if (cd->ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
|
||||
if (cd.ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
|
||||
}
|
||||
if (cd->ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
|
||||
if (cd.ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
|
||||
}
|
||||
if (cd->ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
|
||||
if (cd.ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
|
||||
}
|
||||
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
|
||||
if (cd->scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
|
||||
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
|
||||
if (cd.scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(
|
||||
utf8_type,
|
||||
utf8_type,
|
||||
false
|
||||
);
|
||||
|
||||
auto prepare_client_options = [] (const auto& client_options) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& co: client_options) {
|
||||
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
};
|
||||
|
||||
set_cell(cr.cells(), "client_options",
|
||||
make_map_value(map_type, prepare_client_options(cd->client_options)));
|
||||
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
|
||||
@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
|
||||
|
||||
@@ -74,8 +74,6 @@ The keys and values are:
|
||||
as an indicator to which shard client wants to connect. The desired shard number
|
||||
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
|
||||
Its value is a decimal representation of type `uint16_t`, by default `19142`.
|
||||
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
|
||||
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
|
||||
|
||||
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
|
||||
`biased-token-round-robin`. To apply the algorithm,
|
||||
|
||||
@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
|
||||
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
|
||||
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
|
||||
|
||||
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
|
||||
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
|
||||
- a keyspace/view was dropped
|
||||
- tablet operations (see [tablet operations section](#tablet-operations))
|
||||
In the first case we simply delete relevant view building tasks as they are no longer needed.
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
|
||||
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
|
||||
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
|
||||
|
||||
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,
|
||||
|
||||
@@ -17,7 +17,6 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
Workload Prioritization </features/workload-prioritization>
|
||||
Backup and Restore </features/backup-and-restore>
|
||||
Incremental Repair </features/incremental-repair/>
|
||||
Vector Search </features/vector-search/>
|
||||
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
@@ -44,5 +43,3 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
|
||||
efficient and lightweight approach to maintaining data consistency by
|
||||
repairing only the data that has changed since the last repair.
|
||||
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
|
||||
similarity-based queries on vector embeddings.
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
=================================
|
||||
Vector Search in ScyllaDB
|
||||
=================================
|
||||
|
||||
.. note::
|
||||
|
||||
This feature is currently available only in `ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
What Is Vector Search
|
||||
-------------------------
|
||||
|
||||
Vector Search enables similarity-based queries over high-dimensional data,
|
||||
such as text, images, audio, or user behavior. Instead of searching for exact
|
||||
matches, it allows applications to find items that are semantically similar to
|
||||
a given input.
|
||||
|
||||
To do this, Vector Search works on vector embeddings, which are numerical
|
||||
representations of data that capture semantic meaning. This enables queries
|
||||
such as:
|
||||
|
||||
* “Find documents similar to this paragraph”
|
||||
* “Find products similar to what the user just viewed”
|
||||
* “Find previous tickets related to this support request”
|
||||
|
||||
Rather than relying on exact values or keywords, Vector Search returns results
|
||||
based on distance or similarity between vectors. This capability is
|
||||
increasingly used in modern workloads such as AI-powered search, recommendation
|
||||
systems, and retrieval-augmented generation (RAG).
|
||||
|
||||
Why Vector Search Matters
|
||||
------------------------------------
|
||||
|
||||
Many applications already rely on ScyllaDB for high throughput, low and
|
||||
predictable latency, and large-scale data storage.
|
||||
|
||||
Vector Search complements these strengths by enabling new classes of workloads,
|
||||
including:
|
||||
|
||||
* Semantic search over text or documents
|
||||
* Recommendations based on user or item similarity
|
||||
* AI and ML applications, including RAG pipelines
|
||||
* Anomaly and pattern detection
|
||||
|
||||
With Vector Search, ScyllaDB can serve as the similarity search backend for
|
||||
AI-driven applications.
|
||||
|
||||
Availability
|
||||
--------------
|
||||
|
||||
Vector Search is currently available only in ScyllaDB Cloud, the fully managed
|
||||
ScyllaDB service.
|
||||
|
||||
|
||||
👉 For details on using Vector Search, refer to the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/index.html>`_.
|
||||
@@ -20,10 +20,7 @@ You can run your ScyllaDB workloads on AWS, GCE, and Azure using a ScyllaDB imag
|
||||
Amazon Web Services (AWS)
|
||||
-----------------------------
|
||||
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`,
|
||||
:ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`,
|
||||
:ref:`i7ie <system-requirements-i7ie-instances>`, :ref:`i8g<system-requirements-i8g-instances>`,
|
||||
and :ref:`i8ge <system-requirements-i8ge-instances>`.
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`, :ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`, and :ref:`i7ie <system-requirements-i7ie-instances>`.
|
||||
|
||||
.. note::
|
||||
|
||||
@@ -198,118 +195,6 @@ All i7i instances have the following specs:
|
||||
|
||||
See `Amazon EC2 I7i Instances <https://aws.amazon.com/ec2/instance-types/i7i/>`_ for details.
|
||||
|
||||
|
||||
.. _system-requirements-i8g-instances:
|
||||
|
||||
i8g instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8g instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8g.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 468 GB
|
||||
* - i8g.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 937 GB
|
||||
* - i8g.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 1 x 1,875 GB
|
||||
* - i8g.4xlarge
|
||||
- 16
|
||||
- 128
|
||||
- 1 x 3,750 GB
|
||||
* - i8g.8xlarge
|
||||
- 32
|
||||
- 256
|
||||
- 2 x 3,750 GB
|
||||
* - i8g.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 3 x 3,750 GB
|
||||
* - i8g.16xlarge
|
||||
- 64
|
||||
- 512
|
||||
- 4 x 3,750 GB
|
||||
|
||||
All i8g instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 100 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 45 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
|
||||
|
||||
.. _system-requirements-i8ge-instances:
|
||||
|
||||
i8ge instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8ge instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8ge.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 1,250 GB
|
||||
* - i8ge.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 2,500 GB
|
||||
* - i8ge.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 2 x 2,500 GB
|
||||
* - i8ge.3xlarge
|
||||
- 12
|
||||
- 96
|
||||
- 1 x 7,500 GB
|
||||
* - i8ge.6xlarge
|
||||
- 24
|
||||
- 192
|
||||
- 2 x 7,500 GB
|
||||
* - i8ge.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 4 x 7,500 GB
|
||||
* - i8ge.18xlarge
|
||||
- 72
|
||||
- 576
|
||||
- 6 x 7,500 GB
|
||||
|
||||
All i8ge instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 300 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 120 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
|
||||
|
||||
|
||||
Im4gn and Is4gen instances
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
ScyllaDB supports Arm-based Im4gn and Is4gen instances. See `Amazon EC2 Im4gn and Is4gen instances <https://aws.amazon.com/ec2/instance-types/i4g/>`_ for specification details.
|
||||
|
||||
@@ -176,7 +176,7 @@ void fsm::become_leader() {
|
||||
|
||||
_last_election_time = _clock.now();
|
||||
_ping_leader = false;
|
||||
// a new leader needs to commit at least one entry to make sure that
|
||||
// a new leader needs to commit at lease one entry to make sure that
|
||||
// all existing entries in its log are committed as well. Also it should
|
||||
// send append entries RPC as soon as possible to establish its leadership
|
||||
// (3.4). Do both of those by committing a dummy entry.
|
||||
|
||||
@@ -3385,15 +3385,16 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
continue;
|
||||
}
|
||||
|
||||
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
||||
while (auto de = lister.get().get()) {
|
||||
auto snapshot_name = de->name;
|
||||
lister::scan_dir(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [datadir, &all_snapshots] (fs::path snapshots_dir, directory_entry de) {
|
||||
auto snapshot_name = de.name;
|
||||
all_snapshots.emplace(snapshot_name, snapshot_details());
|
||||
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
}
|
||||
return get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).then([&all_snapshots, snapshot_name] (auto details) {
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
return all_snapshots;
|
||||
});
|
||||
@@ -3401,61 +3402,38 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
|
||||
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
|
||||
table::snapshot_details details{};
|
||||
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
|
||||
if (!co_await file_exists(staging_dir->native())) {
|
||||
staging_dir.reset();
|
||||
}
|
||||
|
||||
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
||||
while (auto de = co_await lister.get()) {
|
||||
const auto& name = de->name;
|
||||
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
|
||||
// See https://github.com/scylladb/seastar/pull/3163
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
|
||||
co_await lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [datadir, &details] (fs::path snapshot_dir, directory_entry de) -> future<> {
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / de.name).native(), follow_symlink::no);
|
||||
auto size = sd.allocated_size;
|
||||
|
||||
// The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
|
||||
//
|
||||
// All the others should just generate an exception: there is something wrong, so don't blindly
|
||||
// add it to the size.
|
||||
if (name != "manifest.json" && name != "schema.cql") {
|
||||
if (de.name != "manifest.json" && de.name != "schema.cql") {
|
||||
details.total += size;
|
||||
if (sd.number_of_links == 1) {
|
||||
// File exists only in the snapshot directory.
|
||||
details.live += size;
|
||||
continue;
|
||||
}
|
||||
// If the number of linkes is greater than 1, it is still possible that the file is linked to another snapshot
|
||||
// So check the datadir for the file too.
|
||||
} else {
|
||||
continue;
|
||||
size = 0;
|
||||
}
|
||||
|
||||
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
|
||||
try {
|
||||
try {
|
||||
// File exists in the main SSTable directory. Snapshots are not contributing to size
|
||||
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
|
||||
auto psd = co_await io_check(file_stat, (datadir / de.name).native(), follow_symlink::no);
|
||||
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
|
||||
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
|
||||
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
|
||||
(datadir / name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
co_return false;
|
||||
(datadir / de.name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / de.name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
details.live += size;
|
||||
}
|
||||
co_return true;
|
||||
} catch (std::system_error& e) {
|
||||
} catch (std::system_error& e) {
|
||||
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
||||
throw;
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
};
|
||||
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
|
||||
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
|
||||
!co_await exists_in_dir(datadir / name)) {
|
||||
details.live += size;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
co_return details;
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ seastar::future<> service::client_routes_service::set_client_routes_inner(const
|
||||
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
for (const auto& entry : route_entries) {
|
||||
for (auto& entry : route_entries) {
|
||||
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
|
||||
cmuts.emplace_back(std::move(mut));
|
||||
}
|
||||
@@ -103,24 +103,24 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
|
||||
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_entries = std::move(route_entries)] {
|
||||
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.set_client_routes_inner(route_entries);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_keys = std::move(route_keys)] {
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.delete_client_routes_inner(route_keys);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
seastar::future<> service::client_routes_service::with_retry(Func func) const {
|
||||
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
|
||||
int retries = 10;
|
||||
while (true) {
|
||||
try {
|
||||
|
||||
@@ -66,8 +66,8 @@ public:
|
||||
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
|
||||
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
|
||||
future<std::vector<client_route_entry>> get_client_routes() const;
|
||||
seastar::future<> set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries);
|
||||
seastar::future<> delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys);
|
||||
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
|
||||
|
||||
// notifications
|
||||
@@ -76,7 +76,7 @@ private:
|
||||
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
template <typename Func>
|
||||
seastar::future<> with_retry(Func func) const;
|
||||
seastar::future<> with_retry(Func&& func) const;
|
||||
|
||||
abort_source& _abort_source;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
@@ -344,17 +344,3 @@ void service::client_state::update_per_service_level_params(qos::service_level_o
|
||||
|
||||
_workload_type = slo.workload;
|
||||
}
|
||||
|
||||
future<> service::client_state::set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options) {
|
||||
for (const auto& [key, value] : client_options) {
|
||||
auto cached_key = co_await keys_and_values_cache.get_or_load(key, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
auto cached_value = co_await keys_and_values_cache.get_or_load(value, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
_client_options.emplace_back(std::move(cached_key), std::move(cached_value));
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,6 @@
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "client_data.hh"
|
||||
|
||||
#include "transport/cql_protocol_extension.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
@@ -103,8 +102,7 @@ private:
|
||||
private volatile String keyspace;
|
||||
#endif
|
||||
std::optional<auth::authenticated_user> _user;
|
||||
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
|
||||
std::list<client_option_key_value_cached_entry> _client_options;
|
||||
std::optional<sstring> _driver_name, _driver_version;
|
||||
|
||||
auth_state _auth_state = auth_state::UNINITIALIZED;
|
||||
bool _control_connection = false;
|
||||
@@ -153,33 +151,18 @@ public:
|
||||
return _control_connection = true;
|
||||
}
|
||||
|
||||
std::optional<client_options_cache_entry_type> get_driver_name() const {
|
||||
std::optional<sstring> get_driver_name() const {
|
||||
return _driver_name;
|
||||
}
|
||||
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
|
||||
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
void set_driver_name(sstring driver_name) {
|
||||
_driver_name = std::move(driver_name);
|
||||
}
|
||||
|
||||
const auto& get_client_options() const {
|
||||
return _client_options;
|
||||
}
|
||||
|
||||
future<> set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options);
|
||||
|
||||
std::optional<client_options_cache_entry_type> get_driver_version() const {
|
||||
std::optional<sstring> get_driver_version() const {
|
||||
return _driver_version;
|
||||
}
|
||||
future<> set_driver_version(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const sstring& driver_version)
|
||||
{
|
||||
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
void set_driver_version(sstring driver_version) {
|
||||
_driver_version = std::move(driver_version);
|
||||
}
|
||||
|
||||
client_state(external_tag,
|
||||
|
||||
@@ -131,9 +131,8 @@ async def test_backup_move(manager: ManagerClient, object_storage, move_files):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("ne_parameter", [ "endpoint", "bucket", "snapshot" ])
|
||||
async def test_backup_with_non_existing_parameters(manager: ManagerClient, object_storage, ne_parameter):
|
||||
'''backup should fail if either of the parameters does not exist'''
|
||||
async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the destination bucket does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
@@ -143,8 +142,7 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
backup_snap_name = 'backup'
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name = backup_snap_name)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
@@ -152,18 +150,39 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf,
|
||||
backup_snap_name if ne_parameter != 'snapshot' else 'no-such-snapshot',
|
||||
object_storage.address if ne_parameter != 'endpoint' else 'no-such-endpoint',
|
||||
object_storage.bucket_name if ne_parameter != 'bucket' else 'no-such-bucket',
|
||||
prefix)
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', object_storage.address, "non-existant-bucket", prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
if ne_parameter == 'endpoint':
|
||||
assert status['error'] == 'std::invalid_argument (endpoint no-such-endpoint not found)'
|
||||
#assert 'S3 request failed. Code: 15. Reason: Access Denied.' in status['error']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_endpoint(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the endpoint is invalid/inaccessible'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', "does_not_exist", object_storage.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
assert status['error'] == 'std::invalid_argument (endpoint does_not_exist not found)'
|
||||
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
'''helper for backup abort testing'''
|
||||
@@ -217,6 +236,38 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the snapshot does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'nonexistent-snapshot',
|
||||
object_storage.address, object_storage.bucket_name, prefix)
|
||||
# The task is expected to fail immediately due to invalid snapshot name.
|
||||
# However, since internal implementation details may change, we'll wait for
|
||||
# task completion if immediate failure doesn't occur.
|
||||
actual_state = None
|
||||
for status_api in [manager.api.get_task_status,
|
||||
manager.api.wait_task]:
|
||||
status = await status_api(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
actual_state = status['state']
|
||||
if actual_state == 'failed':
|
||||
break
|
||||
else:
|
||||
assert actual_state == 'failed'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, object_storage):
|
||||
|
||||
@@ -74,6 +74,7 @@ def test_cast_int_literal_with_type_hint_to_blob(cql, table1, scylla_only):
|
||||
# An int can always be converted to a valid blob, but blobs might have wrong amount of bytes
|
||||
# and can't be converted to a valid int.
|
||||
def test_cast_blob_literal_to_int(cql, table1):
|
||||
pk = unique_key_int()
|
||||
with pytest.raises(InvalidRequest, match='HEX'):
|
||||
cql.execute(f"INSERT INTO {table1} (pk) VALUES (0xBAAAAAAD)")
|
||||
with pytest.raises(InvalidRequest, match='blob'):
|
||||
|
||||
@@ -61,7 +61,7 @@ def test_select_default_order(cql, table_int_desc):
|
||||
def test_multi_column_relation_desc(cql, table2):
|
||||
k = unique_key_int()
|
||||
stmt = cql.prepare(f'INSERT INTO {table2} (p, c1, c2) VALUES (?, ?, ?)')
|
||||
cql.execute(stmt, [k, 1, 0])
|
||||
cql.execute(stmt, [k, 1, 1])
|
||||
cql.execute(stmt, [k, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = {k} AND (c1, c2) >= (1, 1)'))
|
||||
cql.execute(stmt, [0, 1, 0])
|
||||
cql.execute(stmt, [0, 1, 1])
|
||||
cql.execute(stmt, [0, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = 0 AND (c1, c2) >= (1, 1)'))
|
||||
|
||||
@@ -352,7 +352,7 @@ def test_storage_options_alter_type(cql, scylla_only):
|
||||
ksdef_local = "WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : '1' } " \
|
||||
"AND STORAGE = { 'type' : 'S3', 'bucket' : '/b1', 'endpoint': 'localhost'}"
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
res = cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
|
||||
# Reproducer for scylladb#14139
|
||||
def test_alter_keyspace_preserves_udt(cql):
|
||||
|
||||
@@ -171,6 +171,7 @@ def test_grant_revoke_data_permissions(cql, test_keyspace):
|
||||
# Test that permissions for user-defined functions are serialized in a Cassandra-compatible way
|
||||
def test_udf_permissions_serialization(cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace, new_user(cql) as user:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
# Creating a bilingual function makes this test case work for both Scylla and Cassandra
|
||||
@@ -246,6 +247,7 @@ def test_udf_permissions_quoted_names(cassandra_bug, cql):
|
||||
# permissions. Cassandra erroneously reports the unrelated missing permissions.
|
||||
# Reported to Cassandra as CASSANDRA-19005.
|
||||
def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
schema = "a int primary key"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
|
||||
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42L;'"
|
||||
@@ -286,6 +288,7 @@ def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
# Tests for ALTER are separate, because they are qualified as cassandra_bug
|
||||
def test_grant_revoke_udf_permissions(cql):
|
||||
schema = "a int primary key, b list<int>"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int, l list<int>) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
@@ -332,6 +335,7 @@ def test_grant_revoke_udf_permissions(cql):
|
||||
# and yet it's not enforced
|
||||
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
|
||||
@@ -90,6 +90,8 @@ def test_attached_service_level(scylla_only, cql):
|
||||
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
|
||||
|
||||
def test_list_effective_service_level(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
timeout = "10s"
|
||||
workload_type = "batch"
|
||||
|
||||
@@ -118,6 +120,8 @@ def test_list_effective_service_level(scylla_only, cql):
|
||||
assert row.value == "batch"
|
||||
|
||||
def test_list_effective_service_level_shares(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
shares1 = 500
|
||||
shares2 = 200
|
||||
|
||||
@@ -180,6 +184,8 @@ def test_default_shares_in_listings(scylla_only, cql):
|
||||
# and that the messages Scylla returns are informative.
|
||||
def test_manipulating_default_service_level(cql, scylla_only):
|
||||
default_sl = "default"
|
||||
# Service levels are case-sensitive (if used with quotation marks).
|
||||
fake_default_sl = '"DeFaUlT"'
|
||||
|
||||
with new_user(cql) as role:
|
||||
# Creation.
|
||||
|
||||
@@ -76,7 +76,6 @@ def test_clients(scylla_only, cql):
|
||||
'ssl_enabled',
|
||||
'ssl_protocol',
|
||||
'username',
|
||||
'client_options',
|
||||
])
|
||||
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
|
||||
# There must be at least one connection - the one that sent this SELECT
|
||||
@@ -85,9 +84,6 @@ def test_clients(scylla_only, cql):
|
||||
for cl in cls:
|
||||
assert(cl[0] == '127.0.0.1')
|
||||
assert(cl[2] == 'cql')
|
||||
client_options = cl[13]
|
||||
assert(client_options.get('DRIVER_NAME') == cl[4])
|
||||
assert(client_options.get('DRIVER_VERSION') == cl[5])
|
||||
|
||||
# We only want to check that the table exists with the listed columns, to assert
|
||||
# backwards compatibility.
|
||||
|
||||
@@ -23,7 +23,7 @@ def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
|
||||
try:
|
||||
f42 = unique_name()
|
||||
f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
|
||||
cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
res = cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
|
||||
except NoHostAvailable as err:
|
||||
if "not enabled" in str(err):
|
||||
@@ -373,7 +373,8 @@ def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
assert len(res) == 1 and res[0].result == 177147
|
||||
|
||||
# Test that only compilable input is accepted
|
||||
def test_compilable(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
|
||||
"""
|
||||
@@ -383,7 +384,8 @@ Dear wasmtime compiler, please return a function which returns its float argumen
|
||||
|
||||
# Test that not exporting a function with matching name
|
||||
# results in an error
|
||||
def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
@@ -401,7 +403,8 @@ def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
|
||||
f"AS '{wrong_source}'")
|
||||
|
||||
# Test that trying to use something that is exported, but is not a function, won't work
|
||||
def test_not_a_function(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
|
||||
@@ -49,9 +49,6 @@ RUN_ID = pytest.StashKey[int]()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Store pytest config globally so we can access it in hooks that only receive report
|
||||
_pytest_config: pytest.Config | None = None
|
||||
|
||||
|
||||
def pytest_addoption(parser: pytest.Parser) -> None:
|
||||
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
|
||||
@@ -187,52 +184,6 @@ def pytest_sessionstart(session: pytest.Session) -> None:
|
||||
)
|
||||
|
||||
|
||||
@pytest.hookimpl(trylast=True)
|
||||
def pytest_runtest_logreport(report):
|
||||
"""Add custom XML attributes to JUnit testcase elements.
|
||||
|
||||
This hook wraps the node_reporter's to_xml method to add custom attributes
|
||||
when the XML element is created. This approach works with pytest-xdist because
|
||||
it modifies the XML element directly when it's generated, rather than trying
|
||||
to modify attrs before finalize() is called.
|
||||
|
||||
Attributes added:
|
||||
- function_path: The function path of the test case (excluding parameters).
|
||||
|
||||
Uses trylast=True to run after LogXML's hook has created the node_reporter.
|
||||
"""
|
||||
from _pytest.junitxml import xml_key
|
||||
|
||||
# Only process call phase
|
||||
if report.when != "call":
|
||||
return
|
||||
|
||||
# Get the XML reporter
|
||||
config = _pytest_config
|
||||
if config is None:
|
||||
return
|
||||
|
||||
xml = config.stash.get(xml_key, None)
|
||||
if xml is None:
|
||||
return
|
||||
|
||||
node_reporter = xml.node_reporter(report)
|
||||
|
||||
nodeid = report.nodeid
|
||||
function_path = f'test/{nodeid.rsplit('.', 2)[0].rsplit('[', 1)[0]}'
|
||||
|
||||
# Wrap the to_xml method to add custom attributes to the element
|
||||
original_to_xml = node_reporter.to_xml
|
||||
|
||||
def custom_to_xml():
|
||||
"""Wrapper that adds custom attributes to the testcase element."""
|
||||
element = original_to_xml()
|
||||
element.set("function_path", function_path)
|
||||
return element
|
||||
|
||||
node_reporter.to_xml = custom_to_xml
|
||||
|
||||
|
||||
def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
if not session.config.getoption("--test-py-init"):
|
||||
return
|
||||
@@ -245,9 +196,6 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
|
||||
|
||||
def pytest_configure(config: pytest.Config) -> None:
|
||||
global _pytest_config
|
||||
_pytest_config = config
|
||||
|
||||
config.build_modes = get_modes_to_run(config)
|
||||
|
||||
if testpy_run_id := config.getoption("--run_id"):
|
||||
|
||||
@@ -243,7 +243,7 @@ async def get_scylla_2025_1_executable(build_mode: str) -> str:
|
||||
if not unpacked_marker.exists():
|
||||
if not downloaded_marker.exists():
|
||||
archive_path.unlink(missing_ok=True)
|
||||
await run_process(["curl", "--retry", "10", "--fail", "--silent", "--show-error", "--output", archive_path, url])
|
||||
await run_process(["curl", "--silent", "--show-error", "--output", archive_path, url])
|
||||
downloaded_marker.touch()
|
||||
shutil.rmtree(unpack_dir, ignore_errors=True)
|
||||
unpack_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
@@ -2930,18 +2930,6 @@ private:
|
||||
|
||||
static constexpr elem_t magic = 54313;
|
||||
|
||||
static void check_digest_value(elem_t d) {
|
||||
if (d < 0 || d >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value out of range: {}", d));
|
||||
}
|
||||
}
|
||||
|
||||
static void validate_digest_value(elem_t d_new, elem_t d_old, elem_t x) {
|
||||
if (d_new < 0 || d_new >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value invalid after appending/removing element: d_new {}, d_old {}, x {}", d_new, d_old, x));
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
append_seq(std::vector<elem_t> v) : _seq{make_lw_shared<std::vector<elem_t>>(std::move(v))}, _end{_seq->size()}, _digest{0} {
|
||||
for (auto x : *_seq) {
|
||||
@@ -2950,26 +2938,20 @@ public:
|
||||
}
|
||||
|
||||
static elem_t digest_append(elem_t d, elem_t x) {
|
||||
check_digest_value(d);
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
|
||||
auto y = (d + x) % magic;
|
||||
SCYLLA_ASSERT(digest_remove(y, x) == d);
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
}
|
||||
|
||||
static elem_t digest_remove(elem_t d, elem_t x) {
|
||||
check_digest_value(d);
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
|
||||
auto y = (d - x) % magic;
|
||||
|
||||
if (y < 0) {
|
||||
y += magic;
|
||||
}
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
return y < 0 ? y + magic : y;
|
||||
}
|
||||
|
||||
elem_t digest() const {
|
||||
|
||||
@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
|
||||
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
|
||||
|
||||
|
||||
class RandomContentFile:
|
||||
class random_content_file:
|
||||
def __init__(self, path: str, size_in_bytes: int):
|
||||
path = pathlib.Path(path)
|
||||
self.filename = path if path.is_file() else path / str(uuid.uuid4())
|
||||
@@ -68,7 +68,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
|
||||
|
||||
try:
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
cmdline = [*global_cmdline,
|
||||
"--logger-log-level", "compaction=debug"]
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
|
||||
@@ -134,7 +134,7 @@ async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
|
||||
|
||||
|
||||
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
|
||||
|
||||
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
s2_mark = await s2_log.mark()
|
||||
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
|
||||
|
||||
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
|
||||
|
||||
|
||||
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
|
||||
|
||||
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
|
||||
mark = await log.mark()
|
||||
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
|
||||
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
|
||||
await manager.server_restart(servers[0].server_id, wait_others=2)
|
||||
|
||||
|
||||
@@ -353,7 +353,7 @@ future<> controller::set_cql_ready(bool ready) {
|
||||
return _gossiper.local().add_local_application_state(gms::application_state::RPC_READY, gms::versioned_value::cql_ready(ready));
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server ? _server->local().get_client_data() : protocol_server::get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ public:
|
||||
virtual future<> start_server() override;
|
||||
virtual future<> stop_server() override;
|
||||
virtual future<> request_stop_server() override;
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
future<> update_connections_scheduling_group();
|
||||
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/net/socket_defs.hh>
|
||||
#include <vector>
|
||||
#include "client_data.hh"
|
||||
@@ -44,8 +43,8 @@ public:
|
||||
/// This variant is used by the REST API so failure is acceptable.
|
||||
virtual future<> request_stop_server() = 0;
|
||||
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>>();
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<client_data>>(utils::chunked_vector<client_data>());
|
||||
}
|
||||
|
||||
protocol_server(seastar::scheduling_group sg) noexcept : _sched_group(std::move(sg)) {}
|
||||
|
||||
@@ -691,7 +691,6 @@ client_data cql_server::connection::make_client_data() const {
|
||||
cd.connection_stage = client_connection_stage::authenticating;
|
||||
}
|
||||
cd.scheduling_group_name = _current_scheduling_group.name();
|
||||
cd.client_options = _client_state.get_client_options();
|
||||
|
||||
cd.ssl_enabled = _ssl_enabled;
|
||||
cd.ssl_protocol = _ssl_protocol;
|
||||
@@ -959,17 +958,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
|
||||
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
|
||||
co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, driver_ver_opt->second);
|
||||
_client_state.set_driver_version(driver_ver_opt->second);
|
||||
}
|
||||
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
|
||||
co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, driver_name_opt->second);
|
||||
_client_state.set_driver_name(driver_name_opt->second);
|
||||
}
|
||||
|
||||
// Store all received client options for later exposure in the system.clients 'client_options' column
|
||||
// (a frozen map<text, text>). Options are cached to reduce memory overhead by deduplicating
|
||||
// identical key/value sets across multiple connections (e.g., same driver name/version).
|
||||
co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);
|
||||
|
||||
cql_protocol_extension_enum_set cql_proto_exts;
|
||||
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
|
||||
if (options.contains(protocol_extension_name(ext))) {
|
||||
@@ -1653,9 +1647,6 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
|
||||
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
|
||||
opts.insert({"COMPRESSION", "lz4"});
|
||||
opts.insert({"COMPRESSION", "snappy"});
|
||||
// CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
|
||||
// e.g. CQL driver configuration.
|
||||
opts.insert({"CLIENT_OPTIONS", ""});
|
||||
if (_server._config.allow_shard_aware_drivers) {
|
||||
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
|
||||
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
|
||||
@@ -2317,11 +2308,11 @@ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata
|
||||
return _response_metadata_id.value();
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await for_each_gently([&ret] (const generic_server::connection& c) {
|
||||
const connection& conn = dynamic_cast<const connection&>(c);
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(conn.make_client_data())));
|
||||
ret.emplace_back(conn.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -206,7 +206,6 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
std::unique_ptr<event_notifier> _notifier;
|
||||
private:
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
transport_stats _stats;
|
||||
auth::service& _auth_service;
|
||||
qos::service_level_controller& _sl_controller;
|
||||
@@ -235,7 +234,7 @@ public:
|
||||
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
future<> update_connections_scheduling_group();
|
||||
future<> update_connections_service_level_params();
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -1547,8 +1547,8 @@ void reclaim_timer::report() const noexcept {
|
||||
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
|
||||
auto info_level = _stall_detected ? log_level::info : log_level::debug;
|
||||
auto MiB = 1024*1024;
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected && !_preemptible,
|
||||
(_stall_detected && !_preemptible) ? current_backtrace() : saved_backtrace{});
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected,
|
||||
_stall_detected ? current_backtrace() : saved_backtrace{});
|
||||
|
||||
timing_logger.log(time_level, "{} took {} us, trying to release {:.3f} MiB {}preemptibly, reserve: {{goal: {}, max: {}}}{}",
|
||||
_name, (_duration + 500ns) / 1us, (float)_memory_to_release / MiB, _preemptible ? "" : "non-",
|
||||
|
||||
Reference in New Issue
Block a user