Compare commits

..

7 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
446e4c11d0 cql3: fix documentation and avoid unnecessary copy in execute_paged_internal
- Remove duplicate comment block in header file
- Avoid copying query_state when it's provided by using a reference
  instead of a copy in the ternary expression

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 11:31:22 +00:00
copilot-swe-agent[bot]
db4e08b846 cql3: consolidate execute_paged_internal into single function with optional parameter
Replace two overloaded functions with a single function that takes an
optional query_state pointer (defaults to nullptr). When nullptr, it
uses the default internal query state. This eliminates code duplication
while maintaining the same functionality.

Addresses review feedback from @ptrsmrn.

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 11:28:59 +00:00
copilot-swe-agent[bot]
cabb712e11 cql3: improve documentation for new query_internal overloads
Add detailed parameter descriptions to the new overloads that accept
query_state, matching the documentation style of existing methods.

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 11:05:50 +00:00
copilot-swe-agent[bot]
ce2d887bff cql3: refactor execute_paged_internal to reduce code duplication
Make the original execute_paged_internal call the new overload that
accepts query_state, eliminating duplicate code.

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 11:02:39 +00:00
copilot-swe-agent[bot]
50783eee8b cql3: add query_internal overloads with custom timeout support
Add overloads for query_internal, execute_paged_internal, and
for_each_cql_result that accept a service::query_state parameter,
allowing callers to specify custom timeouts for paged internal queries.

This enables the auth migration code to use paged queries with the
5-minute timeout that was previously used with execute_internal.

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 10:59:13 +00:00
copilot-swe-agent[bot]
2c813f3a9b auth: restore timeout in migrate_to_auth_v2 API call
Change the timeout parameter in announce_mutations_with_batching from
std::nullopt to get_raft_timeout() to match the pattern used elsewhere
in auth code and address the review comment in PR #25395.

Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2025-12-11 09:54:33 +00:00
copilot-swe-agent[bot]
aac36549b6 Initial plan 2025-12-11 09:45:46 +00:00
7 changed files with 98 additions and 31 deletions

View File

@@ -979,8 +979,9 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}

View File

@@ -884,15 +884,6 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
::service::client_state cs(::service::client_state::internal_tag{}, tc);
::service::query_state qs(cs, empty_service_permit());
auto rows = co_await qp.execute_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
qs,
{},
cql3::query_processor::cache_internal::no);
if (rows->empty()) {
continue;
}
std::vector<sstring> col_names;
for (const auto& col : schema->all_columns()) {
col_names.push_back(col.name_as_cql_string());
@@ -901,7 +892,14 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
for (size_t i = 1; i < col_names.size(); ++i) {
val_binders_str += ", ?";
}
for (const auto& row : *rows) {
co_await qp.query_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
qs,
{},
1000,
[&](const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
std::vector<data_value_or_unset> values;
for (const auto& col : schema->all_columns()) {
if (row.has(col.name_as_text())) {
@@ -925,7 +923,8 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
format("expecting single insert mutation, got {}", muts.size()));
}
co_yield std::move(muts[0]);
}
co_return stop_iteration::no;
});
}
co_yield co_await sys_ks.make_auth_version_mutation(ts,
db::system_keyspace::auth_version_t::v2);
@@ -934,7 +933,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
start_operation_func,
std::move(gen),
as,
std::nullopt);
get_raft_timeout());
}
}

View File

