Compare commits

..

1 Commits

Author SHA1 Message Date
Yaniv Kaul
903024e569 Potential fix for code scanning alert no. 144: Workflow does not contain permissions
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2025-12-22 11:58:34 +02:00
41 changed files with 249 additions and 515 deletions

View File

@@ -18,6 +18,8 @@ on:
jobs:
release:
permissions:
contents: write
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -169,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -93,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -708,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), std::move(user_agent_header),
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -989,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
return cd;
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
ret.emplace_back(r.make_client_data());
});
co_return ret;
}

View File

@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
client_options_cache_entry_type _user_agent;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -108,7 +107,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -100,8 +100,9 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
const auto route_entries = parse_set_client_array(root);
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_await cr.local().set_client_routes(route_entries);
co_return seastar::json::json_void();
}
@@ -131,7 +132,8 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
const auto route_keys = parse_delete_client_array(root);
co_await cr.local().delete_client_routes(route_keys);
co_return seastar::json::json_void();
}

View File

@@ -10,9 +10,7 @@
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "utils/loading_shared_values.hh"
#include <list>
#include <optional>
enum class client_type {
@@ -29,20 +27,6 @@ enum class client_connection_stage {
ready,
};
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
struct options_cache_value_type {};
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
using client_options_cache_key_type = client_options_cache_type::key_type;
// This struct represents a single OPTION key-value pair from the client's connection options.
// Both key and value are represented by corresponding "references" to their cached values.
// Each "reference" is effectively a lw_shared_ptr value.
struct client_option_key_value_cached_entry {
client_options_cache_entry_type key;
client_options_cache_entry_type value;
};
sstring to_string(client_connection_stage ct);
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
@@ -53,8 +37,8 @@ struct client_data {
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
std::optional<client_options_cache_entry_type> driver_name;
std::optional<client_options_cache_entry_type> driver_version;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
@@ -62,7 +46,6 @@ struct client_data {
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
std::optional<sstring> scheduling_group_name;
std::list<client_option_key_value_cached_entry> client_options;
sstring stage_str() const { return to_string(connection_stage); }
sstring client_type_str() const { return to_string(ct); }

View File

@@ -125,6 +125,10 @@ if(target_arch)
add_compile_options("-march=${target_arch}")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
endif()
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")

View File

@@ -2251,6 +2251,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
if debuginfo and mode_config['can_have_debug_info']:
cxxflags += ['-g', '-gz']
if 'clang' in cxx:
# Since AssignmentTracking was enabled by default in clang
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
# coroutine frame debugging info (`coro_frame_ty`) is broken.
#
# It seems that we aren't losing much by disabling AssigmentTracking,
# so for now we choose to disable it to get `coro_frame_ty` back.
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
return cxxflags

View File

@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
future<> view_building_worker::run_staging_sstables_registrator() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
co_await create_staging_sstable_tasks();
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
} catch (raft::request_aborted&) {
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
} catch (...) {
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
sleep = true;
}
if (sleep) {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
}
}
}
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
auto it = vbw._state._batch->tasks.begin();
while (it != vbw._state._batch->tasks.end()) {
auto id = it->first;
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
++it; // Advance the iterator before potentially removing the entry from the map.
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
for (auto& [id, t]: tasks_map) {
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
if (!task_opt || task_opt->get().aborted) {
co_await vbw._state._batch->abort_task(id);
}
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
}) | std::ranges::to<std::unordered_set>();;
}
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
// clear the state, save and flush new base table
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
if (processing_base_table != building_state.currently_processed_base_table) {
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
break;
}
}
_vbw.local()._vb_state_machine.event.broadcast();
}
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
tasks.insert({id, *task_opt});
}
#ifdef SEASTAR_DEBUG
{
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
#endif
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
co_return collect_completed_tasks();
}
}
}

View File

