Compare commits
2 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b399606af1 | ||
|
|
5dc8dce827 |
@@ -169,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -708,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -989,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
ret.emplace_back(r.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -108,7 +107,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -100,8 +100,9 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
const auto route_entries = parse_set_client_array(root);
|
||||
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_await cr.local().set_client_routes(route_entries);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
@@ -131,7 +132,8 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
const auto route_keys = parse_delete_client_array(root);
|
||||
co_await cr.local().delete_client_routes(route_keys);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,9 +10,7 @@
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
#include <list>
|
||||
#include <optional>
|
||||
|
||||
enum class client_type {
|
||||
@@ -29,20 +27,6 @@ enum class client_connection_stage {
|
||||
ready,
|
||||
};
|
||||
|
||||
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
|
||||
struct options_cache_value_type {};
|
||||
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
|
||||
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
|
||||
using client_options_cache_key_type = client_options_cache_type::key_type;
|
||||
|
||||
// This struct represents a single OPTION key-value pair from the client's connection options.
|
||||
// Both key and value are represented by corresponding "references" to their cached values.
|
||||
// Each "reference" is effectively a lw_shared_ptr value.
|
||||
struct client_option_key_value_cached_entry {
|
||||
client_options_cache_entry_type key;
|
||||
client_options_cache_entry_type value;
|
||||
};
|
||||
|
||||
sstring to_string(client_connection_stage ct);
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
@@ -53,8 +37,8 @@ struct client_data {
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
std::optional<client_options_cache_entry_type> driver_name;
|
||||
std::optional<client_options_cache_entry_type> driver_version;
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
std::optional<int32_t> protocol_version;
|
||||
std::optional<sstring> ssl_cipher_suite;
|
||||
@@ -62,7 +46,6 @@ struct client_data {
|
||||
std::optional<sstring> ssl_protocol;
|
||||
std::optional<sstring> username;
|
||||
std::optional<sstring> scheduling_group_name;
|
||||
std::list<client_option_key_value_cached_entry> client_options;
|
||||
|
||||
sstring stage_str() const { return to_string(connection_stage); }
|
||||
sstring client_type_str() const { return to_string(ct); }
|
||||
|
||||
@@ -125,6 +125,10 @@ if(target_arch)
|
||||
add_compile_options("-march=${target_arch}")
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
|
||||
endif()
|
||||
|
||||
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
|
||||
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
|
||||
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")
|
||||
|
||||
@@ -2251,6 +2251,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
|
||||
if debuginfo and mode_config['can_have_debug_info']:
|
||||
cxxflags += ['-g', '-gz']
|
||||
|
||||
if 'clang' in cxx:
|
||||
# Since AssignmentTracking was enabled by default in clang
|
||||
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
|
||||
# coroutine frame debugging info (`coro_frame_ty`) is broken.
|
||||
#
|
||||
# It seems that we aren't losing much by disabling AssigmentTracking,
|
||||
# so for now we choose to disable it to get `coro_frame_ty` back.
|
||||
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
|
||||
|
||||
return cxxflags
|
||||
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <unordered_map>
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
@@ -109,9 +110,9 @@ struct frozen_schema_diff {
|
||||
extended_frozen_schema old_schema;
|
||||
extended_frozen_schema new_schema;
|
||||
};
|
||||
std::vector<extended_frozen_schema> created;
|
||||
std::vector<altered_schema> altered;
|
||||
std::vector<extended_frozen_schema> dropped;
|
||||
utils::chunked_vector<extended_frozen_schema> created;
|
||||
utils::chunked_vector<altered_schema> altered;
|
||||
utils::chunked_vector<extended_frozen_schema> dropped;
|
||||
};
|
||||
|
||||
// schema_diff represents what is happening with tables or views during schema merge
|
||||
@@ -121,9 +122,9 @@ struct schema_diff_per_shard {
|
||||
schema_ptr new_schema;
|
||||
};
|
||||
|
||||
std::vector<schema_ptr> created;
|
||||
std::vector<altered_schema> altered;
|
||||
std::vector<schema_ptr> dropped;
|
||||
utils::chunked_vector<schema_ptr> created;
|
||||
utils::chunked_vector<altered_schema> altered;
|
||||
utils::chunked_vector<schema_ptr> dropped;
|
||||
|
||||
future<frozen_schema_diff> freeze() const;
|
||||
|
||||
@@ -143,7 +144,7 @@ struct affected_tables_and_views_per_shard {
|
||||
schema_diff_per_shard tables;
|
||||
schema_diff_per_shard cdc;
|
||||
schema_diff_per_shard views;
|
||||
std::vector<bool> columns_changed;
|
||||
utils::chunked_vector<bool> columns_changed;
|
||||
};
|
||||
|
||||
struct affected_tables_and_views {
|
||||
|
||||
@@ -3157,10 +3157,7 @@ static bool must_have_tokens(service::node_state nst) {
|
||||
// A decommissioning node doesn't have tokens at the end, they are
|
||||
// removed during transition to the left_token_ring state.
|
||||
case service::node_state::decommissioning: return false;
|
||||
// A removing node might or might not have tokens depending on whether
|
||||
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
|
||||
// cases, we allow removing nodes to not have tokens.
|
||||
case service::node_state::removing: return false;
|
||||
case service::node_state::removing: return true;
|
||||
case service::node_state::rebuilding: return true;
|
||||
case service::node_state::normal: return true;
|
||||
case service::node_state::left: return false;
|
||||
|
||||
@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
|
||||
|
||||
future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
|
||||
} catch (...) {
|
||||
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
|
||||
sleep = true;
|
||||
}
|
||||
|
||||
if (sleep) {
|
||||
vbw_logger.debug("Sleeping after exception.");
|
||||
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
|
||||
|
||||
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
|
||||
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
|
||||
auto it = vbw._state._batch->tasks.begin();
|
||||
while (it != vbw._state._batch->tasks.end()) {
|
||||
auto id = it->first;
|
||||
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
|
||||
|
||||
++it; // Advance the iterator before potentially removing the entry from the map.
|
||||
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
|
||||
for (auto& [id, t]: tasks_map) {
|
||||
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
|
||||
if (!task_opt || task_opt->get().aborted) {
|
||||
co_await vbw._state._batch->abort_task(id);
|
||||
}
|
||||
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
|
||||
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
if (processing_base_table != building_state.currently_processed_base_table) {
|
||||
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_vbw.local()._vb_state_machine.event.broadcast();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
|
||||
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
tasks.insert({id, *task_opt});
|
||||
}
|
||||
#ifdef SEASTAR_DEBUG
|
||||
{
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
auto& some_task = tasks.begin()->second;
|
||||
for (auto& [_, t]: tasks) {
|
||||
SCYLLA_ASSERT(t.base_id == some_task.base_id);
|
||||
SCYLLA_ASSERT(t.last_token == some_task.last_token);
|
||||
SCYLLA_ASSERT(t.replica == some_task.replica);
|
||||
SCYLLA_ASSERT(t.type == some_task.type);
|
||||
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
co_return collect_completed_tasks();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
|
||||
.with_column("ssl_protocol", utf8_type)
|
||||
.with_column("username", utf8_type)
|
||||
.with_column("scheduling_group", utf8_type)
|
||||
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Collect
|
||||
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
|
||||
using client_data_vec = utils::chunked_vector<client_data>;
|
||||
using shard_client_data = std::vector<client_data_vec>;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
|
||||
cd_vec.resize(smp::count);
|
||||
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
|
||||
for (unsigned i = 0; i < smp::count; i++) {
|
||||
for (auto&& ps_cdc : *cd_vec[i]) {
|
||||
for (auto&& cd : ps_cdc) {
|
||||
if (cd_map.contains(cd->ip)) {
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
if (cd_map.contains(cd.ip)) {
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
} else {
|
||||
dht::decorated_key key = make_partition_key(cd->ip);
|
||||
dht::decorated_key key = make_partition_key(cd.ip);
|
||||
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
|
||||
ips.insert(decorated_ip{std::move(key), cd->ip});
|
||||
cd_map[cd->ip].emplace_back(std::move(cd));
|
||||
ips.insert(decorated_ip{std::move(key), cd.ip});
|
||||
cd_map[cd.ip].emplace_back(std::move(cd));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
|
||||
co_await result.emit_partition_start(dip.key);
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
|
||||
return a.port < b.port || a.client_type_str() < b.client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd->shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd->stage_str());
|
||||
if (cd->driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
|
||||
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
|
||||
set_cell(cr.cells(), "shard_id", cd.shard_id);
|
||||
set_cell(cr.cells(), "connection_stage", cd.stage_str());
|
||||
if (cd.driver_name) {
|
||||
set_cell(cr.cells(), "driver_name", *cd.driver_name);
|
||||
}
|
||||
if (cd->driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
|
||||
if (cd.driver_version) {
|
||||
set_cell(cr.cells(), "driver_version", *cd.driver_version);
|
||||
}
|
||||
if (cd->hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd->hostname);
|
||||
if (cd.hostname) {
|
||||
set_cell(cr.cells(), "hostname", *cd.hostname);
|
||||
}
|
||||
if (cd->protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
|
||||
if (cd.protocol_version) {
|
||||
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
|
||||
}
|
||||
if (cd->ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
|
||||
if (cd.ssl_cipher_suite) {
|
||||
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
|
||||
}
|
||||
if (cd->ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
|
||||
if (cd.ssl_enabled) {
|
||||
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
|
||||
}
|
||||
if (cd->ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
|
||||
if (cd.ssl_protocol) {
|
||||
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
|
||||
}
|
||||
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
|
||||
if (cd->scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
|
||||
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
|
||||
if (cd.scheduling_group_name) {
|
||||
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(
|
||||
utf8_type,
|
||||
utf8_type,
|
||||
false
|
||||
);
|
||||
|
||||
auto prepare_client_options = [] (const auto& client_options) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& co: client_options) {
|
||||
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
};
|
||||
|
||||
set_cell(cr.cells(), "client_options",
|
||||
make_map_value(map_type, prepare_client_options(cd->client_options)));
|
||||
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
|
||||
@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
|
||||
@@ -1043,8 +1043,6 @@ The following modes are available:
|
||||
* - ``immediate``
|
||||
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
|
||||
@@ -74,8 +74,6 @@ The keys and values are:
|
||||
as an indicator to which shard client wants to connect. The desired shard number
|
||||
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
|
||||
Its value is a decimal representation of type `uint16_t`, by default `19142`.
|
||||
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
|
||||
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
|
||||
|
||||
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
|
||||
`biased-token-round-robin`. To apply the algorithm,
|
||||
|
||||
@@ -86,7 +86,6 @@ stateDiagram-v2
|
||||
de_left_token_ring --> [*]
|
||||
}
|
||||
state removing {
|
||||
re_left_token_ring : left_token_ring
|
||||
re_tablet_draining : tablet_draining
|
||||
re_tablet_migration : tablet_migration
|
||||
re_write_both_read_old : write_both_read_old
|
||||
@@ -99,8 +98,7 @@ stateDiagram-v2
|
||||
re_tablet_draining --> re_write_both_read_old
|
||||
re_write_both_read_old --> re_write_both_read_new: streaming completed
|
||||
re_write_both_read_old --> re_rollback_to_normal: rollback
|
||||
re_write_both_read_new --> re_left_token_ring
|
||||
re_left_token_ring --> [*]
|
||||
re_write_both_read_new --> [*]
|
||||
}
|
||||
rebuilding --> normal: streaming completed
|
||||
decommissioning --> left: operation succeeded
|
||||
@@ -124,10 +122,9 @@ Note that these are not all states, as there are other states specific to tablet
|
||||
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
|
||||
to modified token ring), reads are using old replicas.
|
||||
- `write_both_read_new` - as above, but reads are using new replicas.
|
||||
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
|
||||
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
|
||||
We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
|
||||
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
|
||||
moving the node we tried to decommission/remove back to the normal state.
|
||||
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
|
||||
@@ -144,9 +141,7 @@ reads that started before this point exist in the system. Finally we remove the
|
||||
transitioning state.
|
||||
|
||||
Decommission, removenode and replace work similarly, except they don't go through
|
||||
`commit_cdc_generation`. Both decommission and removenode go through the
|
||||
`left_token_ring` state to run a global barrier ensuring all nodes are aware
|
||||
of the topology change before the operation completes.
|
||||
`commit_cdc_generation`.
|
||||
|
||||
The state machine may also go only through the `commit_cdc_generation` state
|
||||
after getting a request from the user to create a new CDC generation if the
|
||||
|
||||
@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
|
||||
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
|
||||
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
|
||||
|
||||
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
|
||||
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
|
||||
- a keyspace/view was dropped
|
||||
- tablet operations (see [tablet operations section](#tablet-operations))
|
||||
In the first case we simply delete relevant view building tasks as they are no longer needed.
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
|
||||
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
|
||||
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
|
||||
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
|
||||
|
||||
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,
|
||||
|
||||
@@ -17,7 +17,6 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
Workload Prioritization </features/workload-prioritization>
|
||||
Backup and Restore </features/backup-and-restore>
|
||||
Incremental Repair </features/incremental-repair/>
|
||||
Vector Search </features/vector-search/>
|
||||
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
@@ -44,5 +43,3 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
|
||||
efficient and lightweight approach to maintaining data consistency by
|
||||
repairing only the data that has changed since the last repair.
|
||||
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
|
||||
similarity-based queries on vector embeddings.
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
=================================
|
||||
Vector Search in ScyllaDB
|
||||
=================================
|
||||
|
||||
.. note::
|
||||
|
||||
This feature is currently available only in `ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
What Is Vector Search
|
||||
-------------------------
|
||||
|
||||
Vector Search enables similarity-based queries over high-dimensional data,
|
||||
such as text, images, audio, or user behavior. Instead of searching for exact
|
||||
matches, it allows applications to find items that are semantically similar to
|
||||
a given input.
|
||||
|
||||
To do this, Vector Search works on vector embeddings, which are numerical
|
||||
representations of data that capture semantic meaning. This enables queries
|
||||
such as:
|
||||
|
||||
* “Find documents similar to this paragraph”
|
||||
* “Find products similar to what the user just viewed”
|
||||
* “Find previous tickets related to this support request”
|
||||
|
||||
Rather than relying on exact values or keywords, Vector Search returns results
|
||||
based on distance or similarity between vectors. This capability is
|
||||
increasingly used in modern workloads such as AI-powered search, recommendation
|
||||
systems, and retrieval-augmented generation (RAG).
|
||||
|
||||
Why Vector Search Matters
|
||||
------------------------------------
|
||||
|
||||
Many applications already rely on ScyllaDB for high throughput, low and
|
||||
predictable latency, and large-scale data storage.
|
||||
|
||||
Vector Search complements these strengths by enabling new classes of workloads,
|
||||
including:
|
||||
|
||||
* Semantic search over text or documents
|
||||
* Recommendations based on user or item similarity
|
||||
* AI and ML applications, including RAG pipelines
|
||||
* Anomaly and pattern detection
|
||||
|
||||
With Vector Search, ScyllaDB can serve as the similarity search backend for
|
||||
AI-driven applications.
|
||||
|
||||
Availability
|
||||
--------------
|
||||
|
||||
Vector Search is currently available only in ScyllaDB Cloud, the fully managed
|
||||
ScyllaDB service.
|
||||
|
||||
|
||||
👉 For details on using Vector Search, refer to the
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/index.html>`_.
|
||||
@@ -20,10 +20,7 @@ You can run your ScyllaDB workloads on AWS, GCE, and Azure using a ScyllaDB imag
|
||||
Amazon Web Services (AWS)
|
||||
-----------------------------
|
||||
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`,
|
||||
:ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`,
|
||||
:ref:`i7ie <system-requirements-i7ie-instances>`, :ref:`i8g<system-requirements-i8g-instances>`,
|
||||
and :ref:`i8ge <system-requirements-i8ge-instances>`.
|
||||
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`, :ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`, and :ref:`i7ie <system-requirements-i7ie-instances>`.
|
||||
|
||||
.. note::
|
||||
|
||||
@@ -198,118 +195,6 @@ All i7i instances have the following specs:
|
||||
|
||||
See `Amazon EC2 I7i Instances <https://aws.amazon.com/ec2/instance-types/i7i/>`_ for details.
|
||||
|
||||
|
||||
.. _system-requirements-i8g-instances:
|
||||
|
||||
i8g instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8g instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8g.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 468 GB
|
||||
* - i8g.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 937 GB
|
||||
* - i8g.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 1 x 1,875 GB
|
||||
* - i8g.4xlarge
|
||||
- 16
|
||||
- 128
|
||||
- 1 x 3,750 GB
|
||||
* - i8g.8xlarge
|
||||
- 32
|
||||
- 256
|
||||
- 2 x 3,750 GB
|
||||
* - i8g.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 3 x 3,750 GB
|
||||
* - i8g.16xlarge
|
||||
- 64
|
||||
- 512
|
||||
- 4 x 3,750 GB
|
||||
|
||||
All i8g instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 100 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 45 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
|
||||
|
||||
.. _system-requirements-i8ge-instances:
|
||||
|
||||
i8ge instances
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
The following i8ge instances are supported.
|
||||
|
||||
.. list-table::
|
||||
:widths: 30 20 20 30
|
||||
:header-rows: 1
|
||||
|
||||
* - Model
|
||||
- vCPU
|
||||
- Mem (GiB)
|
||||
- Storage (GB)
|
||||
* - i8ge.large
|
||||
- 2
|
||||
- 16
|
||||
- 1 x 1,250 GB
|
||||
* - i8ge.xlarge
|
||||
- 4
|
||||
- 32
|
||||
- 1 x 2,500 GB
|
||||
* - i8ge.2xlarge
|
||||
- 8
|
||||
- 64
|
||||
- 2 x 2,500 GB
|
||||
* - i8ge.3xlarge
|
||||
- 12
|
||||
- 96
|
||||
- 1 x 7,500 GB
|
||||
* - i8ge.6xlarge
|
||||
- 24
|
||||
- 192
|
||||
- 2 x 7,500 GB
|
||||
* - i8ge.12xlarge
|
||||
- 48
|
||||
- 384
|
||||
- 4 x 7,500 GB
|
||||
* - i8ge.18xlarge
|
||||
- 72
|
||||
- 576
|
||||
- 6 x 7,500 GB
|
||||
|
||||
All i8ge instances have the following specs:
|
||||
|
||||
* Powered by AWS Graviton4 processors
|
||||
* 3rd generation AWS Nitro SSD storage
|
||||
* DDR5-5600 memory for improved throughput
|
||||
* Up to 300 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
|
||||
Amazon Elastic Block Store (EBS)
|
||||
* Instance sizes offer up to 120 TB of total local NVMe instance storage
|
||||
|
||||
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
|
||||
|
||||
|
||||
Im4gn and Is4gen instances
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
ScyllaDB supports Arm-based Im4gn and Is4gen instances. See `Amazon EC2 Im4gn and Is4gen instances <https://aws.amazon.com/ec2/instance-types/i4g/>`_ for specification details.
|
||||
|
||||
@@ -25,7 +25,8 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Install ScyllaDB </getting-started/install-scylla/index/>`
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
|
||||
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
|
||||
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
* :doc:`Create a ScyllaDB Cluster - Single Data Center (DC) </operating-scylla/procedures/cluster-management/create-cluster/>`
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
ScyllaDB Housekeeping and how to disable it
|
||||
============================================
|
||||
|
||||
It is always recommended to run the latest stable version of ScyllaDB.
|
||||
It is always recommended to run the latest version of ScyllaDB.
|
||||
The latest stable release version is always available from the `Download Center <https://www.scylladb.com/download/>`_.
|
||||
|
||||
When you install ScyllaDB, it installs by default two services: **scylla-housekeeping-restart** and **scylla-housekeeping-daily**. These services check for the latest ScyllaDB version and prompt the user if they are using a version that is older than what is publicly available.
|
||||
Information about your ScyllaDB deployment, including the ScyllaDB version currently used, as well as unique user and server identifiers, are collected by a centralized service.
|
||||
|
||||
@@ -9,8 +9,6 @@ Running ``cluster repair`` on a **single node** synchronizes all data on all nod
|
||||
To synchronize all data in clusters that have both tablets-based and vnodes-based keyspaces, run :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair/>` on **all**
|
||||
of the nodes in the cluster, and :doc:`nodetool cluster repair </operating-scylla/nodetool-commands/cluster/repair/>` on **any** of the nodes in the cluster.
|
||||
|
||||
.. warning:: :term:`Colocated Tables <Colocated Table>` cannot be synchronized using cluster repair in this version.
|
||||
|
||||
To check if a keyspace enables tablets, use:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
@@ -202,7 +202,3 @@ Glossary
|
||||
The name comes from two basic operations, multiply (MU) and rotate (R), used in its inner loop.
|
||||
The MurmurHash3 version used in ScyllaDB originated from `Apache Cassandra <https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/digest/MurmurHash3.html>`_, and is **not** identical to the `official MurmurHash3 calculation <https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MurmurHash.java#L31-L33>`_. More `here <https://github.com/russss/murmur3-cassandra>`_.
|
||||
|
||||
Colocated Table
|
||||
An internal table of a special type in a :doc:`tablets </architecture/tablets>` enabled keyspace that is colocated with another base table, meaning it always has the same tablet replicas as the base table.
|
||||
Current types of colocated tables include CDC log tables, local indexes, and materialized views that have the same partition key as their base table.
|
||||
|
||||
|
||||
@@ -177,7 +177,6 @@ public:
|
||||
gms::feature driver_service_level { *this, "DRIVER_SERVICE_LEVEL"sv };
|
||||
gms::feature strongly_consistent_tables { *this, "STRONGLY_CONSISTENT_TABLES"sv };
|
||||
gms::feature client_routes { *this, "CLIENT_ROUTES"sv };
|
||||
gms::feature removenode_with_left_token_ring { *this, "REMOVENODE_WITH_LEFT_TOKEN_RING"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -176,7 +176,7 @@ void fsm::become_leader() {
|
||||
|
||||
_last_election_time = _clock.now();
|
||||
_ping_leader = false;
|
||||
// a new leader needs to commit at least one entry to make sure that
|
||||
// a new leader needs to commit at lease one entry to make sure that
|
||||
// all existing entries in its log are committed as well. Also it should
|
||||
// send append entries RPC as soon as possible to establish its leadership
|
||||
// (3.4). Do both of those by committing a dummy entry.
|
||||
|
||||
@@ -2793,7 +2793,6 @@ future<> database::flush_all_tables() {
|
||||
});
|
||||
_all_tables_flushed_at = db_clock::now();
|
||||
co_await _commitlog->wait_for_pending_deletes();
|
||||
dblog.info("Forcing new commitlog segment and flushing all tables complete");
|
||||
}
|
||||
|
||||
future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {
|
||||
|
||||
@@ -3385,15 +3385,16 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
continue;
|
||||
}
|
||||
|
||||
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
||||
while (auto de = lister.get().get()) {
|
||||
auto snapshot_name = de->name;
|
||||
lister::scan_dir(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [datadir, &all_snapshots] (fs::path snapshots_dir, directory_entry de) {
|
||||
auto snapshot_name = de.name;
|
||||
all_snapshots.emplace(snapshot_name, snapshot_details());
|
||||
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
}
|
||||
return get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).then([&all_snapshots, snapshot_name] (auto details) {
|
||||
auto& sd = all_snapshots.at(snapshot_name);
|
||||
sd.total += details.total;
|
||||
sd.live += details.live;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
return all_snapshots;
|
||||
});
|
||||
@@ -3401,61 +3402,38 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
|
||||
|
||||
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
|
||||
table::snapshot_details details{};
|
||||
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
|
||||
if (!co_await file_exists(staging_dir->native())) {
|
||||
staging_dir.reset();
|
||||
}
|
||||
|
||||
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
||||
while (auto de = co_await lister.get()) {
|
||||
const auto& name = de->name;
|
||||
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
|
||||
// See https://github.com/scylladb/seastar/pull/3163
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
|
||||
co_await lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [datadir, &details] (fs::path snapshot_dir, directory_entry de) -> future<> {
|
||||
auto sd = co_await io_check(file_stat, (snapshot_dir / de.name).native(), follow_symlink::no);
|
||||
auto size = sd.allocated_size;
|
||||
|
||||
// The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
|
||||
//
|
||||
// All the others should just generate an exception: there is something wrong, so don't blindly
|
||||
// add it to the size.
|
||||
if (name != "manifest.json" && name != "schema.cql") {
|
||||
if (de.name != "manifest.json" && de.name != "schema.cql") {
|
||||
details.total += size;
|
||||
if (sd.number_of_links == 1) {
|
||||
// File exists only in the snapshot directory.
|
||||
details.live += size;
|
||||
continue;
|
||||
}
|
||||
// If the number of linkes is greater than 1, it is still possible that the file is linked to another snapshot
|
||||
// So check the datadir for the file too.
|
||||
} else {
|
||||
continue;
|
||||
size = 0;
|
||||
}
|
||||
|
||||
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
|
||||
try {
|
||||
try {
|
||||
// File exists in the main SSTable directory. Snapshots are not contributing to size
|
||||
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
|
||||
auto psd = co_await io_check(file_stat, (datadir / de.name).native(), follow_symlink::no);
|
||||
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
|
||||
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
|
||||
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
|
||||
(datadir / name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
co_return false;
|
||||
(datadir / de.name).native(), psd.device_id, psd.inode_number, psd.size,
|
||||
(snapshot_dir / de.name).native(), sd.device_id, sd.inode_number, sd.size);
|
||||
details.live += size;
|
||||
}
|
||||
co_return true;
|
||||
} catch (std::system_error& e) {
|
||||
} catch (std::system_error& e) {
|
||||
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
||||
throw;
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
};
|
||||
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
|
||||
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
|
||||
!co_await exists_in_dir(datadir / name)) {
|
||||
details.live += size;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
co_return details;
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ seastar::future<> service::client_routes_service::set_client_routes_inner(const
|
||||
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
for (const auto& entry : route_entries) {
|
||||
for (auto& entry : route_entries) {
|
||||
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
|
||||
cmuts.emplace_back(std::move(mut));
|
||||
}
|
||||
@@ -103,24 +103,24 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
|
||||
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_entries = std::move(route_entries)] {
|
||||
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.set_client_routes_inner(route_entries);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) mutable -> future<> {
|
||||
return cr.with_retry([&cr, route_keys = std::move(route_keys)] {
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.delete_client_routes_inner(route_keys);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
seastar::future<> service::client_routes_service::with_retry(Func func) const {
|
||||
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
|
||||
int retries = 10;
|
||||
while (true) {
|
||||
try {
|
||||
|
||||
@@ -66,8 +66,8 @@ public:
|
||||
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
|
||||
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
|
||||
future<std::vector<client_route_entry>> get_client_routes() const;
|
||||
seastar::future<> set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries);
|
||||
seastar::future<> delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys);
|
||||
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
|
||||
|
||||
// notifications
|
||||
@@ -76,7 +76,7 @@ private:
|
||||
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
template <typename Func>
|
||||
seastar::future<> with_retry(Func func) const;
|
||||
seastar::future<> with_retry(Func&& func) const;
|
||||
|
||||
abort_source& _abort_source;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
@@ -344,17 +344,3 @@ void service::client_state::update_per_service_level_params(qos::service_level_o
|
||||
|
||||
_workload_type = slo.workload;
|
||||
}
|
||||
|
||||
future<> service::client_state::set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options) {
|
||||
for (const auto& [key, value] : client_options) {
|
||||
auto cached_key = co_await keys_and_values_cache.get_or_load(key, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
auto cached_value = co_await keys_and_values_cache.get_or_load(value, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
_client_options.emplace_back(std::move(cached_key), std::move(cached_value));
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,6 @@
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "client_data.hh"
|
||||
|
||||
#include "transport/cql_protocol_extension.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
@@ -103,8 +102,7 @@ private:
|
||||
private volatile String keyspace;
|
||||
#endif
|
||||
std::optional<auth::authenticated_user> _user;
|
||||
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
|
||||
std::list<client_option_key_value_cached_entry> _client_options;
|
||||
std::optional<sstring> _driver_name, _driver_version;
|
||||
|
||||
auth_state _auth_state = auth_state::UNINITIALIZED;
|
||||
bool _control_connection = false;
|
||||
@@ -153,33 +151,18 @@ public:
|
||||
return _control_connection = true;
|
||||
}
|
||||
|
||||
std::optional<client_options_cache_entry_type> get_driver_name() const {
|
||||
std::optional<sstring> get_driver_name() const {
|
||||
return _driver_name;
|
||||
}
|
||||
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
|
||||
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
void set_driver_name(sstring driver_name) {
|
||||
_driver_name = std::move(driver_name);
|
||||
}
|
||||
|
||||
const auto& get_client_options() const {
|
||||
return _client_options;
|
||||
}
|
||||
|
||||
future<> set_client_options(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const std::unordered_map<sstring, sstring>& client_options);
|
||||
|
||||
std::optional<client_options_cache_entry_type> get_driver_version() const {
|
||||
std::optional<sstring> get_driver_version() const {
|
||||
return _driver_version;
|
||||
}
|
||||
future<> set_driver_version(
|
||||
client_options_cache_type& keys_and_values_cache,
|
||||
const sstring& driver_version)
|
||||
{
|
||||
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
void set_driver_version(sstring driver_version) {
|
||||
_driver_version = std::move(driver_version);
|
||||
}
|
||||
|
||||
client_state(external_tag,
|
||||
|
||||
@@ -588,16 +588,12 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
}
|
||||
break;
|
||||
case node_state::decommissioning:
|
||||
[[fallthrough]];
|
||||
case node_state::removing:
|
||||
// A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
|
||||
// A decommissioning node loses its tokens when topology moves to left_token_ring.
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
|
||||
if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
|
||||
on_internal_error(
|
||||
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
|
||||
}
|
||||
break;
|
||||
}
|
||||
[[fallthrough]];
|
||||
case node_state::removing:
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
|
||||
// no need for double writes anymore since op failed
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
|
||||
@@ -2672,7 +2672,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
while (utils::get_local_injector().enter("topology_coordinator_pause_after_streaming")) {
|
||||
co_await sleep_abortable(std::chrono::milliseconds(10), _as);
|
||||
}
|
||||
const bool removenode_with_left_token_ring = _feature_service.removenode_with_left_token_ring;
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
bool barrier_failed = false;
|
||||
// In this state writes goes to old and new replicas but reads start to be done from new replicas
|
||||
@@ -2727,9 +2726,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case node_state::removing: {
|
||||
co_await utils::get_local_injector().inject("delay_node_removal", utils::wait_for_message(std::chrono::minutes(5)));
|
||||
if (!removenode_with_left_token_ring) {
|
||||
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
|
||||
}
|
||||
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
|
||||
}
|
||||
[[fallthrough]];
|
||||
case node_state::decommissioning: {
|
||||
@@ -2737,10 +2734,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
node_state next_state;
|
||||
utils::chunked_vector<canonical_mutation> muts;
|
||||
muts.reserve(2);
|
||||
if (removenode_with_left_token_ring || node.rs->state == node_state::decommissioning) {
|
||||
// Both decommission and removenode go through left_token_ring state
|
||||
// to ensure a global barrier is executed before the request is marked as done.
|
||||
// This ensures all nodes have observed the topology change.
|
||||
if (node.rs->state == node_state::decommissioning) {
|
||||
next_state = node.rs->state;
|
||||
builder.set_transition_state(topology::transition_state::left_token_ring);
|
||||
} else {
|
||||
@@ -2815,16 +2809,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
case topology::transition_state::left_token_ring: {
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
|
||||
// Need to be captured as the node variable might become invalid (e.g. moved out) at particular points.
|
||||
const auto node_rs_state = node.rs->state;
|
||||
|
||||
const bool is_removenode = node_rs_state == node_state::removing;
|
||||
|
||||
if (is_removenode && !_feature_service.removenode_with_left_token_ring) {
|
||||
on_internal_error(
|
||||
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
|
||||
}
|
||||
|
||||
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
|
||||
// Remove the node from group0 here - in general, it won't be able to leave on its own
|
||||
// because we'll ban it as soon as we tell it to shut down.
|
||||
@@ -2844,16 +2828,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
muts.push_back(builder.build());
|
||||
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
|
||||
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
|
||||
auto str = std::invoke([&]() {
|
||||
switch (node_rs_state) {
|
||||
case node_state::decommissioning:
|
||||
return ::format("finished decommissioning node {}", node.id);
|
||||
case node_state::removing:
|
||||
return ::format("finished removing node {}", node.id);
|
||||
default:
|
||||
return ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
|
||||
}
|
||||
});
|
||||
auto str = node.rs->state == node_state::decommissioning
|
||||
? ::format("finished decommissioning node {}", node.id)
|
||||
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
|
||||
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
|
||||
};
|
||||
|
||||
@@ -2866,11 +2843,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (node.id == _raft.id()) {
|
||||
// Removed node must be dead, so it shouldn't enter here (it can't coordinate its own removal).
|
||||
if (is_removenode) {
|
||||
on_internal_error(rtlogger, "removenode operation cannot be coordinated by the removed node itself");
|
||||
}
|
||||
|
||||
// Someone else needs to coordinate the rest of the decommission process,
|
||||
// because the decommissioning node is going to shut down in the middle of this state.
|
||||
rtlogger.info("coordinator is decommissioning; giving up leadership");
|
||||
@@ -2884,13 +2856,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
bool barrier_failed = false;
|
||||
// Wait until other nodes observe the new token ring and stop sending writes to this node.
|
||||
auto excluded_nodes = get_excluded_nodes_for_topology_request(node);
|
||||
try {
|
||||
// Removed node is added to ignored nodes, so it should be automatically excluded.
|
||||
if (is_removenode && !excluded_nodes.contains(node.id)) {
|
||||
on_internal_error(rtlogger, "removenode operation must have the removed node in excluded_nodes");
|
||||
}
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), std::move(excluded_nodes)), node.id);
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
@@ -2907,17 +2874,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (barrier_failed) {
|
||||
// If barrier above failed it means there may be unfinished writes to a decommissioned node,
|
||||
// or some nodes might not have observed the new topology yet (one purpose of the barrier
|
||||
// is to make sure all nodes observed the new topology before completing the request).
|
||||
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
|
||||
// Lets wait for the ring delay for those writes to complete and new topology to propagate
|
||||
// before continuing.
|
||||
co_await sleep_abortable(_ring_delay, _as);
|
||||
node = retake_node(co_await start_operation(), node.id);
|
||||
}
|
||||
|
||||
// Make decommissioning/removed node a non voter before reporting operation completion below.
|
||||
// Otherwise the node may see the completion and exit before it is removed from
|
||||
// Make decommissioning node a non voter before reporting operation completion below.
|
||||
// Otherwise the decommissioned node may see the completion and exit before it is removed from
|
||||
// the config at which point the removal from the config will hang if the cluster had only two
|
||||
// nodes before the decommission.
|
||||
co_await _voter_handler.on_node_removed(node.id, _as);
|
||||
@@ -2928,7 +2893,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
|
||||
|
||||
// For decommission/rollback: Tell the node to shut down.
|
||||
// Tell the node to shut down.
|
||||
// This is done to improve user experience when there are no failures.
|
||||
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
|
||||
// so there's no guarantee that it would learn about entering that state even if it was still
|
||||
@@ -2937,19 +2902,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// There is the possibility that the node will never get the message
|
||||
// and decommission will hang on that node.
|
||||
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
|
||||
//
|
||||
// For removenode: The node is already dead, no need to send shutdown command.
|
||||
auto node_id = node.id;
|
||||
bool shutdown_failed = false;
|
||||
if (!is_removenode) {
|
||||
try {
|
||||
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
|
||||
} catch (...) {
|
||||
rtlogger.warn("failed to tell node {} to shut down - it may hang."
|
||||
" It's safe to shut it down manually now. (Exception: {})",
|
||||
node.id, std::current_exception());
|
||||
shutdown_failed = true;
|
||||
}
|
||||
try {
|
||||
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
|
||||
} catch (...) {
|
||||
rtlogger.warn("failed to tell node {} to shut down - it may hang."
|
||||
" It's safe to shut it down manually now. (Exception: {})",
|
||||
node.id, std::current_exception());
|
||||
shutdown_failed = true;
|
||||
}
|
||||
if (shutdown_failed) {
|
||||
node = retake_node(co_await start_operation(), node_id);
|
||||
|
||||
@@ -604,14 +604,18 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
|
||||
service_level_names = [sl.service_level for sl in service_levels]
|
||||
assert "driver" not in service_level_names
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
|
||||
number_of_requests = 3000
|
||||
number_of_requests = 1000
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
logger.info(f"group={group}, tasks_processed={res}")
|
||||
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
@@ -623,10 +627,8 @@ async def _verify_tasks_processed_metrics(manager, server, used_group, unused_gr
|
||||
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
|
||||
tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
|
||||
assert tasks_processed_by_used_group - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert tasks_processed_by_unused_group - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:
|
||||
|
||||
@@ -52,18 +52,6 @@ KNOWN_LOG_LEVELS = {
|
||||
"OFF": "info",
|
||||
}
|
||||
|
||||
# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
|
||||
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
|
||||
|
||||
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Splits a "key : value" line into key and value.
|
||||
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
|
||||
|
||||
|
||||
class NodeError(Exception):
|
||||
def __init__(self, msg: str, process: int | None = None):
|
||||
@@ -540,15 +528,6 @@ class ScyllaNode:
|
||||
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
|
||||
|
||||
def stress(self, stress_options: list[str], **kwargs):
|
||||
"""
|
||||
Run `cassandra-stress` against this node.
|
||||
This method does not do any result parsing.
|
||||
|
||||
:param stress_options: List of options to pass to `cassandra-stress`.
|
||||
:param kwargs: Additional arguments to pass to `subprocess.Popen()`.
|
||||
:return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
|
||||
"""
|
||||
|
||||
cmd_args = ["cassandra-stress"] + stress_options
|
||||
|
||||
if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
|
||||
@@ -570,73 +549,6 @@ class ScyllaNode:
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
def _set_stress_val(self, key, val, res):
|
||||
"""
|
||||
Normalize a stress result string and populate aggregate/read/write metrics.
|
||||
|
||||
Removes comma-thousands separators from numbers, converts to float,
|
||||
stores the aggregate metric under `key`.
|
||||
If the value contains a "[READ ..., WRITE ...]" block, also stores the
|
||||
read and write metrics under `key:read` and `key:write`.
|
||||
|
||||
:param key: The metric name
|
||||
:param val: The metric value string
|
||||
:param res: The dictionary to populate
|
||||
"""
|
||||
|
||||
def parse_num(s):
|
||||
return float(s.replace(',', ''))
|
||||
|
||||
if "[" in val:
|
||||
p = STRESS_SUMMARY_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key] = parse_num(m.group(1))
|
||||
p = STRESS_READ_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":read"] = parse_num(m.group(1))
|
||||
p = STRESS_WRITE_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":write"] = parse_num(m.group(1))
|
||||
else:
|
||||
try:
|
||||
res[key] = parse_num(val)
|
||||
except ValueError:
|
||||
res[key] = val
|
||||
|
||||
|
||||
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
|
||||
"""
|
||||
Run stress test and return results as a structured metrics dictionary.
|
||||
|
||||
Runs `stress()`, finds the `Results:` section in `stdout`, and then
|
||||
processes each `key : value` line, putting it into a dictionary.
|
||||
|
||||
:param stress_options: List of stress options to pass to `stress()`.
|
||||
:param ignore_errors: Deprecated (no effect).
|
||||
:param kwargs: Additional arguments to pass to `stress()`.
|
||||
:return: Dictionary of stress test results.
|
||||
"""
|
||||
if ignore_errors:
|
||||
self.warning("passing `ignore_errors` to stress_object() is deprecated")
|
||||
ret = self.stress(stress_options, **kwargs)
|
||||
p = STRESS_KEY_VALUE_PATTERN
|
||||
res = {}
|
||||
start = False
|
||||
for line in (s.strip() for s in ret.stdout.splitlines()):
|
||||
if start:
|
||||
m = p.match(line)
|
||||
if m:
|
||||
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
|
||||
else:
|
||||
if line == 'Results:':
|
||||
start = True
|
||||
return res
|
||||
|
||||
|
||||
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
|
||||
cmd = ["flush"]
|
||||
if ks:
|
||||
|
||||
@@ -1,690 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2015-present The Apache Software Foundation
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
from concurrent import futures
|
||||
from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
from cassandra.query import SimpleStatement, dict_factory
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from dtest_class import Tester, create_cf, create_ks, read_barrier
|
||||
from tools.assertions import assert_all, assert_invalid
|
||||
from tools.cluster_topology import generate_cluster_topology
|
||||
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestSchemaManagement(Tester):
|
||||
def prepare(self, racks_num: int, has_config: bool = True):
|
||||
cluster = self.cluster
|
||||
cluster_topology = generate_cluster_topology(rack_num=racks_num)
|
||||
|
||||
if has_config:
|
||||
config = {
|
||||
"ring_delay_ms": 5000,
|
||||
}
|
||||
cluster.set_configuration_options(values=config)
|
||||
|
||||
cluster.populate(cluster_topology)
|
||||
cluster.start(wait_other_notice=True)
|
||||
|
||||
return cluster
|
||||
|
||||
|
||||
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema...")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute(
|
||||
"""
|
||||
CREATE TABLE users (
|
||||
id int,
|
||||
firstname text,
|
||||
lastname text,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
"""
|
||||
)
|
||||
|
||||
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
|
||||
insert_statement.consistency_level = ConsistencyLevel.ALL
|
||||
session.execute(insert_statement, [0])
|
||||
|
||||
logger.debug("Altering schema")
|
||||
session.execute("ALTER TABLE users WITH comment = 'updated'")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.stop(gently=True)
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
|
||||
logger.debug("Restarting node3")
|
||||
node3.stop(gently=True)
|
||||
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
|
||||
|
||||
n_partitions = 20
|
||||
for i in range(n_partitions):
|
||||
session.execute(insert_statement, [i])
|
||||
|
||||
rows = session.execute("SELECT * FROM users")
|
||||
res = sorted(rows)
|
||||
assert len(res) == n_partitions
|
||||
for i in range(n_partitions):
|
||||
expected = [i, "A", "B"]
|
||||
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
|
||||
|
||||
def test_dropping_keyspace_with_many_columns(self):
|
||||
"""
|
||||
Exploits https://github.com/scylladb/scylla/issues/1484
|
||||
"""
|
||||
cluster = self.prepare(racks_num=1, has_config=False)
|
||||
|
||||
node1 = cluster.nodelist()[0]
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
|
||||
for i in range(8):
|
||||
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
|
||||
session.execute("drop keyspace testxyz")
|
||||
|
||||
for node in cluster.nodelist():
|
||||
s = self.patient_cql_connection(node)
|
||||
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
|
||||
s.execute("drop keyspace testxyz")
|
||||
|
||||
def test_multiple_create_table_in_parallel(self):
|
||||
"""
|
||||
Run multiple create table statements via different nodes
|
||||
1. Create a cluster of 3 nodes
|
||||
2. Run create table with different table names in parallel - check all complete
|
||||
3. Run create table with the same table name in parallel - check if they complete
|
||||
"""
|
||||
logger.debug("1. Create a cluster of 3 nodes")
|
||||
nodes_count = 3
|
||||
cluster = self.prepare(racks_num=nodes_count)
|
||||
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
|
||||
ks = "ks"
|
||||
create_ks(sessions[0], ks, nodes_count)
|
||||
|
||||
def create_table(session, table_name):
|
||||
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
|
||||
logger.debug(f"create_statement {create_statement}")
|
||||
session.execute(create_statement)
|
||||
|
||||
logger.debug("2. Run create table with different table names in parallel - check all complete")
|
||||
step2_tables = [f"t{i}" for i in range(nodes_count)]
|
||||
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
|
||||
list(executor.map(create_table, sessions, step2_tables))
|
||||
|
||||
for table in step2_tables:
|
||||
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
|
||||
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
|
||||
|
||||
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
|
||||
step3_table = "test"
|
||||
step3_tables = [step3_table for i in range(nodes_count)]
|
||||
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
|
||||
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
|
||||
for res_future in res_futures:
|
||||
try:
|
||||
res_future.result()
|
||||
except AlreadyExists as e:
|
||||
logger.info(f"expected cassandra.AlreadyExists error {e}")
|
||||
|
||||
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
|
||||
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
|
||||
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
|
||||
|
||||
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
|
||||
def test_alter_table_in_parallel_to_read_and_write(self, case):
|
||||
"""
|
||||
Create a table and write into while altering the table
|
||||
1. Create a cluster of 3 nodes and populate a table
|
||||
2. Run write/read/read_and_write" statement in a loop
|
||||
3. Alter table while inserts are running
|
||||
"""
|
||||
logger.debug("1. Create a cluster of 3 nodes and populate a table")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
col_number = 20
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
session = self.patient_exclusive_cql_connection(node1)
|
||||
|
||||
def run_stress(stress_type, col=col_number - 2):
|
||||
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
|
||||
|
||||
logger.debug("Populate")
|
||||
run_stress("write", col_number)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
logger.debug(f"2. Run {case} statement in a loop")
|
||||
statement_future = executor.submit(functools.partial(run_stress, case))
|
||||
|
||||
logger.debug(f"let's {case} statement work some time")
|
||||
time.sleep(2)
|
||||
|
||||
logger.debug("3. Alter table while inserts are running")
|
||||
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
|
||||
logger.debug(f"alter_statement {alter_statement}")
|
||||
alter_result = session.execute(alter_statement)
|
||||
logger.debug(alter_result.all())
|
||||
|
||||
logger.debug(f"wait till {case} statement finished")
|
||||
statement_future.result()
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
|
||||
|
||||
logger.debug("read and check data")
|
||||
run_stress("read")
|
||||
|
||||
@pytest.mark.skip("unimplemented")
|
||||
def commitlog_replays_after_schema_change(self):
|
||||
"""
|
||||
Commitlog can be replayed even though schema has been changed
|
||||
1. Create a table and insert data
|
||||
2. Alter table
|
||||
3. Kill node
|
||||
4. Boot node and verify that commitlog have been replayed and that all data is restored
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
|
||||
def test_update_schema_while_node_is_killed(self, case):
|
||||
"""
|
||||
Check that a node that is killed durring a table creation/alter/drop is able to rejoin and to synch on schema
|
||||
"""
|
||||
|
||||
logger.debug("1. Create a cluster and insert data")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
def create_table_case():
|
||||
try:
|
||||
logger.debug("Creating table")
|
||||
create_c1c2_table(session)
|
||||
logger.debug("Populating")
|
||||
insert_c1c2(session, n=10)
|
||||
except AlreadyExists:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
pass
|
||||
|
||||
def alter_table_case():
|
||||
try:
|
||||
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
|
||||
except InvalidRequest as exc:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
assert "Invalid column name c3" in str(exc)
|
||||
|
||||
def drop_table_case():
|
||||
try:
|
||||
session.execute("DROP TABLE cf;", timeout=180)
|
||||
except InvalidRequest as exc:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
assert "Cannot drop non existing table" in str(exc)
|
||||
|
||||
logger.debug("Creating keyspace")
|
||||
create_ks(session, "ks", 3)
|
||||
if case != "create_table":
|
||||
create_table_case()
|
||||
|
||||
case_map = {
|
||||
"create_table": create_table_case,
|
||||
"alter_table": alter_table_case,
|
||||
"drop_table": drop_table_case,
|
||||
}
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
logger.debug(f"2. kill node during {case}")
|
||||
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
|
||||
case_map[case]()
|
||||
kill_node_future.result()
|
||||
|
||||
logger.debug("3. Start the stopped node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
|
||||
session = self.patient_exclusive_cql_connection(node2)
|
||||
read_barrier(session)
|
||||
|
||||
def create_or_alter_table_expected_result(col_mun):
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
|
||||
assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
|
||||
for key in range(10):
|
||||
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
|
||||
|
||||
expected_case_result_map = {
|
||||
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
|
||||
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
|
||||
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
|
||||
}
|
||||
logger.debug("verify that commitlog has been replayed and that all data is restored")
|
||||
expected_case_result_map[case]()
|
||||
|
||||
@pytest.mark.parametrize("is_gently_stop", [True, False])
|
||||
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
|
||||
"""
|
||||
Nodes rejoining the cluster synch on schema changes
|
||||
1. Create a cluster and insert data
|
||||
2. Stop a node
|
||||
3. Alter table
|
||||
4. Insert additional data
|
||||
5. Start the stopped node
|
||||
6. Verify the stopped node synchs on the updated schema
|
||||
"""
|
||||
|
||||
logger.debug("1. Create a cluster and insert data")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
create_c1c2_table(session)
|
||||
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
|
||||
|
||||
logger.debug("Populating")
|
||||
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
|
||||
|
||||
logger.debug("2 Stop a node1")
|
||||
node1.stop(gently=is_gently_stop, wait_other_notice=True)
|
||||
|
||||
logger.debug("3 Alter table")
|
||||
session = self.patient_cql_connection(node2)
|
||||
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
|
||||
|
||||
logger.debug("4 Insert additional data")
|
||||
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
|
||||
|
||||
logger.debug("5. Start the stopped node1")
|
||||
node1.start(wait_for_binary_proto=True)
|
||||
|
||||
logger.debug("6. Verify the stopped node synchs on the updated schema")
|
||||
session = self.patient_exclusive_cql_connection(node1)
|
||||
read_barrier(session)
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
|
||||
expected = [["test", "test", "test", "test"]]
|
||||
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
|
||||
for key in range(10):
|
||||
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
|
||||
|
||||
def test_reads_schema_recreated_while_node_down(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Populating")
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
logger.debug("Stopping node2")
|
||||
node2.stop(gently=True)
|
||||
|
||||
logger.debug("Re-creating schema")
|
||||
session.execute("DROP TABLE cf;")
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
session2 = self.patient_cql_connection(node2)
|
||||
read_barrier(session2)
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
|
||||
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
|
||||
|
||||
def test_writes_schema_recreated_while_node_down(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Populating")
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
logger.debug("Stopping node2")
|
||||
node2.stop(gently=True, wait_other_notice=True)
|
||||
|
||||
logger.debug("Re-creating schema")
|
||||
session.execute("DROP TABLE cf;")
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
session2 = self.patient_cql_connection(node2)
|
||||
read_barrier(session2)
|
||||
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
|
||||
expected = [[2, "2"]]
|
||||
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
|
||||
|
||||
|
||||
class TestLargePartitionAlterSchema(Tester):
|
||||
# Issue scylladb/scylla: #5135:
|
||||
#
|
||||
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
|
||||
# partition entry update
|
||||
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
|
||||
# entry being preempted.
|
||||
#
|
||||
# The scenario in which the problem could have happened has to involve:
|
||||
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
|
||||
# - appending writes to the partition (not overwrites)
|
||||
# - scans of the partition
|
||||
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
|
||||
# column lands in the middle (in alphabetical order) of the old column set.
|
||||
#
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read.
|
||||
#
|
||||
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
|
||||
|
||||
PARTITIONS = 50
|
||||
STRING_VALUE = string.ascii_lowercase
|
||||
|
||||
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
|
||||
if not self.cluster.nodelist():
|
||||
self.cluster.populate(cluster_topology)
|
||||
self.cluster.start(wait_other_notice=True)
|
||||
|
||||
node1 = self.cluster.nodelist()[0]
|
||||
session = self.patient_cql_connection(node=node1)
|
||||
self.create_schema(session=session, rf=rf)
|
||||
|
||||
return session
|
||||
|
||||
def create_schema(self, session, rf):
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session=session, name="ks", rf=rf)
|
||||
|
||||
session.execute(
|
||||
"""
|
||||
CREATE TABLE lp_table (
|
||||
pk int,
|
||||
ck1 int,
|
||||
val1 text,
|
||||
val2 text,
|
||||
PRIMARY KEY (pk, ck1)
|
||||
);
|
||||
"""
|
||||
)
|
||||
|
||||
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
|
||||
ck = ck_start
|
||||
def _populate_loop():
|
||||
nonlocal ck
|
||||
while True:
|
||||
if stop_populating is not None and stop_populating.is_set():
|
||||
return
|
||||
if ck_end is not None and ck >= ck_end:
|
||||
return
|
||||
for pk in range(self.PARTITIONS):
|
||||
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
|
||||
data.append(row)
|
||||
yield tuple(row)
|
||||
ck += 1
|
||||
|
||||
records_written = ck - ck_start
|
||||
|
||||
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
|
||||
|
||||
parameters = _populate_loop()
|
||||
|
||||
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
|
||||
|
||||
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
|
||||
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
|
||||
return data
|
||||
|
||||
def read(self, session, ck_max, stop_reading: threading.Event = None):
|
||||
def _read_loop():
|
||||
while True:
|
||||
for ck in range(ck_max):
|
||||
for pk in range(self.PARTITIONS):
|
||||
if stop_reading is not None and stop_reading.is_set():
|
||||
return
|
||||
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
|
||||
if stop_reading is None:
|
||||
return
|
||||
|
||||
logger.debug(f"Start reading..")
|
||||
_read_loop()
|
||||
logger.debug(f"Finish reading..")
|
||||
|
||||
def add_column(self, session, column_name, column_type):
|
||||
logger.debug(f"Add {column_name} column")
|
||||
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
|
||||
|
||||
def drop_column(self, session, column_name):
|
||||
logger.debug(f"Drop {column_name} column")
|
||||
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
|
||||
|
||||
def test_large_partition_with_add_column(self):
|
||||
cluster_topology = generate_cluster_topology()
|
||||
session = self.prepare(cluster_topology, rf=1)
|
||||
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
|
||||
|
||||
threads = []
|
||||
timeout = 300
|
||||
ck_end = 5000
|
||||
if self.cluster.scylla_mode == "debug":
|
||||
timeout = 900
|
||||
ck_end = 500
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
stop_populating = threading.Event()
|
||||
stop_reading = threading.Event()
|
||||
# Insert new rows in background
|
||||
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
|
||||
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
|
||||
# Wait for running load
|
||||
time.sleep(10)
|
||||
self.add_column(session, "new_clmn", "int")
|
||||
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read
|
||||
logger.debug("Flush data")
|
||||
self.cluster.nodelist()[0].flush()
|
||||
|
||||
# Stop populating and reading soon after flush
|
||||
time.sleep(1)
|
||||
logger.debug("Stop populating and reading")
|
||||
stop_populating.set()
|
||||
stop_reading.set()
|
||||
|
||||
for future in futures.as_completed(threads, timeout=timeout):
|
||||
try:
|
||||
future.result()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
pytest.fail(f"Generated an exception: {exc}")
|
||||
|
||||
# Add 'null' values for the new column `new_clmn` in the expected data
|
||||
for i, _ in enumerate(data):
|
||||
data[i].append(None)
|
||||
|
||||
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
|
||||
|
||||
def test_large_partition_with_drop_column(self):
|
||||
cluster_topology = generate_cluster_topology()
|
||||
session = self.prepare(cluster_topology, rf=1)
|
||||
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
|
||||
|
||||
threads = []
|
||||
timeout = 300
|
||||
ck_end = 5000
|
||||
if self.cluster.scylla_mode == "debug":
|
||||
timeout = 900
|
||||
ck_end = 500
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
stop_populating = threading.Event()
|
||||
stop_reading = threading.Event()
|
||||
# Insert new rows in background
|
||||
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
|
||||
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
|
||||
# Wait for running load
|
||||
time.sleep(10)
|
||||
self.drop_column(session=session, column_name="val1")
|
||||
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read
|
||||
logger.debug("Flush data")
|
||||
self.cluster.nodelist()[0].flush()
|
||||
|
||||
# Stop populating and reading soon after flush
|
||||
time.sleep(1)
|
||||
logger.debug("Stop populating and reading")
|
||||
stop_populating.set()
|
||||
stop_reading.set()
|
||||
|
||||
result = []
|
||||
for future in futures.as_completed(threads, timeout=timeout):
|
||||
try:
|
||||
result.append(future.result())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
# "Unknown identifier val1" is expected error
|
||||
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
|
||||
pytest.fail(f"Generated an exception: {exc}")
|
||||
|
||||
|
||||
class HistoryVerifier:
|
||||
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
|
||||
"""
|
||||
Initialize parameters for further verification of schema history.
|
||||
:param table_name: table thats we change it's schema and verify schema history accordingly.
|
||||
"""
|
||||
|
||||
self.table_name = table_name
|
||||
self.keyspace_name = keyspace_name
|
||||
self.versions = []
|
||||
self.versions_dict = {}
|
||||
self.query = ""
|
||||
|
||||
def verify(self, session, expected_current_diff, expected_prev_diff, query):
|
||||
"""
|
||||
Verify current schema history entry by comparing to previous schema entry.
|
||||
:param session: python cql session
|
||||
:param expected_current_diff: difference of current schema from previous schema
|
||||
:param expected_prev_diff: difference of previous schema from current schema
|
||||
:param query: The query that created new schema
|
||||
"""
|
||||
|
||||
def get_table_id(session, keyspace_name, table_name):
|
||||
assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
|
||||
assert table_name, f"Input table_name should have value, table_name={table_name}"
|
||||
query = "select keyspace_name,table_name,id from system_schema.tables"
|
||||
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
|
||||
current_rows = session.execute(query).current_rows
|
||||
assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
|
||||
res = current_rows[0]
|
||||
return res["id"]
|
||||
|
||||
def read_schema_history_table(session, cf_id):
|
||||
"""
|
||||
read system.scylla_table_schema_history and verify current version diff from previous vesion
|
||||
:param session: python cql session
|
||||
:param cf_id: uuid of the table we changed it's schema
|
||||
"""
|
||||
|
||||
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
|
||||
res = session.execute(query).current_rows
|
||||
new_versions = list({
|
||||
entry["schema_version"]
|
||||
for entry in res
|
||||
if str(entry["schema_version"]) not in self.versions
|
||||
})
|
||||
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
|
||||
assert len(new_versions) == 1, msg
|
||||
current_version = str(new_versions[0])
|
||||
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
|
||||
columns_list = (
|
||||
{"column_name": entry["column_name"], "type": entry["type"]}
|
||||
for entry in res
|
||||
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
|
||||
)
|
||||
self.versions_dict[current_version] = {}
|
||||
for item in columns_list:
|
||||
self.versions_dict[current_version][item["column_name"]] = item["type"]
|
||||
|
||||
self.versions.append(current_version)
|
||||
if len(self.versions) > 1:
|
||||
current_id = self.versions[-1]
|
||||
previous_id = self.versions[-2]
|
||||
set_current = set(self.versions_dict[current_id].items())
|
||||
set_previous = set(self.versions_dict[previous_id].items())
|
||||
current_diff = set_current - set_previous
|
||||
previous_diff = set_previous - set_current
|
||||
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
|
||||
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
|
||||
if current_diff != expected_current_diff:
|
||||
logger.debug(msg1 + msg2)
|
||||
assert current_diff == expected_current_diff, msg1 + msg2
|
||||
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
|
||||
assert previous_diff == expected_prev_diff, msg1 + msg2
|
||||
|
||||
self.query = query
|
||||
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
|
||||
read_schema_history_table(session, cf_id)
|
||||
|
||||
|
||||
class DDL(NamedTuple):
|
||||
ddl_command: str
|
||||
expected_current_diff: set | None
|
||||
expected_prev_diff: set | None
|
||||
|
||||
|
||||
class TestSchemaHistory(Tester):
|
||||
def prepare(self):
|
||||
cluster = self.cluster
|
||||
# in case support tablets and rf-rack-valid-keyspaces
|
||||
# create cluster with 3 racks with 1 node in each rack
|
||||
cluster_topology = generate_cluster_topology(rack_num=3)
|
||||
rf = 3
|
||||
cluster.populate(cluster_topology).start(wait_other_notice=True)
|
||||
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
|
||||
create_ks(self.session, "lwt_load_ks", rf)
|
||||
|
||||
def test_schema_history_alter_table(self):
|
||||
"""test schema history changes following alter table cql commands"""
|
||||
self.prepare()
|
||||
verifier = HistoryVerifier(table_name="table2")
|
||||
queries_and_expected_diffs = [
|
||||
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
|
||||
]
|
||||
for ddl in queries_and_expected_diffs:
|
||||
self.session.execute(ddl.ddl_command)
|
||||
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)
|
||||
@@ -218,18 +218,6 @@ def assert_row_count_in_select_less(
|
||||
assert count < max_rows_expected, f'Expected a row count < of {max_rows_expected} in query "{query}", but got {count}'
|
||||
|
||||
|
||||
def assert_length_equal(object_with_length, expected_length):
|
||||
"""
|
||||
Assert an object has a specific length.
|
||||
@param object_with_length The object whose length will be checked
|
||||
@param expected_length The expected length of the object
|
||||
|
||||
Examples:
|
||||
assert_length_equal(res, nb_counter)
|
||||
"""
|
||||
assert len(object_with_length) == expected_length, f"Expected {object_with_length} to have length {expected_length}, but instead is of length {len(object_with_length)}"
|
||||
|
||||
|
||||
def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
|
||||
"""
|
||||
asserts that the contents of the two provided lists are equal
|
||||
|
||||
@@ -14,7 +14,6 @@ from cassandra.query import SimpleStatement
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
|
||||
from test.cluster.dtest.dtest_class import create_cf
|
||||
from test.cluster.dtest.tools import assertions
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -52,27 +51,6 @@ def insert_c1c2( # noqa: PLR0913
|
||||
execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
|
||||
|
||||
|
||||
def query_c1c2( # noqa: PLR0913
|
||||
session,
|
||||
key,
|
||||
consistency=ConsistencyLevel.QUORUM,
|
||||
tolerate_missing=False,
|
||||
must_be_missing=False,
|
||||
c1_value="value1",
|
||||
c2_value="value2",
|
||||
ks="ks",
|
||||
cf="cf",
|
||||
):
|
||||
query = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
|
||||
rows = list(session.execute(query))
|
||||
if not tolerate_missing and not must_be_missing:
|
||||
assertions.assert_length_equal(rows, 1)
|
||||
res = rows[0]
|
||||
assert len(res) == 2 and res[0] == c1_value and res[1] == c2_value, res
|
||||
if must_be_missing:
|
||||
assertions.assert_length_equal(rows, 0)
|
||||
|
||||
|
||||
def rows_to_list(rows):
|
||||
new_list = [list(row) for row in rows]
|
||||
return new_list
|
||||
|
||||
@@ -131,9 +131,8 @@ async def test_backup_move(manager: ManagerClient, object_storage, move_files):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("ne_parameter", [ "endpoint", "bucket", "snapshot" ])
|
||||
async def test_backup_with_non_existing_parameters(manager: ManagerClient, object_storage, ne_parameter):
|
||||
'''backup should fail if either of the parameters does not exist'''
|
||||
async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the destination bucket does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
@@ -143,8 +142,7 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
backup_snap_name = 'backup'
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name = backup_snap_name)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
@@ -152,18 +150,39 @@ async def test_backup_with_non_existing_parameters(manager: ManagerClient, objec
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf,
|
||||
backup_snap_name if ne_parameter != 'snapshot' else 'no-such-snapshot',
|
||||
object_storage.address if ne_parameter != 'endpoint' else 'no-such-endpoint',
|
||||
object_storage.bucket_name if ne_parameter != 'bucket' else 'no-such-bucket',
|
||||
prefix)
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', object_storage.address, "non-existant-bucket", prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
if ne_parameter == 'endpoint':
|
||||
assert status['error'] == 'std::invalid_argument (endpoint no-such-endpoint not found)'
|
||||
#assert 'S3 request failed. Code: 15. Reason: Access Denied.' in status['error']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_endpoint(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the endpoint is invalid/inaccessible'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
|
||||
assert len(files) > 0
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', "does_not_exist", object_storage.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
assert status['state'] == 'failed'
|
||||
assert status['error'] == 'std::invalid_argument (endpoint does_not_exist not found)'
|
||||
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
'''helper for backup abort testing'''
|
||||
@@ -217,6 +236,38 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, object_storage):
|
||||
'''backup should fail if the snapshot does not exist'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_endpoints': objconf,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
prefix = f'{cf}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'nonexistent-snapshot',
|
||||
object_storage.address, object_storage.bucket_name, prefix)
|
||||
# The task is expected to fail immediately due to invalid snapshot name.
|
||||
# However, since internal implementation details may change, we'll wait for
|
||||
# task completion if immediate failure doesn't occur.
|
||||
actual_state = None
|
||||
for status_api in [manager.api.get_task_status,
|
||||
manager.api.wait_task]:
|
||||
status = await status_api(server.ip_addr, tid)
|
||||
assert status is not None
|
||||
actual_state = status['state']
|
||||
if actual_state == 'failed':
|
||||
break
|
||||
else:
|
||||
assert actual_state == 'failed'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, object_storage):
|
||||
|
||||
@@ -181,14 +181,11 @@ async def test_random_failures(manager: ManagerClient,
|
||||
LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
|
||||
await manager.server_stop(server_id=s_info.server_id)
|
||||
|
||||
BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
|
||||
STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"
|
||||
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("Wait until the new node initialization completes or fails.")
|
||||
await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
|
||||
await server_log.wait_for("init - (Startup failed:|Scylla version .* initialization completed)", timeout=120)
|
||||
|
||||
if await server_log.grep(STARTUP_FAILED_PATTERN):
|
||||
if await server_log.grep("init - Startup failed:"):
|
||||
LOGGER.info("Check that the new node is dead.")
|
||||
expected_statuses = [psutil.STATUS_DEAD]
|
||||
else:
|
||||
@@ -219,7 +216,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
else:
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("The new node is dead. Check if it failed to startup.")
|
||||
assert await server_log.grep(STARTUP_FAILED_PATTERN)
|
||||
assert await server_log.grep("init - Startup failed:")
|
||||
await manager.server_stop(server_id=s_info.server_id) # remove the node from the list of running servers
|
||||
|
||||
LOGGER.info("Try to remove the dead new node from the cluster.")
|
||||
|
||||
@@ -26,7 +26,6 @@ skip_in_release:
|
||||
- test_raft_cluster_features
|
||||
- test_cluster_features
|
||||
- dtest/limits_test
|
||||
- dtest/schema_management_test
|
||||
skip_in_debug:
|
||||
- test_shutdown_hang
|
||||
- test_replace
|
||||
|
||||
@@ -146,13 +146,13 @@ async def test_joining_old_node_fails(manager: ManagerClient) -> None:
|
||||
|
||||
# Try to add a node that doesn't support the feature - should fail
|
||||
new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
|
||||
# Try to replace with a node that doesn't support the feature - should fail
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
|
||||
new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -131,7 +131,7 @@ async def test_major_compaction_flush_all_tables(manager: ManagerClient, compact
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks, cf)
|
||||
|
||||
flush_log = await log.grep("Forcing new commitlog segment and flushing all tables", from_mark=mark)
|
||||
assert len(flush_log) == (2 if expect_all_table_flush else 0)
|
||||
assert len(flush_log) == (1 if expect_all_table_flush else 0)
|
||||
|
||||
# all tables should be flushed the first time unless compaction_flush_all_tables_before_major_seconds == 0
|
||||
await check_all_table_flush_in_major_compaction(compaction_flush_all_tables_before_major_seconds != 0)
|
||||
|
||||
@@ -67,7 +67,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
|
||||
|
||||
logger.info(f"Removing node {servers[0]} using {servers[1]}")
|
||||
await manager.remove_node(servers[1].server_id, servers[0].server_id)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
servers = servers[1:]
|
||||
|
||||
logger.info("Checking results of the background writes")
|
||||
|
||||
@@ -74,7 +74,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
|
||||
|
||||
logger.info(f"Removing node {servers[0]} using {servers[1]}")
|
||||
await manager.remove_node(servers[1].server_id, servers[0].server_id)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
servers = servers[1:]
|
||||
|
||||
logger.info("Checking results of the background writes")
|
||||
|
||||
@@ -74,6 +74,7 @@ def test_cast_int_literal_with_type_hint_to_blob(cql, table1, scylla_only):
|
||||
# An int can always be converted to a valid blob, but blobs might have wrong amount of bytes
|
||||
# and can't be converted to a valid int.
|
||||
def test_cast_blob_literal_to_int(cql, table1):
|
||||
pk = unique_key_int()
|
||||
with pytest.raises(InvalidRequest, match='HEX'):
|
||||
cql.execute(f"INSERT INTO {table1} (pk) VALUES (0xBAAAAAAD)")
|
||||
with pytest.raises(InvalidRequest, match='blob'):
|
||||
|
||||
@@ -61,7 +61,7 @@ def test_select_default_order(cql, table_int_desc):
|
||||
def test_multi_column_relation_desc(cql, table2):
|
||||
k = unique_key_int()
|
||||
stmt = cql.prepare(f'INSERT INTO {table2} (p, c1, c2) VALUES (?, ?, ?)')
|
||||
cql.execute(stmt, [k, 1, 0])
|
||||
cql.execute(stmt, [k, 1, 1])
|
||||
cql.execute(stmt, [k, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = {k} AND (c1, c2) >= (1, 1)'))
|
||||
cql.execute(stmt, [0, 1, 0])
|
||||
cql.execute(stmt, [0, 1, 1])
|
||||
cql.execute(stmt, [0, 1, 2])
|
||||
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = 0 AND (c1, c2) >= (1, 1)'))
|
||||
|
||||
@@ -352,7 +352,7 @@ def test_storage_options_alter_type(cql, scylla_only):
|
||||
ksdef_local = "WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : '1' } " \
|
||||
"AND STORAGE = { 'type' : 'S3', 'bucket' : '/b1', 'endpoint': 'localhost'}"
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
res = cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
|
||||
|
||||
# Reproducer for scylladb#14139
|
||||
def test_alter_keyspace_preserves_udt(cql):
|
||||
|
||||
@@ -171,6 +171,7 @@ def test_grant_revoke_data_permissions(cql, test_keyspace):
|
||||
# Test that permissions for user-defined functions are serialized in a Cassandra-compatible way
|
||||
def test_udf_permissions_serialization(cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace, new_user(cql) as user:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
# Creating a bilingual function makes this test case work for both Scylla and Cassandra
|
||||
@@ -246,6 +247,7 @@ def test_udf_permissions_quoted_names(cassandra_bug, cql):
|
||||
# permissions. Cassandra erroneously reports the unrelated missing permissions.
|
||||
# Reported to Cassandra as CASSANDRA-19005.
|
||||
def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
schema = "a int primary key"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
|
||||
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42L;'"
|
||||
@@ -286,6 +288,7 @@ def test_drop_udf_with_same_name(cql, cassandra_bug):
|
||||
# Tests for ALTER are separate, because they are qualified as cassandra_bug
|
||||
def test_grant_revoke_udf_permissions(cql):
|
||||
schema = "a int primary key, b list<int>"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int, l list<int>) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
@@ -332,6 +335,7 @@ def test_grant_revoke_udf_permissions(cql):
|
||||
# and yet it's not enforced
|
||||
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
|
||||
@@ -90,6 +90,8 @@ def test_attached_service_level(scylla_only, cql):
|
||||
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
|
||||
|
||||
def test_list_effective_service_level(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
timeout = "10s"
|
||||
workload_type = "batch"
|
||||
|
||||
@@ -118,6 +120,8 @@ def test_list_effective_service_level(scylla_only, cql):
|
||||
assert row.value == "batch"
|
||||
|
||||
def test_list_effective_service_level_shares(scylla_only, cql):
|
||||
sl1 = "sl1"
|
||||
sl2 = "sl2"
|
||||
shares1 = 500
|
||||
shares2 = 200
|
||||
|
||||
@@ -180,6 +184,8 @@ def test_default_shares_in_listings(scylla_only, cql):
|
||||
# and that the messages Scylla returns are informative.
|
||||
def test_manipulating_default_service_level(cql, scylla_only):
|
||||
default_sl = "default"
|
||||
# Service levels are case-sensitive (if used with quotation marks).
|
||||
fake_default_sl = '"DeFaUlT"'
|
||||
|
||||
with new_user(cql) as role:
|
||||
# Creation.
|
||||
|
||||
@@ -76,7 +76,6 @@ def test_clients(scylla_only, cql):
|
||||
'ssl_enabled',
|
||||
'ssl_protocol',
|
||||
'username',
|
||||
'client_options',
|
||||
])
|
||||
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
|
||||
# There must be at least one connection - the one that sent this SELECT
|
||||
@@ -85,9 +84,6 @@ def test_clients(scylla_only, cql):
|
||||
for cl in cls:
|
||||
assert(cl[0] == '127.0.0.1')
|
||||
assert(cl[2] == 'cql')
|
||||
client_options = cl[13]
|
||||
assert(client_options.get('DRIVER_NAME') == cl[4])
|
||||
assert(client_options.get('DRIVER_VERSION') == cl[5])
|
||||
|
||||
# We only want to check that the table exists with the listed columns, to assert
|
||||
# backwards compatibility.
|
||||
|
||||
@@ -23,7 +23,7 @@ def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
|
||||
try:
|
||||
f42 = unique_name()
|
||||
f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
|
||||
cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
res = cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
|
||||
cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
|
||||
except NoHostAvailable as err:
|
||||
if "not enabled" in str(err):
|
||||
@@ -373,7 +373,8 @@ def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
assert len(res) == 1 and res[0].result == 177147
|
||||
|
||||
# Test that only compilable input is accepted
|
||||
def test_compilable(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
|
||||
"""
|
||||
@@ -383,7 +384,8 @@ Dear wasmtime compiler, please return a function which returns its float argumen
|
||||
|
||||
# Test that not exporting a function with matching name
|
||||
# results in an error
|
||||
def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
@@ -401,7 +403,8 @@ def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
|
||||
f"AS '{wrong_source}'")
|
||||
|
||||
# Test that trying to use something that is exported, but is not a function, won't work
|
||||
def test_not_a_function(cql, test_keyspace, scylla_with_wasm_only):
|
||||
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
|
||||
table = table1
|
||||
wrong_source = f"""
|
||||
(module
|
||||
(type (;0;) (func (param f32) (result f32)))
|
||||
|
||||
@@ -49,9 +49,6 @@ RUN_ID = pytest.StashKey[int]()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Store pytest config globally so we can access it in hooks that only receive report
|
||||
_pytest_config: pytest.Config | None = None
|
||||
|
||||
|
||||
def pytest_addoption(parser: pytest.Parser) -> None:
|
||||
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
|
||||
@@ -187,52 +184,6 @@ def pytest_sessionstart(session: pytest.Session) -> None:
|
||||
)
|
||||
|
||||
|
||||
@pytest.hookimpl(trylast=True)
|
||||
def pytest_runtest_logreport(report):
|
||||
"""Add custom XML attributes to JUnit testcase elements.
|
||||
|
||||
This hook wraps the node_reporter's to_xml method to add custom attributes
|
||||
when the XML element is created. This approach works with pytest-xdist because
|
||||
it modifies the XML element directly when it's generated, rather than trying
|
||||
to modify attrs before finalize() is called.
|
||||
|
||||
Attributes added:
|
||||
- function_path: The function path of the test case (excluding parameters).
|
||||
|
||||
Uses trylast=True to run after LogXML's hook has created the node_reporter.
|
||||
"""
|
||||
from _pytest.junitxml import xml_key
|
||||
|
||||
# Only process call phase
|
||||
if report.when != "call":
|
||||
return
|
||||
|
||||
# Get the XML reporter
|
||||
config = _pytest_config
|
||||
if config is None:
|
||||
return
|
||||
|
||||
xml = config.stash.get(xml_key, None)
|
||||
if xml is None:
|
||||
return
|
||||
|
||||
node_reporter = xml.node_reporter(report)
|
||||
|
||||
nodeid = report.nodeid
|
||||
function_path = f'test/{nodeid.rsplit('.', 2)[0].rsplit('[', 1)[0]}'
|
||||
|
||||
# Wrap the to_xml method to add custom attributes to the element
|
||||
original_to_xml = node_reporter.to_xml
|
||||
|
||||
def custom_to_xml():
|
||||
"""Wrapper that adds custom attributes to the testcase element."""
|
||||
element = original_to_xml()
|
||||
element.set("function_path", function_path)
|
||||
return element
|
||||
|
||||
node_reporter.to_xml = custom_to_xml
|
||||
|
||||
|
||||
def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
if not session.config.getoption("--test-py-init"):
|
||||
return
|
||||
@@ -245,9 +196,6 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
|
||||
|
||||
def pytest_configure(config: pytest.Config) -> None:
|
||||
global _pytest_config
|
||||
_pytest_config = config
|
||||
|
||||
config.build_modes = get_modes_to_run(config)
|
||||
|
||||
if testpy_run_id := config.getoption("--run_id"):
|
||||
|
||||
@@ -243,7 +243,7 @@ async def get_scylla_2025_1_executable(build_mode: str) -> str:
|
||||
if not unpacked_marker.exists():
|
||||
if not downloaded_marker.exists():
|
||||
archive_path.unlink(missing_ok=True)
|
||||
await run_process(["curl", "--retry", "10", "--fail", "--silent", "--show-error", "--output", archive_path, url])
|
||||
await run_process(["curl", "--silent", "--show-error", "--output", archive_path, url])
|
||||
downloaded_marker.touch()
|
||||
shutil.rmtree(unpack_dir, ignore_errors=True)
|
||||
unpack_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
@@ -40,8 +40,16 @@ struct test_pinger: public direct_failure_detector::pinger {
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Simulate a blocking ping that only returns when aborted.
|
||||
co_await sleep_abortable(std::chrono::hours(1), as);
|
||||
promise<> p;
|
||||
auto f = p.get_future();
|
||||
auto sub = as.subscribe([&, p = std::move(p)] () mutable noexcept {
|
||||
p.set_value();
|
||||
});
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
co_await std::move(f);
|
||||
throw abort_requested_exception{};
|
||||
}, as);
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -2930,18 +2930,6 @@ private:
|
||||
|
||||
static constexpr elem_t magic = 54313;
|
||||
|
||||
static void check_digest_value(elem_t d) {
|
||||
if (d < 0 || d >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value out of range: {}", d));
|
||||
}
|
||||
}
|
||||
|
||||
static void validate_digest_value(elem_t d_new, elem_t d_old, elem_t x) {
|
||||
if (d_new < 0 || d_new >= magic) {
|
||||
on_fatal_internal_error(tlogger, fmt::format("Digest value invalid after appending/removing element: d_new {}, d_old {}, x {}", d_new, d_old, x));
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
append_seq(std::vector<elem_t> v) : _seq{make_lw_shared<std::vector<elem_t>>(std::move(v))}, _end{_seq->size()}, _digest{0} {
|
||||
for (auto x : *_seq) {
|
||||
@@ -2950,26 +2938,20 @@ public:
|
||||
}
|
||||
|
||||
static elem_t digest_append(elem_t d, elem_t x) {
|
||||
check_digest_value(d);
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
|
||||
auto y = (d + x) % magic;
|
||||
SCYLLA_ASSERT(digest_remove(y, x) == d);
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
}
|
||||
|
||||
static elem_t digest_remove(elem_t d, elem_t x) {
|
||||
check_digest_value(d);
|
||||
BOOST_REQUIRE_LE(0, d);
|
||||
BOOST_REQUIRE_LT(d, magic);
|
||||
|
||||
auto y = (d - x) % magic;
|
||||
|
||||
if (y < 0) {
|
||||
y += magic;
|
||||
}
|
||||
|
||||
validate_digest_value(y, d, x);
|
||||
return y;
|
||||
return y < 0 ? y + magic : y;
|
||||
}
|
||||
|
||||
elem_t digest() const {
|
||||
|
||||
@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
|
||||
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
|
||||
|
||||
|
||||
class RandomContentFile:
|
||||
class random_content_file:
|
||||
def __init__(self, path: str, size_in_bytes: int):
|
||||
path = pathlib.Path(path)
|
||||
self.filename = path if path.is_file() else path / str(uuid.uuid4())
|
||||
@@ -64,11 +64,11 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
for server in servers:
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks)
|
||||
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
|
||||
|
||||
try:
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
cmdline = [*global_cmdline,
|
||||
"--logger-log-level", "compaction=debug"]
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
|
||||
@@ -134,7 +134,7 @@ async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
|
||||
|
||||
|
||||
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
|
||||
|
||||
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
s2_mark = await s2_log.mark()
|
||||
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
|
||||
|
||||
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
|
||||
|
||||
|
||||
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
|
||||
|
||||
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
|
||||
mark = await log.mark()
|
||||
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
|
||||
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
|
||||
await manager.server_restart(servers[0].server_id, wait_others=2)
|
||||
|
||||
|
||||
@@ -198,62 +198,47 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
|
||||
auto cfg = config();
|
||||
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
|
||||
auto vs = vector_store_client{cfg};
|
||||
bool fail_dns_resolution = true;
|
||||
auto count = 0;
|
||||
auto as = abort_source_timeout();
|
||||
auto address = inet_address("127.0.0.1");
|
||||
configure(vs)
|
||||
.with_dns_refresh_interval(milliseconds(10))
|
||||
.with_wait_for_client_timeout(milliseconds(20))
|
||||
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
|
||||
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
|
||||
BOOST_CHECK_EQUAL(host, "good.authority.here");
|
||||
if (fail_dns_resolution) {
|
||||
count++;
|
||||
if (count % 3 != 0) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
co_return address;
|
||||
co_return inet_address(format("127.0.0.{}", count));
|
||||
});
|
||||
|
||||
vs.start_background_tasks();
|
||||
|
||||
// Wait for the DNS resolution to fail
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
|
||||
BOOST_CHECK_EQUAL(count, 3);
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
|
||||
|
||||
fail_dns_resolution = true;
|
||||
// Trigger DNS resolver to check for address changes
|
||||
// Resolver will not re-check automatically after successful resolution
|
||||
vector_store_client_tester::trigger_dns_resolver(vs);
|
||||
|
||||
// Wait for the DNS resolution to fail again
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
// Resolve to a different address
|
||||
address = inet_address("127.0.0.2");
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
|
||||
BOOST_CHECK_EQUAL(count, 6);
|
||||
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
|
||||
|
||||
co_await vs.stop();
|
||||
}
|
||||
|
||||
@@ -353,7 +353,7 @@ future<> controller::set_cql_ready(bool ready) {
|
||||
return _gossiper.local().add_local_application_state(gms::application_state::RPC_READY, gms::versioned_value::cql_ready(ready));
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server ? _server->local().get_client_data() : protocol_server::get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ public:
|
||||
virtual future<> start_server() override;
|
||||
virtual future<> stop_server() override;
|
||||
virtual future<> request_stop_server() override;
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
future<> update_connections_scheduling_group();
|
||||
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/net/socket_defs.hh>
|
||||
#include <vector>
|
||||
#include "client_data.hh"
|
||||
@@ -44,8 +43,8 @@ public:
|
||||
/// This variant is used by the REST API so failure is acceptable.
|
||||
virtual future<> request_stop_server() = 0;
|
||||
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>>();
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() {
|
||||
return make_ready_future<utils::chunked_vector<client_data>>(utils::chunked_vector<client_data>());
|
||||
}
|
||||
|
||||
protocol_server(seastar::scheduling_group sg) noexcept : _sched_group(std::move(sg)) {}
|
||||
|
||||
@@ -691,7 +691,6 @@ client_data cql_server::connection::make_client_data() const {
|
||||
cd.connection_stage = client_connection_stage::authenticating;
|
||||
}
|
||||
cd.scheduling_group_name = _current_scheduling_group.name();
|
||||
cd.client_options = _client_state.get_client_options();
|
||||
|
||||
cd.ssl_enabled = _ssl_enabled;
|
||||
cd.ssl_protocol = _ssl_protocol;
|
||||
@@ -959,17 +958,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
|
||||
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
|
||||
co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, driver_ver_opt->second);
|
||||
_client_state.set_driver_version(driver_ver_opt->second);
|
||||
}
|
||||
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
|
||||
co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, driver_name_opt->second);
|
||||
_client_state.set_driver_name(driver_name_opt->second);
|
||||
}
|
||||
|
||||
// Store all received client options for later exposure in the system.clients 'client_options' column
|
||||
// (a frozen map<text, text>). Options are cached to reduce memory overhead by deduplicating
|
||||
// identical key/value sets across multiple connections (e.g., same driver name/version).
|
||||
co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);
|
||||
|
||||
cql_protocol_extension_enum_set cql_proto_exts;
|
||||
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
|
||||
if (options.contains(protocol_extension_name(ext))) {
|
||||
@@ -1653,9 +1647,6 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
|
||||
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
|
||||
opts.insert({"COMPRESSION", "lz4"});
|
||||
opts.insert({"COMPRESSION", "snappy"});
|
||||
// CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
|
||||
// e.g. CQL driver configuration.
|
||||
opts.insert({"CLIENT_OPTIONS", ""});
|
||||
if (_server._config.allow_shard_aware_drivers) {
|
||||
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
|
||||
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
|
||||
@@ -2317,11 +2308,11 @@ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata
|
||||
return _response_metadata_id.value();
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> cql_server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await for_each_gently([&ret] (const generic_server::connection& c) {
|
||||
const connection& conn = dynamic_cast<const connection&>(c);
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(conn.make_client_data())));
|
||||
ret.emplace_back(conn.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -206,7 +206,6 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
std::unique_ptr<event_notifier> _notifier;
|
||||
private:
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
transport_stats _stats;
|
||||
auth::service& _auth_service;
|
||||
qos::service_level_controller& _sl_controller;
|
||||
@@ -235,7 +234,7 @@ public:
|
||||
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
future<> update_connections_scheduling_group();
|
||||
future<> update_connections_service_level_params();
|
||||
future<std::vector<connection_service_level_params>> get_connections_service_level_params();
|
||||
|
||||
@@ -1547,8 +1547,8 @@ void reclaim_timer::report() const noexcept {
|
||||
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
|
||||
auto info_level = _stall_detected ? log_level::info : log_level::debug;
|
||||
auto MiB = 1024*1024;
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected && !_preemptible,
|
||||
(_stall_detected && !_preemptible) ? current_backtrace() : saved_backtrace{});
|
||||
auto msg_extra = extra_msg_when_stall_detected(_stall_detected,
|
||||
_stall_detected ? current_backtrace() : saved_backtrace{});
|
||||
|
||||
timing_logger.log(time_level, "{} took {} us, trying to release {:.3f} MiB {}preemptibly, reserve: {{goal: {}, max: {}}}{}",
|
||||
_name, (_duration + 500ns) / 1us, (float)_memory_to_release / MiB, _preemptible ? "" : "non-",
|
||||
|
||||
Reference in New Issue
Block a user