@@ -886,9 +886,10 @@ future<> query_processor::for_each_cql_result(
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_paged_internal(internal_query_state& state) {
query_processor::execute_paged_internal(internal_query_state& state, service::query_state* query_state) {
state.p->statement->validate(*this, service::client_state::for_internal_calls());
auto qs = query_state_for_internal_call();
auto default_qs = query_state ? std::nullopt : std::make_optional(query_state_for_internal_call());
auto& qs = query_state ? *query_state : *default_qs;
::shared_ptr<cql_transport::messages::result_message> msg =
co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
@@ -925,6 +926,20 @@ query_processor::execute_paged_internal(internal_query_state& state) {
co_return ::make_shared<untyped_result_set>(msg);
}
future<> query_processor::for_each_cql_result(
cql3::internal_query_state& state,
service::query_state& query_state,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set::row&)> f) {
do {
auto msg = co_await execute_paged_internal(state, &query_state);
for (auto& row : *msg) {
if ((co_await f(row)) == stop_iteration::yes) {
co_return;
}
}
} while (has_more_results(state));
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
const sstring& query_string,
@@ -1202,6 +1217,17 @@ future<> query_processor::query_internal(
co_return co_await for_each_cql_result(query_state, std::move(f));
}
future<> query_processor::query_internal(
const sstring& query_string,
db::consistency_level cl,
service::query_state& query_state,
const data_value_list& values,
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f) {
auto paged_state = create_paged_state(query_string, cl, values, page_size);
co_return co_await for_each_cql_result(paged_state, query_state, std::move(f));
}
future<> query_processor::query_internal(
const sstring& query_string,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f) {

View File

@@ -332,6 +332,29 @@ public:
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*!
* \brief iterate over all cql results using paging with a custom query_state (for timeout control)
*
* You can use placeholders in the query, the statement will only be prepared once.
*
* query_string - the cql string, can contain placeholders
* cl - consistency level of the query
* query_state - query state with custom timeout configuration
* values - values to be substituted for the placeholders in the query
* page_size - maximum page size
* f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop
*
* \note This function is optimized for convenience, not performance.
*/
future<> query_internal(
const sstring& query_string,
db::consistency_level cl,
service::query_state& query_state,
const data_value_list& values,
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*
* \brief iterate over all cql results using paging
* An overload of query_internal without query parameters
@@ -501,11 +524,14 @@ private:
int32_t page_size);
/*!
* \brief run a query using paging
* \brief run a query using paging with an optional custom query_state (for timeout control)
*
* state - internal query state containing prepared statement and options
* query_state - optional query state with custom timeout configuration (defaults to internal query state)
*
* \note Optimized for convenience, not performance.
*/
future<::shared_ptr<untyped_result_set>> execute_paged_internal(internal_query_state& state);
future<::shared_ptr<untyped_result_set>> execute_paged_internal(internal_query_state& state, service::query_state* query_state = nullptr);
/*!
* \brief iterate over all results using paging, accept a function that returns a future
@@ -516,6 +542,21 @@ private:
cql3::internal_query_state& state,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*!
* \brief iterate over all results using paging with a custom query_state (for timeout control)
*
* state - internal query state containing prepared statement and options
* query_state - query state with custom timeout configuration
* f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop
*
* \note Optimized for convenience, not performance.
*/
future<> for_each_cql_result(
cql3::internal_query_state& state,
service::query_state& query_state,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*!
* \brief check, based on the state if there are additional results
* Users of the paging, should not use the internal_query_state directly

View File

@@ -14,7 +14,6 @@ import cassandra.cluster
from contextlib import contextmanager
import re
import ssl
import time
# This function normalizes the SSL cipher suite name (a string),
@@ -67,12 +66,13 @@ def test_tls_versions(cql):
# a regression test for #9216
def test_system_clients_stores_tls_info(cql):
if not cql.cluster.ssl_context:
table_result = cql.execute(f"SELECT * FROM system.clients")
for row in table_result:
assert not row.ssl_enabled
assert row.ssl_protocol is None
assert row.ssl_cipher_suite is None
else:
table_result = cql.execute(f"SELECT * FROM system.clients")
for row in table_result:
assert not row.ssl_enabled
assert row.ssl_protocol is None
assert row.ssl_cipher_suite is None
if cql.cluster.ssl_context:
# TLS v1.2 must be supported, because this is the default version that
# "cqlsh --ssl" uses. If this fact changes in the future, we may need
# to reconsider this test.
@@ -82,8 +82,7 @@ def test_system_clients_stores_tls_info(cql):
# so we need to retry until all connections are initialized and have their TLS info recorded in system.clients,
# otherwise we'd end up with some connections e.g. having their ssl_enabled=True but other fields still None.
expected_ciphers = [normalize_cipher(cipher['name']) for cipher in ssl.create_default_context().get_ciphers()]
deadline = time.time() + 10 # 10 seconds timeout
while time.time() < deadline:
for _ in range(1000): # try for up to 1000 * 0.01s = 10s seconds
rows = session.execute(f"SELECT * FROM system.clients")
if rows and all(
row.ssl_enabled
@@ -93,7 +92,7 @@ def test_system_clients_stores_tls_info(cql):
):
return
time.sleep(0.01)
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10 seconds")
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10s seconds")
@contextmanager

View File

@@ -414,8 +414,9 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
conn->_ssl_cipher_suite = cipher_suite;
return make_ready_future<bool>(true);
});
}).handle_exception([conn](std::exception_ptr ep) {
return seastar::make_exception_future<bool>(std::runtime_error(fmt::format("Inspecting TLS connection failed: {}", ep)));
}).handle_exception([this, conn](std::exception_ptr ep) {
_logger.warn("Inspecting TLS connection failed: {}", ep);
return make_ready_future<bool>(false);
})
: make_ready_future<bool>(true)
).then([conn] (bool ok){

View File

@@ -63,7 +63,7 @@ protected:
bool _ssl_enabled = false;
std::optional<sstring> _ssl_cipher_suite = std::nullopt;
std::optional<sstring> _ssl_protocol = std::nullopt;
std::optional<sstring> _ssl_protocol = std::nullopt;;
private:
future<> process_until_tenant_switch();