Compare commits
54 Commits
copilot/ad
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c06760cf15 | ||
|
|
c684456eba | ||
|
|
cd2568ad00 | ||
|
|
7586c5ccbd | ||
|
|
d60b908a8e | ||
|
|
20ff2fcc18 | ||
|
|
6ffdada0ea | ||
|
|
4c247a5d08 | ||
|
|
288d4b49e9 | ||
|
|
e304d912b4 | ||
|
|
846a6e700b | ||
|
|
af5e73def9 | ||
|
|
9793a45288 | ||
|
|
033579ad6f | ||
|
|
c1da552fa4 | ||
|
|
cb3b96b8f4 | ||
|
|
b105ad8379 | ||
|
|
addac8b3f7 | ||
|
|
ea95cdaaec | ||
|
|
28cbaef110 | ||
|
|
85adf6bdb1 | ||
|
|
3a54bab193 | ||
|
|
f65db4e8eb | ||
|
|
df2ac0f257 | ||
|
|
093e97a539 | ||
|
|
fa6e5d0754 | ||
|
|
08518b2c12 | ||
|
|
2a75b1374e | ||
|
|
2cb9bb8f3a | ||
|
|
f1d63d014c | ||
|
|
33f7bc28da | ||
|
|
f831ca5ab5 | ||
|
|
1fe0509a9b | ||
|
|
e7d76fd8f3 | ||
|
|
700853740d | ||
|
|
3c5dd5e5ae | ||
|
|
5971b2ad97 | ||
|
|
f89315d02f | ||
|
|
6ad10b141a | ||
|
|
8cf8e6c87d | ||
|
|
798714183e | ||
|
|
f5ca3657e2 | ||
|
|
dc00461adf | ||
|
|
be6d87648c | ||
|
|
004c08f525 | ||
|
|
4e106b9820 | ||
|
|
4fa4f40712 | ||
|
|
aa908ba99c | ||
|
|
529cd25c51 | ||
|
|
4fc5fcaec4 | ||
|
|
3253b05ec9 | ||
|
|
597a2ce5f9 | ||
|
|
a5f19af050 | ||
|
|
b4fe565f07 |
@@ -169,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -708,8 +708,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -985,10 +989,10 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(r.make_client_data());
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -88,7 +89,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
sstring _user_agent;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -107,7 +108,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -100,9 +100,8 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
const auto route_entries = parse_set_client_array(root);
|
||||
|
||||
co_await cr.local().set_client_routes(route_entries);
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
@@ -132,8 +131,7 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
const auto route_keys = parse_delete_client_array(root);
|
||||
co_await cr.local().delete_client_routes(route_keys);
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
|
||||
enum class client_type {
|
||||
@@ -27,6 +29,20 @@ enum class client_connection_stage {
|
||||
ready,
|
||||
};
|
||||
|
||||
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
|
||||
struct options_cache_value_type {};
|
||||
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
|
||||
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
|
||||
using client_options_cache_key_type = client_options_cache_type::key_type;
|
||||
|
||||
// This struct represents a single OPTION key-value pair from the client's connection options.
|
||||
// Both key and value are represented by corresponding "references" to their cached values.
|
||||
// Each "reference" is effectively a lw_shared_ptr value.
|
||||
struct client_option_key_value_cached_entry {
|
||||
client_options_cache_entry_type key;
|
||||
client_options_cache_entry_type value;
|
||||
};
|
||||
|
||||
sstring to_string(client_connection_stage ct);
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
@@ -37,8 +53,8 @@ struct client_data {
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<client_options_cache_entry_type> driver_name;
|
||||
std::optional<client_options_cache_entry_type> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
std::optional<int32_t> protocol_version;
|
||||
std::optional<sstring> ssl_cipher_suite;
|
||||
@@ -46,6 +62,7 @@ struct client_data {
|
||||
std::optional<sstring> ssl_protocol;
|
||||
std::optional<sstring> username;
|
||||
std::optional<sstring> scheduling_group_name;
|
||||
std::list<client_option_key_value_cached_entry> client_options;
|
||||
|
||||
sstring stage_str() const { return to_string(connection_stage); }
|
||||
sstring client_type_str() const { return to_string(ct); }
|
||||
|
||||
@@ -125,10 +125,6 @@ if(target_arch)
|
||||
add_compile_options("-march=${target_arch}")
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
|
||||
endif()
|
||||
|
||||
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
|
||||
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
|
||||
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")
|
||||
|
||||
@@ -2251,15 +2251,6 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
|
||||
if debuginfo and mode_config['can_have_debug_info']:
|
||||
cxxflags += ['-g', '-gz']
|
||||
|
||||
if 'clang' in cxx:
|
||||
# Since AssignmentTracking was enabled by default in clang
|
||||
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
|
||||
# coroutine frame debugging info (`coro_frame_ty`) is broken.
|
||||
#
|
||||
# It seems that we aren't losing much by disabling AssigmentTracking,
|
||||
# so for now we choose to disable it to get `coro_frame_ty` back.
|
||||
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
|
||||
|
||||
return cxxflags
|
||||
|
||||
|
||||
|
||||
@@ -1489,8 +1489,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, nodeops_heartbeat_interval_seconds(this, "nodeops_heartbeat_interval_seconds", liveness::LiveUpdate, value_status::Used, 10, "Period of heartbeat ticks in node operations.")
|
||||
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions. The amount of memory usable by index cache is limited with ``index_cache_fraction``.")
|
||||
, partition_index_cache_enabled(this, "partition_index_cache_enabled", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Enable partition_index_cache. When disabled, partition index entries are not cached in the global partition_index_cache, reducing memory usage but potentially degrading read performance. The ``cache_index_pages`` option controls caching of the index file pages themselves.")
|
||||
, index_cache_fraction(this, "index_cache_fraction", liveness::LiveUpdate, value_status::Used, 0.2,
|
||||
"The maximum fraction of cache memory permitted for use by index cache. Clamped to the [0.0; 1.0] range. Must be small enough to not deprive the row cache of memory, but should be big enough to fit a large fraction of the index. The default value 0.2 means that at least 80\% of cache memory is reserved for the row cache, while at most 20\% is usable by the index cache.")
|
||||
, consistent_cluster_management(this, "consistent_cluster_management", value_status::Deprecated, true, "Use RAFT for cluster management and DDL.")
|
||||
|
||||
@@ -496,7 +496,6 @@ public:
|
||||
named_value<uint32_t> nodeops_heartbeat_interval_seconds;
|
||||
|
||||
named_value<bool> cache_index_pages;
|
||||
named_value<bool> partition_index_cache_enabled;
|
||||
named_value<double> index_cache_fraction;
|
||||
|
||||
named_value<bool> consistent_cluster_management;
|
||||
|
||||
@@ -198,6 +198,7 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
@@ -214,6 +215,14 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
|
||||
} catch (...) {
|
||||
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
|
||||
sleep = true;
|
||||
}
|
||||
|
||||
if (sleep) {
|
||||
vbw_logger.debug("Sleeping after exception.");
|
||||
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -417,9 +426,12 @@ future<> view_building_worker::check_for_aborted_tasks() {
|
||||
|
||||
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
|
||||
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
|
||||
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
|
||||
for (auto& [id, t]: tasks_map) {
|
||||
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
|
||||
auto it = vbw._state._batch->tasks.begin();
|
||||
while (it != vbw._state._batch->tasks.end()) {
|
||||
auto id = it->first;
|
||||
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
|
||||
|
||||
++it; // Advance the iterator before potentially removing the entry from the map.
|
||||
if (!task_opt || task_opt->get().aborted) {
|
||||
co_await vbw._state._batch->abort_task(id);
|
||||
}
|
||||
@@ -449,7 +461,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
|
||||
// If `state::processing_base_table` is different than the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
if (processing_base_table != building_state.currently_processed_base_table) {
|
||||
@@ -571,8 +583,6 @@ future<> view_building_worker::batch::do_work() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
|
||||
@@ -774,13 +784,15 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
tasks.insert({id, *task_opt});
|
||||
}
|
||||
#ifdef SEASTAR_DEBUG
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
{
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -811,25 +823,6 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
co_return collect_completed_tasks();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -749,6 +749,7 @@ class clients_table : public streaming_virtual_table {
|
||||
.with_column("ssl_protocol", utf8_type)
|
||||
.with_column("username", utf8_type)
|
||||
.with_column("scheduling_group", utf8_type)
|
||||
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
@@ -766,7 +767,7 @@ class clients_table : public streaming_virtual_table {
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Collect
|
||||
using client_data_vec = utils::chunked_vector<client_data>;
|
||||
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
|
||||
using shard_client_data = std::vector<client_data_vec>;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
|
||||
cd_vec.resize(smp::count);
|
||||
@@ -806,13 +807,13 @@ class clients_table : public streaming_virtual_table {
|
||||
for (unsigned i = 0; i < smp::count; i++) {
|
||||
for (auto&& ps_cdc : *cd_vec[i]) {
|
||||
for (auto&& cd : ps_cdc) {
|
||||
if (cd_map.contains(cd.ip)) {
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
if (cd_map.contains(cd->ip)) {
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
} else {
|
||||
dht::decorated_key key = make_partition_key(cd.ip);
|
||||
dht::decorated_key key = make_partition_key(cd->ip);
|
||||
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
|
||||
ips.insert(decorated_ip{std::move(key), cd.ip});
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
ips.insert(decorated_ip{std::move(key), cd->ip});
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -825,39 +826,58 @@ class clients_table : public streaming_virtual_table {
|
||||
co_await result.emit_partition_start(dip.key);
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
|
||||
return a.port < b.port || a.client_type_str() < b.client_type_str();
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd.shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd.stage_str());
|
||||
if (cd.driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", *cd.driver_name);
|
||||
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd->shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd->stage_str());
|
||||
if (cd->driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
|
||||
}
|
||||
if (cd.driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", *cd.driver_version);
|
||||
if (cd->driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
|
||||
}
|
||||
if (cd.hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd.hostname);
|
||||
if (cd->hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd->hostname);
|
||||
}
|
||||
if (cd.protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
|
||||
if (cd->protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
|
||||
}
|
||||
if (cd.ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
|
||||
if (cd->ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
|
||||
}
|
||||
if (cd.ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
|
||||
if (cd->ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
|
||||
}
|
||||
if (cd.ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
|
||||
if (cd->ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
|
||||
}
|
||||
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
|
||||
if (cd.scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
|
||||
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
|
||||
if (cd->scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(
|
||||
utf8_type,
|
||||
utf8_type,
|
||||
false
|
||||
);
|
||||
|
||||
auto prepare_client_options = [] (const auto& client_options) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& co: client_options) {
|
||||
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
};
|
||||
|
||||
set_cell(cr.cells(), "client_options",
|
||||
make_map_value(map_type, prepare_client_options(cd->client_options)));
|
||||
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
|
||||
@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
|
||||
@@ -1043,6 +1043,8 @@ The following modes are available:
|
||||
* - ``immediate``
|
||||
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
|
||||
@@ -74,6 +74,8 @@ The keys and values are:
|
||||
as an indicator to which shard client wants to connect. The desired shard number
|
||||
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
|
||||
Its value is a decimal representation of type `uint16_t`, by default `19142`.
|
||||
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
|
||||
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
|
||||
|
||||
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
|
||||
`biased-token-round-robin`. To apply the algorithm,
|
||||
|
||||
@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
|
||||
it temporarily saves a list of ids of finished tasks and removes those tasks from group0 state (permanently marking them as finished) in 200ms intervals. (*)
|
||||
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
|
||||
|
||||
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
|
||||
On the other hand, view building tasks can also be aborted due to 2 main reasons:
|
||||
- a keyspace/view was dropped
|
||||
- tablet operations (see [tablet operations section](#tablet-operations))
|
||||
In the first case we simply delete relevant view building tasks as they are no longer needed.
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
|
||||
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
|
||||
to create new adjusted tasks (if the operation succeeded) or roll them back (if the operation failed).
|
||||
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
|
||||
|
||||
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,
|
||||
|
||||
@@ -17,6 +17,7 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
Workload Prioritization </features/workload-prioritization>
|
||||
Backup and Restore </features/backup-and-restore>
|
||||
Incremental Repair </features/incremental-repair/>
|
||||
Vector Search </features/vector-search/>
|
||||
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
@@ -43,3 +44,5 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
|
||||
efficient and lightweight approach to maintaining data consistency by
|
||||
repairing only the data that has changed since the last repair.
|
||||
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
|
||||
similarity-based queries on vector embeddings.
|
||||
|
||||
55
docs/features/vector-search.rst
Normal file
55
docs/features/vector-search.rst
Normal file
@@ -0,0 +1,55 @@
|
||||
=================================
|
||||
Vector Search in ScyllaDB
|
||||
=================================
|
||||
|
||||
.. note::
|
||||
|
||||
This feature is currently available only in `ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
What Is Vector Search
|
||||
-------------------------
|
||||
|
||||
Vector Search enables similarity-based queries over high-dimensional data,
|
||||
such as text, images, audio, or user behavior. Instead of searching for exact
|
||||
matches, it allows applications to find items that are semantically similar to
|
||||
a given input.
|
||||
|
||||
To do this, Vector Search works on vector embeddings, which are numerical
|
||||
representations of data that capture semantic meaning. This enables queries
|
||||
such as:
|
||||
|
||||
* “Find documents similar to this paragraph”
|
||||
* “Find products similar to what the user just viewed”
|
||||
* “Find previous tickets related to this support request”
|
||||
|
||||
Rather than relying on exact values or keywords, Vector Search returns results
|
||||
based on distance or similarity between vectors. This capability is
|
||||
increasingly used in modern workloads such as AI-powered search, recommendation
|
||||
systems, and retrieval-augmented generation (RAG).
|
||||
|
||||
Why Vector Search Matters
|
||||
------------------------------------
|
||||
|
||||
Many applications already rely on ScyllaDB for high throughput, low and
|
||||
predictable latency, and large-scale data storage.
|
||||
|
||||
Vector Search complements these strengths by enabling new classes of workloads,
|
||||
including:
|
||||
|
||||
* Semantic search over text or documents
|
||||
* Recommendations based on user or item similarity
|
||||
* AI and ML applications, including RAG pipelines
|
||||
* Anomaly and pattern detection
|
||||
|
||||
With Vector Search, ScyllaDB can serve as the similarity search backend for
|
||||
AI-driven applications.
|
||||
|
||||
Availability
|
||||
--------------
|
||||
|
||||
Vector Search is currently available only in ScyllaDB Cloud, the fully managed
|
||||
ScyllaDB service.
|
||||
|
||||
|
||||
👉 For details on using Vector Search, refer to the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/index.html>`_.
|
||||
@@ -20,7 +20,10 @@ You can run your ScyllaDB workloads on AWS, GCE, and Azure using a ScyllaDB imag
|
||||
Amazon Web Services (AWS)
|
||||
-----------------------------
|
||||
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`, :ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`, and :ref:`i7ie <system-requirements-i7ie-instances>`.
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`,
|
||||
:ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`,
|
||||
:ref:`i7ie <system-requirements-i7ie-instances>`, :ref:`i8g<system-requirements-i8g-instances>`,
|
||||
and :ref:`i8ge <system-requirements-i8ge-instances>`.
|
||||
|
||||
.. note::
|
||||
|
||||
@@ -195,6 +198,118 @@ All i7i instances have the following specs:
|
||||
|
||||
See `Amazon EC2 I7i Instances <https://aws.amazon.com/ec2/instance-types/i7i/>`_ for details.
|
||||
|
||||
|
||||
.. _system-requirements-i8g-instances:
|
||||
|
||||
i8g instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8g instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8g.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 468 GB
|
||||
* - i8g.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 937 GB
|
||||
* - i8g.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 1 x 1,875 GB
|
||||
* - i8g.4xlarge
|
||||
- 16
|
||||
- 128
|
||||
- 1 x 3,750 GB
|
||||
* - i8g.8xlarge
|
||||
- 32
|
||||
- 256
|
||||
- 2 x 3,750 GB
|
||||
* - i8g.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 3 x 3,750 GB
|
||||
* - i8g.16xlarge
|
||||
- 64
|
||||
- 512
|
||||
- 4 x 3,750 GB
|
||||
|
||||
All i8g instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 100 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 45 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
|
||||
|
||||
.. _system-requirements-i8ge-instances:
|
||||
|
||||
i8ge instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8ge instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8ge.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 1,250 GB
|
||||
* - i8ge.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 2,500 GB
|
||||
* - i8ge.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 2 x 2,500 GB
|
||||
* - i8ge.3xlarge
|
||||
- 12
|
||||
- 96
|
||||
- 1 x 7,500 GB
|
||||
* - i8ge.6xlarge
|
||||
- 24
|
||||
- 192
|
||||
- 2 x 7,500 GB
|
||||
* - i8ge.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 4 x 7,500 GB
|
||||
* - i8ge.18xlarge
|
||||
- 72
|
||||
- 576
|
||||
- 6 x 7,500 GB
|
||||
|
||||
All i8ge instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 300 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 120 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8ge Instances <https://aws.amazon.com/ec2/instance-types/i8ge/>`_ for details.
|
||||
|
||||
|
||||
Im4gn and Is4gen instances
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
ScyllaDB supports Arm-based Im4gn and Is4gen instances. See `Amazon EC2 Im4gn and Is4gen instances <https://aws.amazon.com/ec2/instance-types/i4g/>`_ for specification details.
|
||||
|
||||
@@ -25,8 +25,7 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
|
||||
|
||||
* :doc:`Install ScyllaDB </getting-started/install-scylla/index/>`
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
|
||||
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
* :doc:`Create a ScyllaDB Cluster - Single Data Center (DC) </operating-scylla/procedures/cluster-management/create-cluster/>`
|
||||
|
||||
@@ -3,8 +3,7 @@
|
||||
ScyllaDB Housekeeping and how to disable it
|
||||
============================================
|
||||
|
||||
It is always recommended to run the latest version of ScyllaDB.
|
||||
The latest stable release version is always available from the `Download Center <https://www.scylladb.com/download/>`_.
|
||||
It is always recommended to run the latest stable version of ScyllaDB.
|
||||
|
||||
When you install ScyllaDB, it installs by default two services: **scylla-housekeeping-restart** and **scylla-housekeeping-daily**. These services check for the latest ScyllaDB version and prompt the user if they are using a version that is older than what is publicly available.
|
||||
Information about your ScyllaDB deployment, including the ScyllaDB version currently used, as well as unique user and server identifiers, are collected by a centralized service.
|
||||
|
||||
@@ -9,6 +9,8 @@ Running ``cluster repair`` on a **single node** synchronizes all data on all nod
|
||||
To synchronize all data in clusters that have both tablets-based and vnodes-based keyspaces, run :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair/>` on **all**
|
||||
of the nodes in the cluster, and :doc:`nodetool cluster repair </operating-scylla/nodetool-commands/cluster/repair/>` on **any** of the nodes in the cluster.
|
||||
|
||||
.. warning:: :term:`Colocated Tables <Colocated Table>` cannot be synchronized using cluster repair in this version.
|
||||
|
||||
To check if a keyspace enables tablets, use:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
@@ -202,3 +202,7 @@ Glossary
|
||||
The name comes from two basic operations, multiply (MU) and rotate (R), used in its inner loop.
|
||||
The MurmurHash3 version used in ScyllaDB originated from `Apache Cassandra <https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/digest/MurmurHash3.html>`_, and is **not** identical to the `official MurmurHash3 calculation <https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MurmurHash.java#L31-L33>`_. More `here <https://github.com/russss/murmur3-cassandra>`_.
|
||||
|
||||
Colocated Table
|
||||
An internal table of a special type in a :doc:`tablets </architecture/tablets>` enabled keyspace that is colocated with another base table, meaning it always has the same tablet replicas as the base table.
|
||||
Current types of colocated tables include CDC log tables, local indexes, and materialized views that have the same partition key as their base table.
|
||||
|
||||
|
||||
1
main.cc
1
main.cc
@@ -825,7 +825,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// See the comment at the definition of sstables::global_cache_index_pages.
|
||||
smp::invoke_on_all([&cfg] {
|
||||
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
|
||||
sstables::global_partition_index_cache_enabled = cfg->partition_index_cache_enabled.operator utils::updateable_value<bool>();
|
||||
}).get();
|
||||
|
||||
::sighup_handler sighup_handler(opts, *cfg);
|
||||
|
||||
@@ -176,7 +176,7 @@ void fsm::become_leader() {
|
||||
|
||||
_last_election_time = _clock.now();
|
||||
_ping_leader = false;
|
||||
// a new leader needs to commit at lease one entry to make sure that
|
||||
// a new leader needs to commit at least one entry to make sure that
|
||||
// all existing entries in its log are committed as well. Also it should
|
||||
// send append entries RPC as soon as possible to establish its leadership
|
||||
// (3.4). Do both of those by committing a dummy entry.
|
||||
|
||||
@@ -2793,6 +2793,7 @@ future<> database::flush_all_tables() {
|
||||
});
|
||||
_all_tables_flushed_at = db_clock::now();
|
||||
co_await _commitlog->wait_for_pending_deletes();
|
||||
dblog.info("Forcing new commitlog segment and flushing all tables complete");
|
||||
}
|
||||
|
||||
future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {
|
||||
|
||||
@@ -3385,16 +3385,15 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
continue;
|
||||
}
|
||||
|
||||
lister::scan_dir(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [datadir, &all_snapshots] (fs::path snapshots_dir, directory_entry de) {
|
||||
auto snapshot_name = de.name;
|
||||
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
||||
while (auto de = lister.get().get()) {
|
||||
auto snapshot_name = de->name;
|
||||
all_snapshots.emplace(snapshot_name, snapshot_details());
|
||||
return get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).then([&all_snapshots, snapshot_name] (auto details) {
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
}
|
||||
}
|
||||
return all_snapshots;
|
||||
});
|
||||
@@ -3402,38 +3401,61 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
|
||||
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
|
||||
table::snapshot_details details{};
|
||||
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
|
||||
if (!co_await file_exists(staging_dir->native())) {
|
||||
staging_dir.reset();
|
||||
}
|
||||
|
||||
co_await lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [datadir, &details] (fs::path snapshot_dir, directory_entry de) -> future<> {
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / de.name).native(), follow_symlink::no);
|
||||
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
||||
while (auto de = co_await lister.get()) {
|
||||
const auto& name = de->name;
|
||||
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
|
||||
// See https://github.com/scylladb/seastar/pull/3163
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
|
||||
auto size = sd.allocated_size;
|
||||
|
||||
// The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
|
||||
//
|
||||
// All the others should just generate an exception: there is something wrong, so don't blindly
|
||||
// add it to the size.
|
||||
if (de.name != "manifest.json" && de.name != "schema.cql") {
|
||||
if (name != "manifest.json" && name != "schema.cql") {
|
||||
details.total += size;
|
||||
if (sd.number_of_links == 1) {
|
||||
// File exists only in the snapshot directory.
|
||||
details.live += size;
|
||||
continue;
|
||||
}
|
||||
// If the number of linkes is greater than 1, it is still possible that the file is linked to another snapshot
|
||||
// So check the datadir for the file too.
|
||||
} else {
|
||||
size = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
|
||||
try {
|
||||
// File exists in the main SSTable directory. Snapshots are not contributing to size
|
||||
auto psd = co_await io_check(file_stat, (datadir / de.name).native(), follow_symlink::no);
|
||||
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
|
||||
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
|
||||
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
|
||||
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
|
||||
(datadir / de.name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / de.name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
details.live += size;
|
||||
(datadir / name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
co_return false;
|
||||
}
|
||||
} catch (std::system_error& e) {
|
||||
co_return true;
|
||||
} catch (std::system_error& e) {
|
||||
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
||||
throw;
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
};
|
||||
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
|
||||
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
|
||||
!co_await exists_in_dir(datadir / name)) {
|
||||
details.live += size;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
co_return details;
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ seastar::future<> service::client_routes_service::set_client_routes_inner(const
|
||||
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
for (auto& entry : route_entries) {
|
||||
for (const auto& entry : route_entries) {
|
||||
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
|
||||
cmuts.emplace_back(std::move(mut));
|
||||
}
|
||||
@@ -103,24 +103,24 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
|
||||
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
seastar::future<> service::client_routes_service::set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_entries = std::move(route_entries)] {
|
||||
return cr.set_client_routes_inner(route_entries);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_keys = std::move(route_keys)] {
|
||||
return cr.delete_client_routes_inner(route_keys);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
|
||||
seastar::future<> service::client_routes_service::with_retry(Func func) const {
|
||||
int retries = 10;
|
||||
while (true) {
|
||||
try {
|
||||
|
||||
@@ -66,8 +66,8 @@ public:
|
||||
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
|
||||
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
|
||||
future<std::vector<client_route_entry>> get_client_routes() const;
|
||||
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
seastar::future<> set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries);
|
||||
seastar::future<> delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys);
|
||||
|
||||
|
||||
// notifications
|
||||
@@ -76,7 +76,7 @@ private:
|
||||
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
template <typename Func>
|
||||
seastar::future<> with_retry(Func&& func) const;
|
||||
seastar::future<> with_retry(Func func) const;
|
||||
|
||||
abort_source& _abort_source;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
@@ -344,3 +344,17 @@ void service::client_state::update_per_service_level_params(qos::service_level_o
|
||||
|
||||
_workload_type = slo.workload;
|
||||
}
|
||||
|
||||
future<> service::client_state::set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options) {
|
||||
for (const auto& [key, value] : client_options) {
|
||||
auto cached_key = co_await keys_and_values_cache.get_or_load(key, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
auto cached_value = co_await keys_and_values_cache.get_or_load(value, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
_client_options.emplace_back(std::move(cached_key), std::move(cached_value));
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "client_data.hh"
|
||||
|
||||
#include "transport/cql_protocol_extension.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
@@ -102,7 +103,8 @@ private:
|
||||
private volatile String keyspace;
|
||||
#endif
|
||||
std::optional<auth::authenticated_user> _user;
|
||||
std::optional<sstring> _driver_name, _driver_version;
|
||||
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
|
||||
std::list<client_option_key_value_cached_entry> _client_options;
|
||||
|
||||
auth_state _auth_state = auth_state::UNINITIALIZED;
|
||||
bool _control_connection = false;
|
||||
@@ -151,18 +153,33 @@ public:
|
||||
return _control_connection = true;
|
||||
}
|
||||
|
||||
std::optional<sstring> get_driver_name() const {
|
||||
std::optional<client_options_cache_entry_type> get_driver_name() const {
|
||||
return _driver_name;
|
||||
}
|
||||
void set_driver_name(sstring driver_name) {
|
||||
_driver_name = std::move(driver_name);
|
||||
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
|
||||
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<sstring> get_driver_version() const {
|
||||
const auto& get_client_options() const {
|
||||
return _client_options;
|
||||
}
|
||||
|
||||
future<> set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options);
|
||||
|
||||
std::optional<client_options_cache_entry_type> get_driver_version() const {
|
||||
return _driver_version;
|
||||
}
|
||||
void set_driver_version(sstring driver_version) {
|
||||
_driver_version = std::move(driver_version);
|
||||
future<> set_driver_version(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const sstring& driver_version)
|
||||
{
|
||||
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
}
|
||||
|
||||
client_state(external_tag,
|
||||
|
||||
@@ -779,18 +779,15 @@ public:
|
||||
index_reader(shared_sstable sst, reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state = {},
|
||||
use_caching caching = use_caching::yes,
|
||||
bool single_partition_read = false,
|
||||
use_caching use_partition_index_cache = use_caching::yes)
|
||||
bool single_partition_read = false)
|
||||
: _sstable(std::move(sst))
|
||||
, _permit(std::move(permit))
|
||||
, _trace_state(std::move(trace_state))
|
||||
// When use_partition_index_cache is true, use the global shared cache (_sstable->_index_cache).
|
||||
// When false, create a local non-shared cache that won't persist across reads.
|
||||
, _local_index_cache(use_partition_index_cache ? nullptr
|
||||
, _local_index_cache(caching ? nullptr
|
||||
: std::make_unique<partition_index_cache>(_sstable->manager().get_cache_tracker().get_lru(),
|
||||
_sstable->manager().get_cache_tracker().region(),
|
||||
_sstable->manager().get_cache_tracker().get_partition_index_cache_stats()))
|
||||
, _index_cache(use_partition_index_cache ? *_sstable->_index_cache : *_local_index_cache)
|
||||
, _index_cache(caching ? *_sstable->_index_cache : *_local_index_cache)
|
||||
, _alloc_section(abstract_formatter([sst = _sstable] (fmt::format_context& ctx) {
|
||||
fmt::format_to(ctx.out(), "index_reader {}", sst->get_filename());
|
||||
}))
|
||||
|
||||
@@ -1191,11 +1191,9 @@ private:
|
||||
}
|
||||
index_reader& get_index_reader() {
|
||||
if (!_index_reader) {
|
||||
auto bypass_cache = _slice.options.contains(query::partition_slice::option::bypass_cache);
|
||||
auto caching = use_caching(global_cache_index_pages && !bypass_cache);
|
||||
auto use_partition_index_cache = use_caching(global_partition_index_cache_enabled && !bypass_cache);
|
||||
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(),
|
||||
_consumer.trace_state(), caching, _single_partition_read, use_partition_index_cache);
|
||||
_consumer.trace_state(), caching, _single_partition_read);
|
||||
}
|
||||
return *_index_reader;
|
||||
}
|
||||
|
||||
@@ -105,17 +105,6 @@ namespace sstables {
|
||||
//
|
||||
thread_local utils::updateable_value<bool> global_cache_index_pages(true);
|
||||
|
||||
// The below flag governs whether the partition_index_cache is enabled.
|
||||
//
|
||||
// If set to true, partition index entries are cached in a global partition_index_cache.
|
||||
// If false, each index_reader creates its own local partition_index_cache that is
|
||||
// not shared and not persistent across reads.
|
||||
//
|
||||
// This is independent from global_cache_index_pages, which controls caching of
|
||||
// the index file pages themselves (cached_file).
|
||||
//
|
||||
thread_local utils::updateable_value<bool> global_partition_index_cache_enabled(true);
|
||||
|
||||
logging::logger sstlog("sstable");
|
||||
|
||||
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename) {
|
||||
@@ -2539,10 +2528,8 @@ sstable::make_reader(
|
||||
) {
|
||||
const auto reversed = slice.is_reversed();
|
||||
|
||||
auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
|
||||
auto index_caching = use_caching(global_cache_index_pages && !bypass_cache);
|
||||
auto partition_index_caching = use_caching(global_partition_index_cache_enabled && !bypass_cache);
|
||||
auto index_reader = make_index_reader(permit, trace_state, index_caching, range.is_singular(), partition_index_caching);
|
||||
auto index_caching = use_caching(global_cache_index_pages && !slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
auto index_reader = make_index_reader(permit, trace_state, index_caching, range.is_singular());
|
||||
|
||||
if (_version >= version_types::mc && (!reversed || range.is_singular())) {
|
||||
return mx::make_reader(
|
||||
@@ -3714,8 +3701,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
use_caching caching,
|
||||
bool single_partition_read,
|
||||
use_caching use_partition_index_cache
|
||||
bool single_partition_read
|
||||
) {
|
||||
if (!_index_file) {
|
||||
if (!_partitions_db_footer) [[unlikely]] {
|
||||
@@ -3751,7 +3737,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
|
||||
std::move(trace_state)
|
||||
);
|
||||
}
|
||||
return std::make_unique<index_reader>(shared_from_this(), std::move(permit), std::move(trace_state), caching, single_partition_read, use_partition_index_cache);
|
||||
return std::make_unique<index_reader>(shared_from_this(), std::move(permit), std::move(trace_state), caching, single_partition_read);
|
||||
}
|
||||
|
||||
// Returns error code, 0 is success
|
||||
|
||||
@@ -69,7 +69,6 @@ namespace sstables {
|
||||
struct abstract_index_reader;
|
||||
class sstable_directory;
|
||||
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
|
||||
extern thread_local utils::updateable_value<bool> global_partition_index_cache_enabled;
|
||||
|
||||
namespace mc {
|
||||
class writer;
|
||||
@@ -1084,8 +1083,7 @@ public:
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state = {},
|
||||
use_caching caching = use_caching::yes,
|
||||
bool single_partition_read = false,
|
||||
use_caching use_partition_index_cache = use_caching::yes);
|
||||
bool single_partition_read = false);
|
||||
|
||||
// Allow the test cases from sstable_test.cc to test private methods. We use
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
|
||||
@@ -604,18 +604,14 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
|
||||
service_level_names = [sl.service_level for sl in service_levels]
|
||||
assert "driver" not in service_level_names
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
|
||||
number_of_requests = 1000
|
||||
number_of_requests = 3000
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
logger.info(f"group={group}, tasks_processed={res}")
|
||||
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
@@ -627,8 +623,10 @@ async def _verify_tasks_processed_metrics(manager, server, used_group, unused_gr
|
||||
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
|
||||
tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
|
||||
assert tasks_processed_by_used_group - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert tasks_processed_by_unused_group - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:
|
||||
|
||||
@@ -50,7 +50,6 @@ class TestBypassCache(Tester):
|
||||
insert_data=True,
|
||||
smp=1,
|
||||
cache_index_pages=None,
|
||||
partition_index_cache_enabled=None,
|
||||
):
|
||||
self.keyspace_name = keyspace_name
|
||||
self.table_name = table_name
|
||||
@@ -60,8 +59,6 @@ class TestBypassCache(Tester):
|
||||
jvm_args = ["--smp", str(smp)]
|
||||
if cache_index_pages is not None:
|
||||
jvm_args += ["--cache-index-pages", "1" if cache_index_pages else "0"]
|
||||
if partition_index_cache_enabled is not None:
|
||||
jvm_args += ["--partition-index-cache-enabled", "1" if partition_index_cache_enabled else "0"]
|
||||
cluster.populate(nodes).start(jvm_args=jvm_args)
|
||||
node1 = cluster.nodelist()[0]
|
||||
session = self.patient_cql_connection(node1)
|
||||
@@ -312,68 +309,3 @@ class TestBypassCache(Tester):
|
||||
alter_cmd = f"ALTER TABLE {self.keyspace_name}.{self.table_name} WITH CACHING = {{'enabled': 'false'}}"
|
||||
session.execute(alter_cmd)
|
||||
assert not self.verify_used_memory_grow(node=node, session=session), "expected to have writes without cache"
|
||||
|
||||
def test_partition_index_cache_disabled_independently(self):
|
||||
"""
|
||||
Test that partition_index_cache_enabled can be disabled independently of cache_index_pages.
|
||||
When partition_index_cache_enabled=False but cache_index_pages=True:
|
||||
- The cached_file cache (scylla_sstables_index_page_cache_*) should still be active
|
||||
- The partition_index_cache (scylla_sstables_index_page_*) should NOT be populated
|
||||
"""
|
||||
# Test with cache_index_pages=True but partition_index_cache_enabled=False
|
||||
session = self.prepare(insert_data=False, cache_index_pages=True, partition_index_cache_enabled=False)
|
||||
node = self.cluster.nodelist()[0]
|
||||
create_c1c2_table(session, cf=self.table_name)
|
||||
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
||||
node.flush()
|
||||
query = f"select * from {self.table_name}"
|
||||
|
||||
# When partition_index_cache is disabled:
|
||||
# - For BIG-index sstables (me format), scylla_sstables_index_page_hits should NOT increase
|
||||
# because partition_index_cache is disabled (but cached_file cache should work)
|
||||
# - For BTI-index sstables (ms format), only cached_file cache is used anyway
|
||||
if self.sstable_format != "ms":
|
||||
# For BIG-index format, verify partition_index_cache is not used
|
||||
errors = self.run_query_and_check_metrics(
|
||||
node,
|
||||
session,
|
||||
query,
|
||||
metrics_validators={
|
||||
"scylla_sstables_index_page_hits": self.gen_less_than(self.cache_thresh()),
|
||||
"scylla_sstables_index_page_cache_hits": "increased_by_at_least_1", # cached_file should still work
|
||||
"scylla_sstables_index_page_cache_misses": "increased_by_at_least_1",
|
||||
"scylla_sstables_index_page_cache_populations": "increased_by_at_least_1",
|
||||
},
|
||||
)
|
||||
assert not errors, "partition_index_cache should be disabled while cached_file cache is enabled:\n" + "\n".join(errors)
|
||||
|
||||
def test_partition_index_cache_enabled_independently(self):
|
||||
"""
|
||||
Test that partition_index_cache_enabled can be enabled independently of cache_index_pages.
|
||||
When partition_index_cache_enabled=True but cache_index_pages=False:
|
||||
- The cached_file cache (scylla_sstables_index_page_cache_*) should NOT be populated
|
||||
- The partition_index_cache (scylla_sstables_index_page_*) should still be active (for BIG-index)
|
||||
"""
|
||||
# Test with cache_index_pages=False but partition_index_cache_enabled=True
|
||||
session = self.prepare(insert_data=False, cache_index_pages=False, partition_index_cache_enabled=True)
|
||||
node = self.cluster.nodelist()[0]
|
||||
create_c1c2_table(session, cf=self.table_name)
|
||||
insert_c1c2(session, n=NUM_OF_QUERY_EXECUTIONS, cf=self.table_name, ks=self.keyspace_name)
|
||||
node.flush()
|
||||
query = f"select * from {self.table_name}"
|
||||
|
||||
# When cache_index_pages is disabled but partition_index_cache is enabled:
|
||||
# - For BIG-index sstables (me format), partition_index_cache should still populate
|
||||
# - For BTI-index sstables (ms format), there's no partition_index_cache to use
|
||||
if self.sstable_format != "ms":
|
||||
# For BIG-index format, verify partition_index_cache is used
|
||||
errors = self.run_query_and_check_metrics(
|
||||
node,
|
||||
session,
|
||||
query,
|
||||
metrics_validators={
|
||||
"scylla_sstables_index_page_hits": self.gen_more_than(self.cache_thresh()),
|
||||
"scylla_sstables_index_page_cache_hits": self.gen_less_than(self.cache_thresh()), # cached_file should not be used much
|
||||
},
|
||||
)
|
||||
assert not errors, "partition_index_cache should be enabled while cached_file cache is disabled:\n" + "\n".join(errors)
|
||||
|
||||
@@ -52,6 +52,18 @@ KNOWN_LOG_LEVELS = {
|
||||
"OFF": "info",
|
||||
}
|
||||
|
||||
# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
|
||||
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
|
||||
|
||||
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Splits a "key : value" line into key and value.
|
||||
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
|
||||
|
||||
|
||||
class NodeError(Exception):
|
||||
def __init__(self, msg: str, process: int | None = None):
|
||||
@@ -528,6 +540,15 @@ class ScyllaNode:
|
||||
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
|
||||
|
||||
def stress(self, stress_options: list[str], **kwargs):
|
||||
"""
|
||||
Run `cassandra-stress` against this node.
|
||||
This method does not do any result parsing.
|
||||
|
||||
:param stress_options: List of options to pass to `cassandra-stress`.
|
||||
:param kwargs: Additional arguments to pass to `subprocess.Popen()`.
|
||||
:return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
|
||||
"""
|
||||
|
||||
cmd_args = ["cassandra-stress"] + stress_options
|
||||
|
||||
if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
|
||||
@@ -549,6 +570,73 @@ class ScyllaNode:
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
def _set_stress_val(self, key, val, res):
|
||||
"""
|
||||
Normalize a stress result string and populate aggregate/read/write metrics.
|
||||
|
||||
Removes comma-thousands separators from numbers, converts to float,
|
||||
stores the aggregate metric under `key`.
|
||||
If the value contains a "[READ ..., WRITE ...]" block, also stores the
|
||||
read and write metrics under `key:read` and `key:write`.
|
||||
|
||||
:param key: The metric name
|
||||
:param val: The metric value string
|
||||
:param res: The dictionary to populate
|
||||
"""
|
||||
|
||||
def parse_num(s):
|
||||
return float(s.replace(',', ''))
|
||||
|
||||
if "[" in val:
|
||||
p = STRESS_SUMMARY_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key] = parse_num(m.group(1))
|
||||
p = STRESS_READ_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":read"] = parse_num(m.group(1))
|
||||
p = STRESS_WRITE_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":write"] = parse_num(m.group(1))
|
||||
else:
|
||||
try:
|
||||
res[key] = parse_num(val)
|
||||
except ValueError:
|
||||
res[key] = val
|
||||
|
||||
|
||||
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
    """
    Run a stress test and return its results as a metrics dictionary.

    Invokes ``stress()``, scans ``stdout`` for the ``Results:`` header, and
    parses every subsequent ``key : value`` line into a dictionary via
    ``_set_stress_val`` (keys are lower-cased; values normalized to floats
    where possible).

    :param stress_options: List of stress options to pass to ``stress()``.
    :param ignore_errors: Deprecated (no effect).
    :param kwargs: Additional arguments to pass to ``stress()``.
    :return: Dictionary of stress test results.
    """
    if ignore_errors:
        self.warning("passing `ignore_errors` to stress_object() is deprecated")
    output = self.stress(stress_options, **kwargs)
    metrics = {}
    in_results_section = False
    for raw_line in output.stdout.splitlines():
        line = raw_line.strip()
        if not in_results_section:
            # Skip everything up to and including the "Results:" header.
            in_results_section = line == 'Results:'
            continue
        kv_match = STRESS_KEY_VALUE_PATTERN.match(line)
        if kv_match:
            self._set_stress_val(kv_match.group(1).strip().lower(), kv_match.group(2).strip(), metrics)
    return metrics
||||
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
|
||||
cmd = ["flush"]
|
||||
if ks:
|
||||
|
||||
690
test/cluster/dtest/schema_management_test.py
Normal file
690
test/cluster/dtest/schema_management_test.py
Normal file
@@ -0,0 +1,690 @@
|
||||
#
|
||||
# Copyright (C) 2015-present The Apache Software Foundation
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
from concurrent import futures
|
||||
from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
from cassandra.query import SimpleStatement, dict_factory
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from dtest_class import Tester, create_cf, create_ks, read_barrier
|
||||
from tools.assertions import assert_all, assert_invalid
|
||||
from tools.cluster_topology import generate_cluster_topology
|
||||
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestSchemaManagement(Tester):
    """Schema-change tests: DDL concurrency, node restarts, and schema sync."""

    def prepare(self, racks_num: int, has_config: bool = True):
        """Populate and start a cluster with one node per rack; return it.

        :param racks_num: number of racks (one node each)
        :param has_config: when True, apply a shortened ring_delay_ms
        """
        cluster = self.cluster
        cluster_topology = generate_cluster_topology(rack_num=racks_num)

        if has_config:
            config = {
                "ring_delay_ms": 5000,
            }
            cluster.set_configuration_options(values=config)

        cluster.populate(cluster_topology)
        cluster.start(wait_other_notice=True)

        return cluster

    def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
        """A prepared INSERT stays usable after an ALTER that does not change
        the column set, even across restarts of replica nodes."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema...")
        create_ks(session, "ks", 3)
        session.execute(
            """
            CREATE TABLE users (
                id int,
                firstname text,
                lastname text,
                PRIMARY KEY (id)
            );
            """
        )

        insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
        insert_statement.consistency_level = ConsistencyLevel.ALL
        session.execute(insert_statement, [0])

        # Table-property-only alter: column set (and thus prepared metadata) unchanged.
        logger.debug("Altering schema")
        session.execute("ALTER TABLE users WITH comment = 'updated'")

        logger.debug("Restarting node2")
        node2.stop(gently=True)
        node2.start(wait_for_binary_proto=True)

        logger.debug("Restarting node3")
        node3.stop(gently=True)
        node3.start(wait_for_binary_proto=True, wait_other_notice=True)

        # The old prepared statement must still work at CL=ALL after restarts.
        n_partitions = 20
        for i in range(n_partitions):
            session.execute(insert_statement, [i])

        rows = session.execute("SELECT * FROM users")
        res = sorted(rows)
        assert len(res) == n_partitions
        for i in range(n_partitions):
            expected = [i, "A", "B"]
            assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"

    def test_dropping_keyspace_with_many_columns(self):
        """
        Exploits https://github.com/scylladb/scylla/issues/1484
        """
        cluster = self.prepare(racks_num=1, has_config=False)

        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)

        session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
        for i in range(8):
            session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
        session.execute("drop keyspace testxyz")

        # Re-create and drop the keyspace via every node to verify the drop
        # propagated cleanly everywhere.
        for node in cluster.nodelist():
            s = self.patient_cql_connection(node)
            s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
            s.execute("drop keyspace testxyz")

    def test_multiple_create_table_in_parallel(self):
        """
        Run multiple create table statements via different nodes
        1. Create a cluster of 3 nodes
        2. Run create table with different table names in parallel - check all complete
        3. Run create table with the same table name in parallel - check if they complete
        """
        logger.debug("1. Create a cluster of 3 nodes")
        nodes_count = 3
        cluster = self.prepare(racks_num=nodes_count)
        sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
        ks = "ks"
        create_ks(sessions[0], ks, nodes_count)

        def create_table(session, table_name):
            # One wide table per call; executed concurrently from different nodes.
            create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
            logger.debug(f"create_statement {create_statement}")
            session.execute(create_statement)

        logger.debug("2. Run create table with different table names in parallel - check all complete")
        step2_tables = [f"t{i}" for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            list(executor.map(create_table, sessions, step2_tables))

        for table in step2_tables:
            sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
            rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
            assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

        logger.debug("3. Run create table with the same table name in parallel - check if they complete")
        step3_table = "test"
        step3_tables = [step3_table for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
            for res_future in res_futures:
                try:
                    res_future.result()
                except AlreadyExists as e:
                    # Exactly one creation can win; the rest may legitimately
                    # observe the table already existing.
                    logger.info(f"expected cassandra.AlreadyExists error {e}")

        sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
        sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
        rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

    @pytest.mark.parametrize("case", ("write", "read", "mixed"))
    def test_alter_table_in_parallel_to_read_and_write(self, case):
        """
        Create a table and write into while altering the table
        1. Create a cluster of 3 nodes and populate a table
        2. Run write/read/read_and_write" statement in a loop
        3. Alter table while inserts are running
        """
        logger.debug("1. Create a cluster of 3 nodes and populate a table")
        cluster = self.prepare(racks_num=3)
        col_number = 20

        [node1, node2, node3] = cluster.nodelist()
        session = self.patient_exclusive_cql_connection(node1)

        def run_stress(stress_type, col=col_number - 2):
            # Default column count matches the post-ALTER schema (two columns dropped).
            node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])

        logger.debug("Populate")
        run_stress("write", col_number)

        with ThreadPoolExecutor(max_workers=1) as executor:
            logger.debug(f"2. Run {case} statement in a loop")
            statement_future = executor.submit(functools.partial(run_stress, case))

            logger.debug(f"let's {case} statement work some time")
            time.sleep(2)

            logger.debug("3. Alter table while inserts are running")
            alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
            logger.debug(f"alter_statement {alter_statement}")
            alter_result = session.execute(alter_statement)
            logger.debug(alter_result.all())

            logger.debug(f"wait till {case} statement finished")
            statement_future.result()

        # col_number data columns + key minus the two dropped = col_number - 1 columns.
        rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"

        logger.debug("read and check data")
        run_stress("read")

    @pytest.mark.skip("unimplemented")
    def commitlog_replays_after_schema_change(self):
        """
        Commitlog can be replayed even though schema has been changed
        1. Create a table and insert data
        2. Alter table
        3. Kill node
        4. Boot node and verify that commitlog have been replayed and that all data is restored
        """
        raise NotImplementedError

    @pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
    def test_update_schema_while_node_is_killed(self, case):
        """
        Check that a node that is killed during a table creation/alter/drop is able to rejoin and to synch on schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        def create_table_case():
            try:
                logger.debug("Creating table")
                create_c1c2_table(session)
                logger.debug("Populating")
                insert_c1c2(session, n=10)
            except AlreadyExists:
                # the CQL command can be called multiple times in case of retries
                pass

        def alter_table_case():
            try:
                session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Invalid column name c3" in str(exc)

        def drop_table_case():
            try:
                session.execute("DROP TABLE cf;", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Cannot drop non existing table" in str(exc)

        logger.debug("Creating keyspace")
        create_ks(session, "ks", 3)
        if case != "create_table":
            # alter/drop cases need the table to exist before the DDL under test.
            create_table_case()

        case_map = {
            "create_table": create_table_case,
            "alter_table": alter_table_case,
            "drop_table": drop_table_case,
        }
        with ThreadPoolExecutor(max_workers=1) as executor:
            logger.debug(f"2. kill node during {case}")
            kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
            case_map[case]()
            kill_node_future.result()

        logger.debug("3. Start the stopped node2")
        node2.start(wait_for_binary_proto=True)

        session = self.patient_exclusive_cql_connection(node2)
        read_barrier(session)

        def create_or_alter_table_expected_result(col_mun):
            rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
            assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
            for key in range(10):
                query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)

        expected_case_result_map = {
            # 3 columns for the base table, 4 after the ALTER added c3.
            "create_table": functools.partial(create_or_alter_table_expected_result, 3),
            "alter_table": functools.partial(create_or_alter_table_expected_result, 4),
            "drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
        }
        logger.debug("verify that commitlog has been replayed and that all data is restored")
        expected_case_result_map[case]()

    @pytest.mark.parametrize("is_gently_stop", [True, False])
    def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
        """
        Nodes rejoining the cluster synch on schema changes
        1. Create a cluster and insert data
        2. Stop a node
        3. Alter table
        4. Insert additional data
        5. Start the stopped node
        6. Verify the stopped node synchs on the updated schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        create_c1c2_table(session)
        create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})

        logger.debug("Populating")
        insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)

        logger.debug("2 Stop a node1")
        node1.stop(gently=is_gently_stop, wait_other_notice=True)

        # Schema change happens while node1 is down; it must catch up on restart.
        logger.debug("3 Alter table")
        session = self.patient_cql_connection(node2)
        session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)

        logger.debug("4 Insert additional data")
        session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))

        logger.debug("5. Start the stopped node1")
        node1.start(wait_for_binary_proto=True)

        logger.debug("6. Verify the stopped node synchs on the updated schema")
        session = self.patient_exclusive_cql_connection(node1)
        read_barrier(session)

        rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
        expected = [["test", "test", "test", "test"]]
        assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
        for key in range(10):
            query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)

    def test_reads_schema_recreated_while_node_down(self):
        """Reads observe the recreated (empty) table, not the pre-drop data,
        after a node that missed the DROP+CREATE rejoins."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True)

        # Drop and recreate with a different column set while node2 is down.
        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_cql_connection(node2)
        read_barrier(session2)

        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"

    def test_writes_schema_recreated_while_node_down(self):
        """Writes at CL=ALL land in the recreated table (only the post-recreate
        row is visible) after a node that missed the DROP+CREATE rejoins."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True, wait_other_notice=True)

        # Recreate with the identical column set while node2 is down.
        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_cql_connection(node2)
        read_barrier(session2)

        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))

        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        expected = [[2, "2"]]
        assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
||||
class TestLargePartitionAlterSchema(Tester):
    # Issue scylladb/scylla: #5135:
    #
    # Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
    # partition entry update
    # Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
    # entry being preempted.
    #
    # The scenario in which the problem could have happened has to involve:
    # - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
    # - appending writes to the partition (not overwrites)
    # - scans of the partition
    # - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
    #   column lands in the middle (in alphabetical order) of the old column set.
    #
    # Memtable flush has to happen after a schema alter concurrently with a read.
    #
    # The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).

    # Number of partitions written/read by the background load.
    PARTITIONS = 50
    # Payload placed into every text column.
    STRING_VALUE = string.ascii_lowercase

    def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
        """Start the cluster (if not already running), create the schema, and
        return a session connected to the first node."""
        if not self.cluster.nodelist():
            self.cluster.populate(cluster_topology)
            self.cluster.start(wait_other_notice=True)

        node1 = self.cluster.nodelist()[0]
        session = self.patient_cql_connection(node=node1)
        self.create_schema(session=session, rf=rf)

        return session

    def create_schema(self, session, rf):
        """Create keyspace `ks` (replication factor ``rf``) and the multi-row
        partition table `lp_table`."""
        logger.debug("Creating schema")
        create_ks(session=session, name="ks", rf=rf)

        session.execute(
            """
            CREATE TABLE lp_table (
                pk int,
                ck1 int,
                val1 text,
                val2 text,
                PRIMARY KEY (pk, ck1)
            );
            """
        )

    def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
        """Append rows to all partitions, one clustering key at a time.

        Rows are produced lazily: ``execute_concurrent_with_args`` pulls tuples
        from the generator, and every produced row is also mirrored into
        ``data`` so callers can later assert the table contents.

        :param session: CQL session to write through
        :param data: list collecting every written row (mutated in place)
        :param ck_start: first clustering key to write
        :param ck_end: stop (exclusive) clustering key; None means run until
            ``stop_populating`` is set
        :param stop_populating: event that terminates an open-ended run
        :return: ``data``, for convenience
        """
        ck = ck_start

        def _populate_loop():
            # Generator of bind tuples; mutates the enclosing `ck` so the
            # caller can compute how many clustering keys were written.
            nonlocal ck
            while True:
                if stop_populating is not None and stop_populating.is_set():
                    return
                if ck_end is not None and ck >= ck_end:
                    return
                for pk in range(self.PARTITIONS):
                    row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
                    data.append(row)
                    yield tuple(row)
                ck += 1

        logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")

        parameters = _populate_loop()

        stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")

        execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)

        # BUGFIX: compute the count only after execute_concurrent_with_args has
        # drained the generator. Previously this was evaluated before execution,
        # when `ck == ck_start`, so the log line below always reported 0.
        records_written = ck - ck_start
        logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
        return data

    def read(self, session, ck_max, stop_reading: threading.Event = None):
        """Repeatedly read single rows across all partitions.

        With ``stop_reading`` set to None a single full pass is performed;
        otherwise reading loops until the event is set.
        """
        def _read_loop():
            while True:
                for ck in range(ck_max):
                    for pk in range(self.PARTITIONS):
                        if stop_reading is not None and stop_reading.is_set():
                            return
                        session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
                if stop_reading is None:
                    return

        logger.debug("Start reading..")
        _read_loop()
        logger.debug("Finish reading..")

    def add_column(self, session, column_name, column_type):
        """ALTER the table to add one column."""
        logger.debug(f"Add {column_name} column")
        session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")

    def drop_column(self, session, column_name):
        """ALTER the table to drop one column."""
        logger.debug(f"Drop {column_name} column")
        session.execute(f"ALTER TABLE lp_table DROP {column_name}")

    def test_large_partition_with_add_column(self):
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            # Debug builds are far slower: longer deadline, less work.
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.add_column(session, "new_clmn", "int")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    future.result()
                except Exception as exc:  # noqa: BLE001
                    pytest.fail(f"Generated an exception: {exc}")

        # Add 'null' values for the new column `new_clmn` in the expected data
        for i, _ in enumerate(data):
            data[i].append(None)

        assert_all(session, "select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)

    def test_large_partition_with_drop_column(self):
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            # Debug builds are far slower: longer deadline, less work.
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.drop_column(session=session, column_name="val1")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            result = []
            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    result.append(future.result())
                except Exception as exc:  # noqa: BLE001
                    # "Unknown identifier val1" is expected error
                    if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
                        pytest.fail(f"Generated an exception: {exc}")
||||
class HistoryVerifier:
    def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
        """
        Initialize parameters for further verification of schema history.
        :param table_name: the table whose schema we change and for which schema history is verified.
        :param keyspace_name: keyspace containing that table.
        """

        self.table_name = table_name
        self.keyspace_name = keyspace_name
        # Schema versions observed so far, oldest first (strings).
        self.versions = []
        # Maps schema_version -> {column_name: type} for regular columns.
        self.versions_dict = {}
        # Last DDL statement passed to verify(); used only in log/assert text.
        self.query = ""

    def verify(self, session, expected_current_diff, expected_prev_diff, query):
        """
        Verify current schema history entry by comparing to previous schema entry.
        :param session: python cql session
        :param expected_current_diff: difference of current schema from previous schema
        :param expected_prev_diff: difference of previous schema from current schema
        :param query: The query that created new schema
        """

        def get_table_id(session, keyspace_name, table_name):
            # Resolve the table's cf_id (UUID) from system_schema.tables.
            assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
            assert table_name, f"Input table_name should have value, table_name={table_name}"
            query = "select keyspace_name,table_name,id from system_schema.tables"
            query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
            current_rows = session.execute(query).current_rows
            assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
            res = current_rows[0]
            return res["id"]

        def read_schema_history_table(session, cf_id):
            """
            read system.scylla_table_schema_history and verify current version diff from previous version
            :param session: python cql session
            :param cf_id: uuid of the table whose schema we changed
            """

            query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
            res = session.execute(query).current_rows
            # Exactly one schema version must have appeared since the last call.
            new_versions = list({
                entry["schema_version"]
                for entry in res
                if str(entry["schema_version"]) not in self.versions
            })
            msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
            assert len(new_versions) == 1, msg
            current_version = str(new_versions[0])
            logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
            # Collect the regular columns recorded for the new version.
            columns_list = (
                {"column_name": entry["column_name"], "type": entry["type"]}
                for entry in res
                if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
            )
            self.versions_dict[current_version] = {}
            for item in columns_list:
                self.versions_dict[current_version][item["column_name"]] = item["type"]

            self.versions.append(current_version)
            # The diff checks need a previous version; the first call (e.g. the
            # initial CREATE TABLE) only records the baseline.
            if len(self.versions) > 1:
                current_id = self.versions[-1]
                previous_id = self.versions[-2]
                set_current = set(self.versions_dict[current_id].items())
                set_previous = set(self.versions_dict[previous_id].items())
                # Set difference of (column, type) pairs in each direction.
                current_diff = set_current - set_previous
                previous_diff = set_previous - set_current
                msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
                msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
                if current_diff != expected_current_diff:
                    # Log before asserting so the mismatch is captured even if
                    # the assertion output is truncated.
                    logger.debug(msg1 + msg2)
                assert current_diff == expected_current_diff, msg1 + msg2
                msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
                assert previous_diff == expected_prev_diff, msg1 + msg2

        self.query = query
        cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
        read_schema_history_table(session, cf_id)
||||
class DDL(NamedTuple):
    """One schema-changing statement plus the expected schema-history diffs."""

    # The DDL statement to execute.
    ddl_command: str
    # (column, type) pairs present in the new schema but not the previous one.
    # None for the initial CREATE, where no previous version exists to diff.
    expected_current_diff: set | None
    # (column, type) pairs present in the previous schema but not the new one.
    expected_prev_diff: set | None
||||
class TestSchemaHistory(Tester):
    """Verifies system.scylla_table_schema_history entries after DDL changes."""

    def prepare(self):
        """Start a 3-rack cluster, connect with a dict row factory (needed by
        HistoryVerifier's column lookups), and create the test keyspace."""
        cluster = self.cluster
        # in case support tablets and rf-rack-valid-keyspaces
        # create cluster with 3 racks with 1 node in each rack
        cluster_topology = generate_cluster_topology(rack_num=3)
        rf = 3
        cluster.populate(cluster_topology).start(wait_other_notice=True)
        self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
        create_ks(self.session, "lwt_load_ks", rf)

    def test_schema_history_alter_table(self):
        """test schema history changes following alter table cql commands"""
        self.prepare()
        verifier = HistoryVerifier(table_name="table2")
        # Each entry: the DDL to run and the expected (column, type) diffs of
        # the new schema version vs. the previous one, in both directions.
        queries_and_expected_diffs = [
            DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
        ]
        for ddl in queries_and_expected_diffs:
            self.session.execute(ddl.ddl_command)
            verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)
||||
@@ -218,6 +218,18 @@ def assert_row_count_in_select_less(
|
||||
assert count < max_rows_expected, f'Expected a row count < of {max_rows_expected} in query "{query}", but got {count}'
|
||||
|
||||
|
||||
def assert_length_equal(object_with_length, expected_length):
|
||||
"""
|
||||
Assert an object has a specific length.
|
||||
@param object_with_length The object whose length will be checked
|
||||
@param expected_length The expected length of the object
|
||||
|
||||
Examples:
|
||||
assert_length_equal(res, nb_counter)
|
||||
"""
|
||||
assert len(object_with_length) == expected_length, f"Expected {object_with_length} to have length {expected_length}, but instead is of length {len(object_with_length)}"
|
||||
|
||||
|
||||
def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
|
||||
"""
|
||||
asserts that the contents of the two provided lists are equal
|
||||
|
||||
@@ -14,6 +14,7 @@ from cassandra.query import SimpleStatement
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
|
||||
from test.cluster.dtest.dtest_class import create_cf
|
||||
from test.cluster.dtest.tools import assertions
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -51,6 +52,27 @@ def insert_c1c2( # noqa: PLR0913
|
||||
execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
|
||||
|
||||
|
||||
def query_c1c2( # noqa: PLR0913
|
||||
session,
|
||||
key,
|
||||
consistency=ConsistencyLevel.QUORUM,
|
||||
tolerate_missing=False,
|
||||
must_be_missing=False,
|
||||
c1_value="value1",
|
||||
c2_value="value2",
|
||||
ks="ks",
|
||||
cf="cf",
|
||||
):
|
||||
query = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
|
||||
rows = list(session.execute(query))
|
||||
if not tolerate_missing and not must_be_missing:
|
||||
assertions.assert_length_equal(rows, 1)
|
||||
res = rows[0]
|
||||
assert len(res) == 2 and res[0] == c1_value and res[1] == c2_value, res
|
||||
if must_be_missing:
|
||||
assertions.assert_length_equal(rows, 0)
|
||||
|
||||
|
||||
def rows_to_list(rows):
|
||||
new_list = [list(row) for row in rows]
|
||||
return new_list
|
||||
|
||||
@@ -131,8 +131,9 @@ async def test_backup_move(manager: ManagerClient, object_storage, move_files):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the destination bucket does not exist'''
|
||||
@pytest.mark.parametrize("ne_parameter", [ "endpoint", "bucket", "snapshot" ])
|
||||
async def test_backup_with_non_existing_parameters(manager: ManagerClient, object_storage, ne_parameter):
|
||||
'''backup should fail if either of the parameters does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
@@ -142,7 +143,8 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_stor
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
backup_snap_name = 'backup'
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name = backup_snap_name)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
@@ -150,39 +152,18 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_stor
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', object_storage.address, "non-existant-bucket", prefix)
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf,
|
||||
backup_snap_name if ne_parameter != 'snapshot' else 'no-such-snapshot',
|
||||
object_storage.address if ne_parameter != 'endpoint' else 'no-such-endpoint',
|
||||
object_storage.bucket_name if ne_parameter != 'bucket' else 'no-such-bucket',
|
||||
prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
#assert 'S3 request failed. Code: 15. Reason: Access Denied.' in status['error']
|
||||
if ne_parameter == 'endpoint':
|
||||
assert status['error'] == 'std::invalid_argument (endpoint no-such-endpoint not found)'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_endpoint(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the endpoint is invalid/inaccessible'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', "does_not_exist", object_storage.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
assert status['error'] == 'std::invalid_argument (endpoint does_not_exist not found)'
|
||||
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
'''helper for backup abort testing'''
|
||||
@@ -236,38 +217,6 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the snapshot does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'nonexistent-snapshot',
|
||||
object_storage.address, object_storage.bucket_name, prefix)
|
||||
# The task is expected to fail immediately due to invalid snapshot name.
|
||||
# However, since internal implementation details may change, we'll wait for
|
||||
# task completion if immediate failure doesn't occur.
|
||||
actual_state = None
|
||||
for status_api in [manager.api.get_task_status,
|
||||
manager.api.wait_task]:
|
||||
status = await status_api(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
actual_state = status['state']
|
||||
if actual_state == 'failed':
|
||||
break
|
||||
else:
|
||||
assert actual_state == 'failed'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, object_storage):
|
||||
|
||||
@@ -181,11 +181,14 @@ async def test_random_failures(manager: ManagerClient,
|
||||
LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
|
||||
await manager.server_stop(server_id=s_info.server_id)
|
||||
|
||||
BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
|
||||
STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"
|
||||
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("Wait until the new node initialization completes or fails.")
|
||||
await server_log.wait_for("init - (Startup failed:|Scylla version .* initialization completed)", timeout=120)
|
||||
await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
|
||||
|
||||
if await server_log.grep("init - Startup failed:"):
|
||||
if await server_log.grep(STARTUP_FAILED_PATTERN):
|
||||
LOGGER.info("Check that the new node is dead.")
|
||||
expected_statuses = [psutil.STATUS_DEAD]
|
||||
else:
|
||||
@@ -216,7 +219,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
else:
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("The new node is dead. Check if it failed to startup.")
|
||||
assert await server_log.grep("init - Startup failed:")
|
||||
assert await server_log.grep(STARTUP_FAILED_PATTERN)
|
||||
await manager.server_stop(server_id=s_info.server_id) # remove the node from the list of running servers
|
||||
|
||||
LOGGER.info("Try to remove the dead new node from the cluster.")
|
||||
|
||||
@@ -26,6 +26,7 @@ skip_in_release:
|
||||
- test_raft_cluster_features
|
||||
- test_cluster_features
|
||||
- dtest/limits_test
|
||||
- dtest/schema_management_test
|
||||
skip_in_debug:
|
||||
- test_shutdown_hang
|
||||
- test_replace
|
||||
|
||||
@@ -146,13 +146,13 @@ async def test_joining_old_node_fails(manager: ManagerClient) -> None:
|
||||
|
||||
# Try to add a node that doesn't support the feature - should fail
|
||||
new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
|
||||
# Try to replace with a node that doesn't support the feature - should fail
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
|
||||
new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -131,7 +131,7 @@ async def test_major_compaction_flush_all_tables(manager: ManagerClient, compact
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks, cf)
|
||||
|
||||
flush_log = await log.grep("Forcing new commitlog segment and flushing all tables", from_mark=mark)
|
||||
assert len(flush_log) == (1 if expect_all_table_flush else 0)
|
||||
assert len(flush_log) == (2 if expect_all_table_flush else 0)
|
||||
|
||||
# all tables should be flushed the first time unless compaction_flush_all_tables_before_major_seconds == 0
|
||||
await check_all_table_flush_in_major_compaction(compaction_flush_all_tables_before_major_seconds != 0)
|
||||
|
||||
@@ -74,7 +74,6 @@ def test_cast_int_literal_with_type_hint_to_blob(cql, table1, scylla_only):
|
||||
# An int can always be converted to a valid blob, but blobs might have wrong amount of bytes
|
||||
# and can't be converted to a valid int.
|
||||
def test_cast_blob_literal_to_int(cql, table1):
|
||||
pk = unique_key_int()
|
||||
with pytest.raises(InvalidRequest, match='HEX'):
|
||||
cql.execute(f"INSERT INTO {table1} (pk) VALUES (0xBAAAAAAD)")
|
||||
with pytest.raises(InvalidRequest, match='blob'):
|
||||
|
||||
@@ -61,7 +61,7 @@ def test_select_default_order(cql, table_int_desc):
|
||||
def test_multi_column_relation_desc(cql, table2):
|
||||
k = unique_key_int()
|
||||
stmt = cql.prepare(f'INSERT INTO {table2} (p, c1, c2) VALUES (?, ?, ?)')
|
||||
cql.execute(stmt, [0, 1, 0])
|
||||
cql.execute(stmt, [0, 1, 1])
|
||||
cql.execute(stmt, [0, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = 0 AND (c1, c2) >= (1, 1)'))
|
||||
cql.execute(stmt, [k, 1, 0])
|
||||
cql.execute(stmt, [k, 1, 1])
|
||||
cql.execute(stmt, [k, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = {k} AND (c1, c2) >= (1, 1)'))
|
||||
|
||||
@@ -352,7 +352,7 @@ def test_storage_options_alter_type(cql, scylla_only):
|
||||
ksdef_local = "WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : '1' } " \
|
||||
"AND STORAGE = { 'type' : 'S3', 'bucket' : '/b1', 'endpoint': 'localhost'}"
|
||||
with pytest.raises(InvalidRequest):
|
||||
res = cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
|
||||
# Reproducer for scylladb#14139
|
||||
def test_alter_keyspace_preserves_udt(cql):
|
||||
|
||||
@@ -171,7 +171,6 @@ def test_grant_revoke_data_permissions(cql, test_keyspace):
|
||||
# Test that permissions for user-defined functions are serialized in a Cassandra-compatible way
|
||||
def test_udf_permissions_serialization(cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace, new_user(cql) as user:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
# Creating a bilingual function makes this test case work for both Scylla and Cassandra
|
||||
@@ -247,7 +246,6 @@ def test_udf_permissions_quoted_names(cassandra_bug, cql):
|
||||
# permissions. Cassandra erroneously reports the unrelated missing permissions.
|
||||
# Reported to Cassandra as CASSANDRA-19005.
|
||||
def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
schema = "a int primary key"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
|
||||
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42L;'"
|
||||
@@ -288,7 +286,6 @@ def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
# Tests for ALTER are separate, because they are qualified as cassandra_bug
|
||||
def test_grant_revoke_udf_permissions(cql):
|
||||
schema = "a int primary key, b list<int>"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int, l list<int>) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
@@ -335,7 +332,6 @@ def test_grant_revoke_udf_permissions(cql):
|
||||
# and yet it's not enforced
|
||||
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
|
||||
@@ -90,8 +90,6 @@ def test_attached_service_level(scylla_only, cql):
|
||||
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
|
||||
|
||||
def test_list_effective_service_level(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
timeout = "10s"
|
||||
workload_type = "batch"
|
||||
|
||||
@@ -120,8 +118,6 @@ def test_list_effective_service_level(scylla_only, cql):
|
||||
assert row.value == "batch"
|
||||
|
||||
def test_list_effective_service_level_shares(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
shares1 = 500
|
||||
shares2 = 200
|
||||
|
||||
@@ -184,8 +180,6 @@ def test_default_shares_in_listings(scylla_only, cql):
|
||||
# and that the messages Scylla returns are informative.
|
||||
def test_manipulating_default_service_level(cql, scylla_only):
|
||||
default_sl = "default"
|
||||
# Service levels are case-sensitive (if used with quotation marks).
|
||||
fake_default_sl = '"DeFaUlT"'
|
||||
|
||||
with new_user(cql) as role:
|
||||
# Creation.
|
||||
|
||||
@@ -76,6 +76,7 @@ def test_clients(scylla_only, cql):
|
||||
'ssl_enabled',
|
||||
'ssl_protocol',
|
||||
'username',
|
||||
'client_options',
|
||||
])
|
||||
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
|
||||
# There must be at least one connection - the one that sent this SELECT
|
||||
@@ -84,6 +85,9 @@ def test_clients(scylla_only, cql):
|
||||
for cl in cls:
|
||||
assert(cl[0] == '127.0.0.1')
|
||||
assert(cl[2] == 'cql')
|
||||
client_options = cl[13]
|
||||
assert(client_options.get('DRIVER_NAME') == cl[4])
|
||||
assert(client_options.get('DRIVER_VERSION') == cl[5])
|
||||
|
||||
# We only want to check that the table exists with the listed columns, to assert
|
||||
# backwards compatibility.
|
||||
|
||||
@@ -23,7 +23,7 @@ def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
|
||||
try:
|
||||
f42 = unique_name()
|
||||
f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
|
||||
res = cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
|
||||
except NoHostAvailable as err:
|
||||
if "not enabled" in str(err):
|
||||
@@ -373,8 +373,7 @@ def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
assert len(res) == 1 and res[0].result == 177147
|
||||
|
||||
# Test that only compilable input is accepted
|
||||
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
def test_compilable(cql, test_keyspace, scylla_with_wasm_only):
|
||||
wrong_source = f"""
|
||||
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
|
||||
"""
|
||||
@@ -384,8 +383,7 @@ Dear wasmtime compiler, please return a function which returns its float argumen
|
||||
|
||||
# Test that not exporting a function with matching name
|
||||
# results in an error
|
||||
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
@@ -403,8 +401,7 @@ def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
f"AS '{wrong_source}'")
|
||||
|
||||
# Test that trying to use something that is exported, but is not a function, won't work
|
||||
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
def test_not_a_function(cql, test_keyspace, scylla_with_wasm_only):
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
|
||||
@@ -49,6 +49,9 @@ RUN_ID = pytest.StashKey[int]()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Store pytest config globally so we can access it in hooks that only receive report
|
||||
_pytest_config: pytest.Config | None = None
|
||||
|
||||
|
||||
def pytest_addoption(parser: pytest.Parser) -> None:
|
||||
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
|
||||
@@ -184,6 +187,52 @@ def pytest_sessionstart(session: pytest.Session) -> None:
|
||||
)
|
||||
|
||||
|
||||
@pytest.hookimpl(trylast=True)
|
||||
def pytest_runtest_logreport(report):
|
||||
"""Add custom XML attributes to JUnit testcase elements.
|
||||
|
||||
This hook wraps the node_reporter's to_xml method to add custom attributes
|
||||
when the XML element is created. This approach works with pytest-xdist because
|
||||
it modifies the XML element directly when it's generated, rather than trying
|
||||
to modify attrs before finalize() is called.
|
||||
|
||||
Attributes added:
|
||||
- function_path: The function path of the test case (excluding parameters).
|
||||
|
||||
Uses trylast=True to run after LogXML's hook has created the node_reporter.
|
||||
"""
|
||||
from _pytest.junitxml import xml_key
|
||||
|
||||
# Only process call phase
|
||||
if report.when != "call":
|
||||
return
|
||||
|
||||
# Get the XML reporter
|
||||
config = _pytest_config
|
||||
if config is None:
|
||||
return
|
||||
|
||||
xml = config.stash.get(xml_key, None)
|
||||
if xml is None:
|
||||
return
|
||||
|
||||
node_reporter = xml.node_reporter(report)
|
||||
|
||||
nodeid = report.nodeid
|
||||
function_path = f'test/{nodeid.rsplit('.', 2)[0].rsplit('[', 1)[0]}'
|
||||
|
||||
# Wrap the to_xml method to add custom attributes to the element
|
||||
original_to_xml = node_reporter.to_xml
|
||||
|
||||
def custom_to_xml():
|
||||
"""Wrapper that adds custom attributes to the testcase element."""
|
||||
element = original_to_xml()
|
||||
element.set("function_path", function_path)
|
||||
return element
|
||||
|
||||
node_reporter.to_xml = custom_to_xml
|
||||
|
||||
|
||||
def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
if not session.config.getoption("--test-py-init"):
|
||||
return
|
||||
@@ -196,6 +245,9 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
|
||||
|
||||
def pytest_configure(config: pytest.Config) -> None:
|
||||
global _pytest_config
|
||||
_pytest_config = config
|
||||
|
||||
config.build_modes = get_modes_to_run(config)
|
||||
|
||||
if testpy_run_id := config.getoption("--run_id"):
|
||||
|
||||
@@ -243,7 +243,7 @@ async def get_scylla_2025_1_executable(build_mode: str) -> str:
|
||||
if not unpacked_marker.exists():
|
||||
if not downloaded_marker.exists():
|
||||
archive_path.unlink(missing_ok=True)
|
||||
await run_process(["curl", "--silent", "--show-error", "--output", archive_path, url])
|
||||
await run_process(["curl", "--retry", "10", "--fail", "--silent", "--show-error", "--output", archive_path, url])
|
||||
downloaded_marker.touch()
|
||||
shutil.rmtree(unpack_dir, ignore_errors=True)
|
||||
unpack_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
@@ -40,16 +40,8 @@ struct test_pinger: public direct_failure_detector::pinger {
|
||||
co_return;
|
||||
}
|
||||
|
||||
promise<> p;
|
||||
auto f = p.get_future();
|
||||
auto sub = as.subscribe([&, p = std::move(p)] () mutable noexcept {
|
||||
p.set_value();
|
||||
});
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
co_await std::move(f);
|
||||
throw abort_requested_exception{};
|
||||
// Simulate a blocking ping that only returns when aborted.
|
||||
co_await sleep_abortable(std::chrono::hours(1), as);
|
||||
}, as);
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -2930,6 +2930,18 @@ private:
|
||||
|
||||
static constexpr elem_t magic = 54313;
|
||||
|
||||
static void check_digest_value(elem_t d) {
|
||||
if (d < 0 || d >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value out of range: {}", d));
|
||||
}
|
||||
}
|
||||
|
||||
static void validate_digest_value(elem_t d_new, elem_t d_old, elem_t x) {
|
||||
if (d_new < 0 || d_new >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value invalid after appending/removing element: d_new {}, d_old {}, x {}", d_new, d_old, x));
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
append_seq(std::vector<elem_t> v) : _seq{make_lw_shared<std::vector<elem_t>>(std::move(v))}, _end{_seq->size()}, _digest{0} {
|
||||
for (auto x : *_seq) {
|
||||
@@ -2938,20 +2950,26 @@ public:
|
||||
}
|
||||
|
||||
static elem_t digest_append(elem_t d, elem_t x) {
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
check_digest_value(d);
|
||||
|
||||
auto y = (d + x) % magic;
|
||||
SCYLLA_ASSERT(digest_remove(y, x) == d);
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
}
|
||||
|
||||
static elem_t digest_remove(elem_t d, elem_t x) {
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
check_digest_value(d);
|
||||
|
||||
auto y = (d - x) % magic;
|
||||
return y < 0 ? y + magic : y;
|
||||
|
||||
if (y < 0) {
|
||||
y += magic;
|
||||
}
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
}
|
||||
|
||||
elem_t digest() const {
|
||||
|
||||
@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
|
||||
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
|
||||
|
||||
|
||||
class random_content_file:
|
||||
class RandomContentFile:
|
||||
def __init__(self, path: str, size_in_bytes: int):
|
||||
path = pathlib.Path(path)
|
||||
self.filename = path if path.is_file() else path / str(uuid.uuid4())
|
||||
@@ -64,11 +64,11 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
for server in servers:
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks)
|
||||
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
|
||||
|
||||
try:
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
cmdline = [*global_cmdline,
|
||||
"--logger-log-level", "compaction=debug"]
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
|
||||
@@ -134,7 +134,7 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
|
||||
|
||||
|
||||
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
|
||||
|
||||
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
s2_mark = await s2_log.mark()
|
||||
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
|
||||
|
||||
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
|
||||
|
||||
|
||||
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
|
||||
|
||||
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
|
||||
mark = await log.mark()
|
||||
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
|
||||
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
|
||||
await manager.server_restart(servers[0].server_id, wait_others=2)
|
||||
|
||||
|
||||
@@ -198,47 +198,62 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
|
||||
auto cfg = config();
|
||||
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
|
||||
auto vs = vector_store_client{cfg};
|
||||
auto count = 0;
|
||||
bool fail_dns_resolution = true;
|
||||
auto as = abort_source_timeout();
|
||||
auto address = inet_address("127.0.0.1");
|
||||
configure(vs)
|
||||
.with_dns_refresh_interval(milliseconds(10))
|
||||
.with_wait_for_client_timeout(milliseconds(20))
|
||||
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
|
||||
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
|
||||
BOOST_CHECK_EQUAL(host, "good.authority.here");
|
||||
count++;
|
||||
if (count % 3 != 0) {
|
||||
if (fail_dns_resolution) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
co_return inet_address(format("127.0.0.{}", count));
|
||||
co_return address;
|
||||
});
|
||||
|
||||
vs.start_background_tasks();
|
||||
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
BOOST_CHECK_EQUAL(count, 3);
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
|
||||
|
||||
vector_store_client_tester::trigger_dns_resolver(vs);
|
||||
|
||||
// Wait for the DNS resolution to fail
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
BOOST_CHECK_EQUAL(count, 6);
|
||||
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
|
||||
fail_dns_resolution = true;
|
||||
// Trigger DNS resolver to check for address changes
|
||||
// Resolver will not re-check automatically after successful resolution
|
||||
vector_store_client_tester::trigger_dns_resolver(vs);
|
||||
|
||||
// Wait for the DNS resolution to fail again
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
// Resolve to a different address
|
||||
address = inet_address("127.0.0.2");
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
|
||||
|
||||
co_await vs.stop();
|
||||
}
|
||||
|
||||
@@ -353,7 +353,7 @@ future<> controller::set_cql_ready(bool ready) {
|
||||
return _gossiper.local().add_local_application_state(gms::application_state::RPC_READY, gms::versioned_value::cql_ready(ready));
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
return _server ? _server->local().get_client_data() : protocol_server::get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ public:
|
||||
virtual future<> start_server() override;
|
||||
virtual future<> stop_server() override;
|
||||
virtual future<> request_stop_server() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
future<> update_connections_scheduling_group();
|
||||
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/net/socket_defs.hh>
|
||||
#include <vector>
|
||||
#include "client_data.hh"
|
||||
@@ -43,8 +44,8 @@ public:
|
||||
/// This variant is used by the REST API so failure is acceptable.
|
||||
virtual future<> request_stop_server() = 0;
|
||||
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<client_data>>(utils::chunked_vector<client_data>());
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>>();
|
||||
}
|
||||
|
||||
protocol_server(seastar::scheduling_group sg) noexcept : _sched_group(std::move(sg)) {}
|
||||
|
||||
@@ -691,6 +691,7 @@ client_data cql_server::connection::make_client_data() const {
|
||||
cd.connection_stage = client_connection_stage::authenticating;
|
||||
}
|
||||
cd.scheduling_group_name = _current_scheduling_group.name();
|
||||
cd.client_options = _client_state.get_client_options();
|
||||
|
||||
cd.ssl_enabled = _ssl_enabled;
|
||||
cd.ssl_protocol = _ssl_protocol;
|
||||
@@ -958,12 +959,17 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
|
||||
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
|
||||
_client_state.set_driver_version(driver_ver_opt->second);
|
||||
co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, driver_ver_opt->second);
|
||||
}
|
||||
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
|
||||
_client_state.set_driver_name(driver_name_opt->second);
|
||||
co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, driver_name_opt->second);
|
||||
}
|
||||
|
||||
// Store all received client options for later exposure in the system.clients 'client_options' column
|
||||
// (a frozen map<text, text>). Options are cached to reduce memory overhead by deduplicating
|
||||
// identical key/value sets across multiple connections (e.g., same driver name/version).
|
||||
co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);
|
||||
|
||||
cql_protocol_extension_enum_set cql_proto_exts;
|
||||
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
|
||||
if (options.contains(protocol_extension_name(ext))) {
|
||||
@@ -1647,6 +1653,9 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
|
||||
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
|
||||
opts.insert({"COMPRESSION", "lz4"});
|
||||
opts.insert({"COMPRESSION", "snappy"});
|
||||
// CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
|
||||
// e.g. CQL driver configuration.
|
||||
opts.insert({"CLIENT_OPTIONS", ""});
|
||||
if (_server._config.allow_shard_aware_drivers) {
|
||||
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
|
||||
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
|
||||
@@ -2308,11 +2317,11 @@ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata
|
||||
return _response_metadata_id.value();
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<client_data>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
co_await for_each_gently([&ret] (const generic_server::connection& c) {
|
||||
const connection& conn = dynamic_cast<const connection&>(c);
|
||||
ret.emplace_back(conn.make_client_data());
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(conn.make_client_data())));
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -206,6 +206,7 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
std::unique_ptr<event_notifier> _notifier;
|
||||
private:
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
transport_stats _stats;
|
||||
auth::service& _auth_service;
|
||||
qos::service_level_controller& _sl_controller;
|
||||
@@ -234,7 +235,7 @@ public:
|
||||
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<> update_connections_scheduling_group();
|
||||
future<> update_connections_service_level_params();
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -1547,8 +1547,8 @@ void reclaim_timer::report() const noexcept {
|
||||
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
|
||||
auto info_level = _stall_detected ? log_level::info : log_level::debug;
|
||||
auto MiB = 1024*1024;
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected,
|
||||
_stall_detected ? current_backtrace() : saved_backtrace{});
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected && !_preemptible,
|
||||
(_stall_detected && !_preemptible) ? current_backtrace() : saved_backtrace{});
|
||||
|
||||
timing_logger.log(time_level, "{} took {} us, trying to release {:.3f} MiB {}preemptibly, reserve: {{goal: {}, max: {}}}{}",
|
||||
_name, (_duration + 500ns) / 1us, (float)_memory_to_release / MiB, _preemptible ? "" : "non-",
|
||||
|
||||
Reference in New Issue
Block a user