@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
.with_column("ssl_protocol", utf8_type)
.with_column("username", utf8_type)
.with_column("scheduling_group", utf8_type)
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_hash_version()
.build();
}
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Collect
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
using client_data_vec = utils::chunked_vector<client_data>;
using shard_client_data = std::vector<client_data_vec>;
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
cd_vec.resize(smp::count);
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
for (unsigned i = 0; i < smp::count; i++) {
for (auto&& ps_cdc : *cd_vec[i]) {
for (auto&& cd : ps_cdc) {
if (cd_map.contains(cd->ip)) {
cd_map[cd->ip].emplace_back(std::move(cd));
if (cd_map.contains(cd.ip)) {
cd_map[cd.ip].emplace_back(std::move(cd));
} else {
dht::decorated_key key = make_partition_key(cd->ip);
dht::decorated_key key = make_partition_key(cd.ip);
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
ips.insert(decorated_ip{std::move(key), cd->ip});
cd_map[cd->ip].emplace_back(std::move(cd));
ips.insert(decorated_ip{std::move(key), cd.ip});
cd_map[cd.ip].emplace_back(std::move(cd));
}
}
co_await coroutine::maybe_yield();
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
co_await result.emit_partition_start(dip.key);
auto& clients = cd_map[dip.ip];
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
return a->port < b->port || a->client_type_str() < b->client_type_str();
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
return a.port < b.port || a.client_type_str() < b.client_type_str();
});
for (const auto& cd : clients) {
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
set_cell(cr.cells(), "shard_id", cd->shard_id);
set_cell(cr.cells(), "connection_stage", cd->stage_str());
if (cd->driver_name) {
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
set_cell(cr.cells(), "shard_id", cd.shard_id);
set_cell(cr.cells(), "connection_stage", cd.stage_str());
if (cd.driver_name) {
set_cell(cr.cells(), "driver_name", *cd.driver_name);
}
if (cd->driver_version) {
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
if (cd.driver_version) {
set_cell(cr.cells(), "driver_version", *cd.driver_version);
}
if (cd->hostname) {
set_cell(cr.cells(), "hostname", *cd->hostname);
if (cd.hostname) {
set_cell(cr.cells(), "hostname", *cd.hostname);
}
if (cd->protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
if (cd.protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
}
if (cd->ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
if (cd.ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
}
if (cd->ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
if (cd.ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
}
if (cd->ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
if (cd.ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
}
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
if (cd->scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
if (cd.scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
}
auto map_type = map_type_impl::get_instance(
utf8_type,
utf8_type,
false
);
auto prepare_client_options = [] (const auto& client_options) {
map_type_impl::native_type tmp;
for (auto& co: client_options) {
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
tmp.push_back(std::move(map_element));
}
return tmp;
};
set_cell(cr.cells(), "client_options",
make_map_value(map_type, prepare_client_options(cd->client_options)));
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();

View File

@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.

View File

@@ -74,8 +74,6 @@ The keys and values are:
as an indicator to which shard client wants to connect. The desired shard number
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
Its value is a decimal representation of type `uint16_t`, by default `19142`.
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
`biased-token-round-robin`. To apply the algorithm,

View File

@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
- a keyspace/view was dropped
- tablet operations (see [tablet operations section](#tablet-operations))
In the first case we simply delete relevant view building tasks as they are no longer needed.
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,

View File

@@ -17,7 +17,6 @@ This document highlights ScyllaDB's key data modeling features.
Workload Prioritization </features/workload-prioritization>
Backup and Restore </features/backup-and-restore>
Incremental Repair </features/incremental-repair/>
Vector Search </features/vector-search/>
.. panel-box::
:title: ScyllaDB Features
@@ -44,5 +43,3 @@ This document highlights ScyllaDB's key data modeling features.
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
efficient and lightweight approach to maintaining data consistency by
repairing only the data that has changed since the last repair.
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
similarity-based queries on vector embeddings.

View File

@@ -1,55 +0,0 @@
=================================
Vector Search in ScyllaDB
=================================
.. note::
This feature is currently available only in `ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
What Is Vector Search
-------------------------
Vector Search enables similarity-based queries over high-dimensional data,
such as text, images, audio, or user behavior. Instead of searching for exact
matches, it allows applications to find items that are semantically similar to
a given input.
To do this, Vector Search works on vector embeddings, which are numerical
representations of data that capture semantic meaning. This enables queries
such as:
* “Find documents similar to this paragraph”
* “Find products similar to what the user just viewed”
* “Find previous tickets related to this support request”
Rather than relying on exact values or keywords, Vector Search returns results
based on distance or similarity between vectors. This capability is
increasingly used in modern workloads such as AI-powered search, recommendation
systems, and retrieval-augmented generation (RAG).
Why Vector Search Matters
------------------------------------
Many applications already rely on ScyllaDB for high throughput, low and
predictable latency, and large-scale data storage.
Vector Search complements these strengths by enabling new classes of workloads,
including:
* Semantic search over text or documents
* Recommendations based on user or item similarity
* AI and ML applications, including RAG pipelines
* Anomaly and pattern detection
With Vector Search, ScyllaDB can serve as the similarity search backend for
AI-driven applications.
Availability
--------------
Vector Search is currently available only in ScyllaDB Cloud, the fully managed
ScyllaDB service.
👉 For details on using Vector Search, refer to the
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/index.html>`_.

View File

@@ -20,10 +20,7 @@ You can run your ScyllaDB workloads on AWS, GCE, and Azure using a ScyllaDB imag
Amazon Web Services (AWS)
-----------------------------
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`,
:ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`,
:ref:`i7ie <system-requirements-i7ie-instances>`, :ref:`i8g<system-requirements-i8g-instances>`,
and :ref:`i8ge <system-requirements-i8ge-instances>`.
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`, :ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`, and :ref:`i7ie <system-requirements-i7ie-instances>`.
.. note::
@@ -198,118 +195,6 @@ All i7i instances have the following specs:
See `Amazon EC2 I7i Instances <https://aws.amazon.com/ec2/instance-types/i7i/>`_ for details.
.. _system-requirements-i8g-instances:
i8g instances
^^^^^^^^^^^^^^
The following i8g instances are supported.
.. list-table::
:widths: 30 20 20 30
:header-rows: 1
* - Model
- vCPU
- Mem (GiB)
- Storage (GB)
* - i8g.large
- 2
- 16
- 1 x 468 GB
* - i8g.xlarge
- 4
- 32
- 1 x 937 GB
* - i8g.2xlarge
- 8
- 64
- 1 x 1,875 GB
* - i8g.4xlarge
- 16
- 128
- 1 x 3,750 GB
* - i8g.8xlarge
- 32
- 256
- 2 x 3,750 GB
* - i8g.12xlarge
- 48
- 384
- 3 x 3,750 GB
* - i8g.16xlarge
- 64
- 512
- 4 x 3,750 GB
All i8g instances have the following specs:
* Powered by AWS Graviton4 processors
* 3rd generation AWS Nitro SSD storage
* DDR5-5600 memory for improved throughput
* Up to 100 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
Amazon Elastic Block Store (EBS)
* Instance sizes offer up to 45 TB of total local NVMe instance storage
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
.. _system-requirements-i8ge-instances:
i8ge instances
^^^^^^^^^^^^^^
The following i8ge instances are supported.
.. list-table::
:widths: 30 20 20 30
:header-rows: 1
* - Model
- vCPU
- Mem (GiB)
- Storage (GB)
* - i8ge.large
- 2
- 16
- 1 x 1,250 GB
* - i8ge.xlarge
- 4
- 32
- 1 x 2,500 GB
* - i8ge.2xlarge
- 8
- 64
- 2 x 2,500 GB
* - i8ge.3xlarge
- 12
- 96
- 1 x 7,500 GB
* - i8ge.6xlarge
- 24
- 192
- 2 x 7,500 GB
* - i8ge.12xlarge
- 48
- 384
- 4 x 7,500 GB
* - i8ge.18xlarge
- 72
- 576
- 6 x 7,500 GB
All i8ge instances have the following specs:
* Powered by AWS Graviton4 processors
* 3rd generation AWS Nitro SSD storage
* DDR5-5600 memory for improved throughput
* Up to 300 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
Amazon Elastic Block Store (EBS)
* Instance sizes offer up to 120 TB of total local NVMe instance storage
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
Im4gn and Is4gen instances
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ScyllaDB supports Arm-based Im4gn and Is4gen instances. See `Amazon EC2 Im4gn and Is4gen instances <https://aws.amazon.com/ec2/instance-types/i4g/>`_ for specification details.

View File

@@ -176,7 +176,7 @@ void fsm::become_leader() {
_last_election_time = _clock.now();
_ping_leader = false;
// a new leader needs to commit at least one entry to make sure that
// a new leader needs to commit at lease one entry to make sure that
// all existing entries in its log are committed as well. Also it should
// send append entries RPC as soon as possible to establish its leadership
// (3.4). Do both of those by committing a dummy entry.

View File

@@ -3385,15 +3385,16 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
continue;
}
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
while (auto de = lister.get().get()) {
auto snapshot_name = de->name;
lister::scan_dir(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [datadir, &all_snapshots] (fs::path snapshots_dir, directory_entry de) {
auto snapshot_name = de.name;
all_snapshots.emplace(snapshot_name, snapshot_details());
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
auto& sd = all_snapshots.at(snapshot_name);
sd.total += details.total;
sd.live += details.live;
}
return get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).then([&all_snapshots, snapshot_name] (auto details) {
auto& sd = all_snapshots.at(snapshot_name);
sd.total += details.total;
sd.live += details.live;
return make_ready_future<>();
});
}).get();
}
return all_snapshots;
});
@@ -3401,61 +3402,38 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
table::snapshot_details details{};
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
if (!co_await file_exists(staging_dir->native())) {
staging_dir.reset();
}
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
while (auto de = co_await lister.get()) {
const auto& name = de->name;
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
// See https://github.com/scylladb/seastar/pull/3163
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
co_await lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [datadir, &details] (fs::path snapshot_dir, directory_entry de) -> future<> {
auto sd = co_await io_check(file_stat, (snapshot_dir / de.name).native(), follow_symlink::no);
auto size = sd.allocated_size;
// The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
//
// All the others should just generate an exception: there is something wrong, so don't blindly
// add it to the size.
if (name != "manifest.json" && name != "schema.cql") {
if (de.name != "manifest.json" && de.name != "schema.cql") {
details.total += size;
if (sd.number_of_links == 1) {
// File exists only in the snapshot directory.
details.live += size;
continue;
}
// If the number of linkes is greater than 1, it is still possible that the file is linked to another snapshot
// So check the datadir for the file too.
} else {
continue;
size = 0;
}
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
try {
try {
// File exists in the main SSTable directory. Snapshots are not contributing to size
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
auto psd = co_await io_check(file_stat, (datadir / de.name).native(), follow_symlink::no);
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
(datadir / name).native(), psd.device_id, psd.inode_number, psd.size,
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
co_return false;
(datadir / de.name).native(), psd.device_id, psd.inode_number, psd.size,
(snapshot_dir / de.name).native(), sd.device_id, sd.inode_number, sd.size);
details.live += size;
}
co_return true;
} catch (std::system_error& e) {
} catch (std::system_error& e) {
if (e.code() != std::error_code(ENOENT, std::system_category())) {
throw;
}
co_return false;
}
};
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
!co_await exists_in_dir(datadir / name)) {
details.live += size;
}
}
});
co_return details;
}

View File

@@ -82,7 +82,7 @@ seastar::future<> service::client_routes_service::set_client_routes_inner(const
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
utils::chunked_vector<canonical_mutation> cmuts;
for (const auto& entry : route_entries) {
for (auto& entry : route_entries) {
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
cmuts.emplace_back(std::move(mut));
}
@@ -103,24 +103,24 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
}
seastar::future<> service::client_routes_service::set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries) {
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) mutable -> future<> {
return cr.with_retry([&cr, route_entries = std::move(route_entries)] {
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&] {
return cr.set_client_routes_inner(route_entries);
});
});
}
seastar::future<> service::client_routes_service::delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys) {
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) mutable -> future<> {
return cr.with_retry([&cr, route_keys = std::move(route_keys)] {
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&] {
return cr.delete_client_routes_inner(route_keys);
});
});
}
template <typename Func>
seastar::future<> service::client_routes_service::with_retry(Func func) const {
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
int retries = 10;
while (true) {
try {

View File

@@ -66,8 +66,8 @@ public:
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
future<std::vector<client_route_entry>> get_client_routes() const;
seastar::future<> set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries);
seastar::future<> delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys);
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
// notifications
@@ -76,7 +76,7 @@ private:
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
template <typename Func>
seastar::future<> with_retry(Func func) const;
seastar::future<> with_retry(Func&& func) const;
abort_source& _abort_source;
gms::feature_service& _feature_service;

View File

@@ -344,17 +344,3 @@ void service::client_state::update_per_service_level_params(qos::service_level_o
_workload_type = slo.workload;
}
future<> service::client_state::set_client_options(
client_options_cache_type& keys_and_values_cache,
const std::unordered_map<sstring, sstring>& client_options) {
for (const auto& [key, value] : client_options) {
auto cached_key = co_await keys_and_values_cache.get_or_load(key, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto cached_value = co_await keys_and_values_cache.get_or_load(value, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
_client_options.emplace_back(std::move(cached_key), std::move(cached_value));
}
}

View File

@@ -18,7 +18,6 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/permission.hh"
#include "client_data.hh"
#include "transport/cql_protocol_extension.hh"
#include "service/qos/service_level_controller.hh"
@@ -103,8 +102,7 @@ private:
private volatile String keyspace;
#endif
std::optional<auth::authenticated_user> _user;
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
std::list<client_option_key_value_cached_entry> _client_options;
std::optional<sstring> _driver_name, _driver_version;
auth_state _auth_state = auth_state::UNINITIALIZED;
bool _control_connection = false;
@@ -153,33 +151,18 @@ public:
return _control_connection = true;
}
std::optional<client_options_cache_entry_type> get_driver_name() const {
std::optional<sstring> get_driver_name() const {
return _driver_name;
}
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
void set_driver_name(sstring driver_name) {
_driver_name = std::move(driver_name);
}
const auto& get_client_options() const {
return _client_options;
}
future<> set_client_options(
client_options_cache_type& keys_and_values_cache,
const std::unordered_map<sstring, sstring>& client_options);
std::optional<client_options_cache_entry_type> get_driver_version() const {
std::optional<sstring> get_driver_version() const {
return _driver_version;
}
future<> set_driver_version(
client_options_cache_type& keys_and_values_cache,
const sstring& driver_version)
{
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
void set_driver_version(sstring driver_version) {
_driver_version = std::move(driver_version);
}
client_state(external_tag,

View File

@@ -131,9 +131,8 @@ async def test_backup_move(manager: ManagerClient, object_storage, move_files):
@pytest.mark.asyncio
@pytest.mark.parametrize("ne_parameter", [ "endpoint", "bucket", "snapshot" ])
async def test_backup_with_non_existing_parameters(manager: ManagerClient, object_storage, ne_parameter):
'''backup should fail if either of the parameters does not exist'''
async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_storage):
'''backup should fail if the destination bucket does not exist'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
@@ -143,8 +142,7 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
backup_snap_name = 'backup'
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name = backup_snap_name)
ks, cf = await prepare_snapshot_for_backup(manager, server)
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
@@ -152,18 +150,39 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
assert len(files) > 0
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf,
backup_snap_name if ne_parameter != 'snapshot' else 'no-such-snapshot',
object_storage.address if ne_parameter != 'endpoint' else 'no-such-endpoint',
object_storage.bucket_name if ne_parameter != 'bucket' else 'no-such-bucket',
prefix)
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', object_storage.address, "non-existant-bucket", prefix)
status = await manager.api.wait_task(server.ip_addr, tid)
assert status is not None
assert status['state'] == 'failed'
if ne_parameter == 'endpoint':
assert status['error'] == 'std::invalid_argument (endpoint no-such-endpoint not found)'
#assert 'S3 request failed. Code: 15. Reason: Access Denied.' in status['error']
@pytest.mark.asyncio
async def test_backup_to_non_existent_endpoint(manager: ManagerClient, object_storage):
'''backup should fail if the endpoint is invalid/inaccessible'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options'],
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
assert len(files) > 0
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', "does_not_exist", object_storage.bucket_name, prefix)
status = await manager.api.wait_task(server.ip_addr, tid)
assert status is not None
assert status['state'] == 'failed'
assert status['error'] == 'std::invalid_argument (endpoint does_not_exist not found)'
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
'''helper for backup abort testing'''
@@ -217,6 +236,38 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
assert max_files is None or uploaded_count < max_files
@pytest.mark.asyncio
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, object_storage):
'''backup should fail if the snapshot does not exist'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options'],
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'nonexistent-snapshot',
object_storage.address, object_storage.bucket_name, prefix)
# The task is expected to fail immediately due to invalid snapshot name.
# However, since internal implementation details may change, we'll wait for
# task completion if immediate failure doesn't occur.
actual_state = None
for status_api in [manager.api.get_task_status,
manager.api.wait_task]:
status = await status_api(server.ip_addr, tid)
assert status is not None
actual_state = status['state']
if actual_state == 'failed':
break
else:
assert actual_state == 'failed'
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_backup_is_abortable(manager: ManagerClient, object_storage):

View File

@@ -74,6 +74,7 @@ def test_cast_int_literal_with_type_hint_to_blob(cql, table1, scylla_only):
# An int can always be converted to a valid blob, but blobs might have wrong amount of bytes
# and can't be converted to a valid int.
def test_cast_blob_literal_to_int(cql, table1):
pk = unique_key_int()
with pytest.raises(InvalidRequest, match='HEX'):
cql.execute(f"INSERT INTO {table1} (pk) VALUES (0xBAAAAAAD)")
with pytest.raises(InvalidRequest, match='blob'):

View File

@@ -61,7 +61,7 @@ def test_select_default_order(cql, table_int_desc):
def test_multi_column_relation_desc(cql, table2):
k = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table2} (p, c1, c2) VALUES (?, ?, ?)')
cql.execute(stmt, [k, 1, 0])
cql.execute(stmt, [k, 1, 1])
cql.execute(stmt, [k, 1, 2])
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = {k} AND (c1, c2) >= (1, 1)'))
cql.execute(stmt, [0, 1, 0])
cql.execute(stmt, [0, 1, 1])
cql.execute(stmt, [0, 1, 2])
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = 0 AND (c1, c2) >= (1, 1)'))

View File

@@ -352,7 +352,7 @@ def test_storage_options_alter_type(cql, scylla_only):
ksdef_local = "WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : '1' } " \
"AND STORAGE = { 'type' : 'S3', 'bucket' : '/b1', 'endpoint': 'localhost'}"
with pytest.raises(InvalidRequest):
cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
res = cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
# Reproducer for scylladb#14139
def test_alter_keyspace_preserves_udt(cql):

View File

@@ -171,6 +171,7 @@ def test_grant_revoke_data_permissions(cql, test_keyspace):
# Test that permissions for user-defined functions are serialized in a Cassandra-compatible way
def test_udf_permissions_serialization(cql):
schema = "a int primary key"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace, new_user(cql) as user:
with new_test_table(cql, keyspace, schema) as table:
# Creating a bilingual function makes this test case work for both Scylla and Cassandra
@@ -246,6 +247,7 @@ def test_udf_permissions_quoted_names(cassandra_bug, cql):
# permissions. Cassandra erroneously reports the unrelated missing permissions.
# Reported to Cassandra as CASSANDRA-19005.
def test_drop_udf_with_same_name(cql, cassandra_bug):
schema = "a int primary key"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42L;'"
@@ -286,6 +288,7 @@ def test_drop_udf_with_same_name(cql, cassandra_bug):
# Tests for ALTER are separate, because they are qualified as cassandra_bug
def test_grant_revoke_udf_permissions(cql):
schema = "a int primary key, b list<int>"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
with new_test_table(cql, keyspace, schema) as table:
fun_body_lua = "(i int, l list<int>) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
@@ -332,6 +335,7 @@ def test_grant_revoke_udf_permissions(cql):
# and yet it's not enforced
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
schema = "a int primary key"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
with new_test_table(cql, keyspace, schema) as table:
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"

View File

@@ -90,6 +90,8 @@ def test_attached_service_level(scylla_only, cql):
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
def test_list_effective_service_level(scylla_only, cql):
sl1 = "sl1"
sl2 = "sl2"
timeout = "10s"
workload_type = "batch"
@@ -118,6 +120,8 @@ def test_list_effective_service_level(scylla_only, cql):
assert row.value == "batch"
def test_list_effective_service_level_shares(scylla_only, cql):
sl1 = "sl1"
sl2 = "sl2"
shares1 = 500
shares2 = 200
@@ -180,6 +184,8 @@ def test_default_shares_in_listings(scylla_only, cql):
# and that the messages Scylla returns are informative.
def test_manipulating_default_service_level(cql, scylla_only):
default_sl = "default"
# Service levels are case-sensitive (if used with quotation marks).
fake_default_sl = '"DeFaUlT"'
with new_user(cql) as role:
# Creation.

View File

@@ -76,7 +76,6 @@ def test_clients(scylla_only, cql):
'ssl_enabled',
'ssl_protocol',
'username',
'client_options',
])
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
# There must be at least one connection - the one that sent this SELECT
@@ -85,9 +84,6 @@ def test_clients(scylla_only, cql):
for cl in cls:
assert(cl[0] == '127.0.0.1')
assert(cl[2] == 'cql')
client_options = cl[13]
assert(client_options.get('DRIVER_NAME') == cl[4])
assert(client_options.get('DRIVER_VERSION') == cl[5])
# We only want to check that the table exists with the listed columns, to assert
# backwards compatibility.

View File

@@ -23,7 +23,7 @@ def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
try:
f42 = unique_name()
f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
res = cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
except NoHostAvailable as err:
if "not enabled" in str(err):
@@ -373,7 +373,8 @@ def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
assert len(res) == 1 and res[0].result == 177147
# Test that only compilable input is accepted
def test_compilable(cql, test_keyspace, scylla_with_wasm_only):
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
wrong_source = f"""
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
"""
@@ -383,7 +384,8 @@ Dear wasmtime compiler, please return a function which returns its float argumen
# Test that not exporting a function with matching name
# results in an error
def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
wrong_source = f"""
(module
(type (;0;) (func (param f32) (result f32)))
@@ -401,7 +403,8 @@ def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
f"AS '{wrong_source}'")
# Test that trying to use something that is exported, but is not a function, won't work
def test_not_a_function(cql, test_keyspace, scylla_with_wasm_only):
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
wrong_source = f"""
(module
(type (;0;) (func (param f32) (result f32)))

View File

@@ -49,9 +49,6 @@ RUN_ID = pytest.StashKey[int]()
logger = logging.getLogger(__name__)
# Store pytest config globally so we can access it in hooks that only receive report
_pytest_config: pytest.Config | None = None
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
@@ -187,52 +184,6 @@ def pytest_sessionstart(session: pytest.Session) -> None:
)
@pytest.hookimpl(trylast=True)
def pytest_runtest_logreport(report):
"""Add custom XML attributes to JUnit testcase elements.
This hook wraps the node_reporter's to_xml method to add custom attributes
when the XML element is created. This approach works with pytest-xdist because
it modifies the XML element directly when it's generated, rather than trying
to modify attrs before finalize() is called.
Attributes added:
- function_path: The function path of the test case (excluding parameters).
Uses trylast=True to run after LogXML's hook has created the node_reporter.
"""
from _pytest.junitxml import xml_key
# Only process call phase
if report.when != "call":
return
# Get the XML reporter
config = _pytest_config
if config is None:
return
xml = config.stash.get(xml_key, None)
if xml is None:
return
node_reporter = xml.node_reporter(report)
nodeid = report.nodeid
function_path = f'test/{nodeid.rsplit('.', 2)[0].rsplit('[', 1)[0]}'
# Wrap the to_xml method to add custom attributes to the element
original_to_xml = node_reporter.to_xml
def custom_to_xml():
"""Wrapper that adds custom attributes to the testcase element."""
element = original_to_xml()
element.set("function_path", function_path)
return element
node_reporter.to_xml = custom_to_xml
def pytest_sessionfinish(session: pytest.Session) -> None:
if not session.config.getoption("--test-py-init"):
return
@@ -245,9 +196,6 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
def pytest_configure(config: pytest.Config) -> None:
global _pytest_config
_pytest_config = config
config.build_modes = get_modes_to_run(config)
if testpy_run_id := config.getoption("--run_id"):

View File

@@ -243,7 +243,7 @@ async def get_scylla_2025_1_executable(build_mode: str) -> str:
if not unpacked_marker.exists():
if not downloaded_marker.exists():
archive_path.unlink(missing_ok=True)
await run_process(["curl", "--retry", "10", "--fail", "--silent", "--show-error", "--output", archive_path, url])
await run_process(["curl", "--silent", "--show-error", "--output", archive_path, url])
downloaded_marker.touch()
shutil.rmtree(unpack_dir, ignore_errors=True)
unpack_dir.mkdir(exist_ok=True, parents=True)

View File

@@ -2930,18 +2930,6 @@ private:
static constexpr elem_t magic = 54313;
static void check_digest_value(elem_t d) {
if (d < 0 || d >= magic) {
on_fatal_internal_error(tlogger, fmt::format("Digest value out of range: {}", d));
}
}
static void validate_digest_value(elem_t d_new, elem_t d_old, elem_t x) {
if (d_new < 0 || d_new >= magic) {
on_fatal_internal_error(tlogger, fmt::format("Digest value invalid after appending/removing element: d_new {}, d_old {}, x {}", d_new, d_old, x));
}
}
public:
append_seq(std::vector<elem_t> v) : _seq{make_lw_shared<std::vector<elem_t>>(std::move(v))}, _end{_seq->size()}, _digest{0} {
for (auto x : *_seq) {
@@ -2950,26 +2938,20 @@ public:
}
static elem_t digest_append(elem_t d, elem_t x) {
check_digest_value(d);
BOOST_REQUIRE_LE(0, d);
BOOST_REQUIRE_LT(d, magic);
auto y = (d + x) % magic;
SCYLLA_ASSERT(digest_remove(y, x) == d);
validate_digest_value(y, d, x);
return y;
}
static elem_t digest_remove(elem_t d, elem_t x) {
check_digest_value(d);
BOOST_REQUIRE_LE(0, d);
BOOST_REQUIRE_LT(d, magic);
auto y = (d - x) % magic;
if (y < 0) {
y += magic;
}
validate_digest_value(y, d, x);
return y;
return y < 0 ? y + magic : y;
}
elem_t digest() const {

View File

@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
class RandomContentFile:
class random_content_file:
def __init__(self, path: str, size_in_bytes: int):
path = pathlib.Path(path)
self.filename = path if path.is_file() else path / str(uuid.uuid4())
@@ -68,7 +68,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
try:
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
except Exception:
pass
else:
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
@pytest.mark.asyncio
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
cmdline = [*global_cmdline,
"--logger-log-level", "compaction=debug"]
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
@@ -134,7 +134,7 @@ async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
s2_mark = await s2_log.mark()
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
mark = await log.mark()
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
coord_log = await manager.server_open_log(coord_serv.server_id)
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
await manager.server_restart(servers[0].server_id, wait_others=2)

View File

@@ -353,7 +353,7 @@ future<> controller::set_cql_ready(bool ready) {
return _gossiper.local().add_local_application_state(gms::application_state::RPC_READY, gms::versioned_value::cql_ready(ready));
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server ? _server->local().get_client_data() : protocol_server::get_client_data();
}

View File

@@ -77,7 +77,7 @@ public:
virtual future<> start_server() override;
virtual future<> stop_server() override;
virtual future<> request_stop_server() override;
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
future<> update_connections_scheduling_group();
future<std::vector<connection_service_level_params>> get_connections_service_level_params();

View File

@@ -10,7 +10,6 @@
#include "seastarx.hh"
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/net/socket_defs.hh>
#include <vector>
#include "client_data.hh"
@@ -44,8 +43,8 @@ public:
/// This variant is used by the REST API so failure is acceptable.
virtual future<> request_stop_server() = 0;
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() {
return make_ready_future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>>();
virtual future<utils::chunked_vector<client_data>> get_client_data() {
return make_ready_future<utils::chunked_vector<client_data>>(utils::chunked_vector<client_data>());
}
protocol_server(seastar::scheduling_group sg) noexcept : _sched_group(std::move(sg)) {}

View File

@@ -691,7 +691,6 @@ client_data cql_server::connection::make_client_data() const {
cd.connection_stage = client_connection_stage::authenticating;
}
cd.scheduling_group_name = _current_scheduling_group.name();
cd.client_options = _client_state.get_client_options();
cd.ssl_enabled = _ssl_enabled;
cd.ssl_protocol = _ssl_protocol;
@@ -959,17 +958,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
}
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, driver_ver_opt->second);
_client_state.set_driver_version(driver_ver_opt->second);
}
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, driver_name_opt->second);
_client_state.set_driver_name(driver_name_opt->second);
}
// Store all received client options for later exposure in the system.clients 'client_options' column
// (a frozen map<text, text>). Options are cached to reduce memory overhead by deduplicating
// identical key/value sets across multiple connections (e.g., same driver name/version).
co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);
cql_protocol_extension_enum_set cql_proto_exts;
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
if (options.contains(protocol_extension_name(ext))) {
@@ -1653,9 +1647,6 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
opts.insert({"COMPRESSION", "lz4"});
opts.insert({"COMPRESSION", "snappy"});
// CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
// e.g. CQL driver configuration.
opts.insert({"CLIENT_OPTIONS", ""});
if (_server._config.allow_shard_aware_drivers) {
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
@@ -2317,11 +2308,11 @@ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata
return _response_metadata_id.value();
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> cql_server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await for_each_gently([&ret] (const generic_server::connection& c) {
const connection& conn = dynamic_cast<const connection&>(c);
ret.emplace_back(make_foreign(std::make_unique<client_data>(conn.make_client_data())));
ret.emplace_back(conn.make_client_data());
});
co_return ret;
}

View File

@@ -206,7 +206,6 @@ private:
seastar::metrics::metric_groups _metrics;
std::unique_ptr<event_notifier> _notifier;
private:
client_options_cache_type _connection_options_keys_and_values;
transport_stats _stats;
auth::service& _auth_service;
qos::service_level_controller& _sl_controller;
@@ -235,7 +234,7 @@ public:
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
future<> update_connections_scheduling_group();
future<> update_connections_service_level_params();
future<std::vector<connection_service_level_params>> get_connections_service_level_params();

View File

@@ -1547,8 +1547,8 @@ void reclaim_timer::report() const noexcept {
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
auto info_level = _stall_detected ? log_level::info : log_level::debug;
auto MiB = 1024*1024;
auto msg_extra = extra_msg_when_stall_detected(_stall_detected && !_preemptible,
(_stall_detected && !_preemptible) ? current_backtrace() : saved_backtrace{});
auto msg_extra = extra_msg_when_stall_detected(_stall_detected,
_stall_detected ? current_backtrace() : saved_backtrace{});
timing_logger.log(time_level, "{} took {} us, trying to release {:.3f} MiB {}preemptibly, reserve: {{goal: {}, max: {}}}{}",
_name, (_duration + 500ns) / 1us, (float)_memory_to_release / MiB, _preemptible ? "" : "non-",