mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-02 14:15:46 +00:00
Merge '[Backport 2026.1] cql3: prepare list statments metadta_id during prepare statement , send the correct metadata_id directly to the client' from Scylladb[bot]
This series makes result metadata handling for auth LIST statements consistent and adds coverage for the driver-visible behavior.
The first patch makes the result-column metadata construction shared across the affected statements, so the metadata shape used for PREPARE and EXECUTE stays uniform and easier to reason about.
The second patch adds regression coverage for both sides of the metadata-id flow:
- a Python auth-cluster test verifies that prepared LIST ROLES OF returns a non-empty result metadata id and that a later EXECUTE reuses it without METADATA_CHANGED
- a Boost transport test covers the recovery path where the client sends an empty request metadata id and the server responds with METADATA_CHANGED and the full metadata
Together these patches tighten the implementation and protect the prepared-metadata-id behavior exposed to drivers.
Fixes: SCYLLADB-1543
backport: this change should be backported to all active branches to help the driver operation
- (cherry picked from commit de19714763)
Parent PR: #29347
Closes scylladb/scylladb#29477
* github.com:scylladb/scylladb:
test/cluster: cover prepared LIST metadata ids in one setup Precompute the expected metadata-id hashes for the prepared LIST auth and service-level statements and verify that PREPARE returns them while EXECUTE reuses the prepared metadata without METADATA_CHANGED. Run all cases in a single auth-cluster test after preparing the cluster, role, and service level once through the regular manager fixture.
cql: expose stable result metadata for prepared LIST statements Prepared LIST statements were not calculating metadata in PREPARE path, and sent empty string hash to client causing problematic behaviour where metadat_id was not recalculated correctly. This patch moves metadata construction into get_result_metadata() for the affected LIST statements and reuse that metadata when building the result set. This gives PREPARE a stable metadata id for LIST ROLES, LIST USERS, LIST PERMISSIONS and the service-level variants. This patch also adds a new boost test that verifies that when an EXECUTE request carries an empty result metadata id while the server has a real metadata id for the result set, the response is marked METADATA_CHANGED and includes the full result metadata plus the server metadata id. This covers the recovery path for clients that send an empty or otherwise unusable metadata id instead of a matching cached one.
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "utils/assert.hh"
|
||||
#include "cql3/column_specification.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -31,4 +32,12 @@ bool column_specification::all_in_same_table(const std::vector<lw_shared_ptr<col
|
||||
});
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_specification> make_column_spec(std::string_view ks_name, std::string_view cf_name, sstring name, data_type type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
ks_name,
|
||||
cf_name,
|
||||
::make_shared<column_identifier>(std::move(name), true),
|
||||
std::move(type));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -42,4 +42,6 @@ public:
|
||||
static bool all_in_same_table(const std::vector<lw_shared_ptr<column_specification>>& names);
|
||||
};
|
||||
|
||||
lw_shared_ptr<column_specification> make_column_spec(std::string_view ks_name, std::string_view cf_name, sstring name, data_type type);
|
||||
|
||||
}
|
||||
|
||||
@@ -30,13 +30,14 @@ list_effective_service_level_statement::prepare(data_dictionary::database db, cq
|
||||
return std::make_unique<prepared_statement>(audit_info(), ::make_shared<list_effective_service_level_statement>(*this));
|
||||
}
|
||||
|
||||
static auto make_column(sstring name, const shared_ptr<const abstract_type> type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
"QOS",
|
||||
"effective_service_level",
|
||||
::make_shared<column_identifier>(std::move(name), true),
|
||||
type);
|
||||
};
|
||||
shared_ptr<const cql3::metadata> list_effective_service_level_statement::get_result_metadata() const {
|
||||
return ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<column_specification>>{
|
||||
make_column_spec("QOS", "effective_service_level", "service_level_option", utf8_type),
|
||||
make_column_spec("QOS", "effective_service_level", "effective_service_level", utf8_type),
|
||||
make_column_spec("QOS", "effective_service_level", "value", utf8_type)
|
||||
});
|
||||
}
|
||||
|
||||
static bytes_opt decompose_timeout (const qos::service_level_options::timeout_type& duration) {
|
||||
return std::visit(overloaded_functor{
|
||||
@@ -69,11 +70,6 @@ static bytes_opt decompose_shares(const qos::service_level_options::shares_type&
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
list_effective_service_level_statement::execute(query_processor& qp, service::query_state& state, const query_options&, std::optional<service::group0_guard>) const {
|
||||
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({
|
||||
make_column("service_level_option", utf8_type),
|
||||
make_column("effective_service_level", utf8_type),
|
||||
make_column("value", utf8_type)
|
||||
});
|
||||
auto& role_manager = state.get_client_state().get_auth_service()->underlying_role_manager();
|
||||
|
||||
if (!co_await role_manager.exists(_role_name)) {
|
||||
@@ -87,7 +83,7 @@ list_effective_service_level_statement::execute(query_processor& qp, service::qu
|
||||
throw exceptions::invalid_request_exception(format("Role {} doesn't have assigned any service level", _role_name));
|
||||
}
|
||||
|
||||
auto rs = std::make_unique<result_set>(metadata);
|
||||
auto rs = std::make_unique<result_set>(::make_shared<cql3::metadata>(*get_result_metadata()));
|
||||
rs->add_row({
|
||||
utf8_type->decompose("workload_type"),
|
||||
utf8_type->decompose(slo->effective_names->workload),
|
||||
@@ -110,4 +106,4 @@ list_effective_service_level_statement::execute(query_processor& qp, service::qu
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,9 +21,11 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,9 +15,18 @@
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
shared_ptr<const cql3::metadata> cql3::statements::list_permissions_statement::get_result_metadata() const {
|
||||
return ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>>{
|
||||
make_column_spec(db::system_keyspace::NAME, "permissions", "role", utf8_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "permissions", "username", utf8_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "permissions", "resource", utf8_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "permissions", "permission", utf8_type)});
|
||||
}
|
||||
|
||||
cql3::statements::list_permissions_statement::list_permissions_statement(
|
||||
auth::permission_set permissions,
|
||||
std::optional<auth::resource> resource,
|
||||
@@ -80,18 +89,6 @@ cql3::statements::list_permissions_statement::execute(
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
auto make_column = [auth_ks = auth::get_auth_ks_name(qp)](sstring name) {
|
||||
return make_lw_shared<column_specification>(
|
||||
auth_ks,
|
||||
"permissions",
|
||||
::make_shared<column_identifier>(std::move(name), true),
|
||||
utf8_type);
|
||||
};
|
||||
|
||||
std::vector<lw_shared_ptr<column_specification>> metadata({
|
||||
make_column("role"), make_column("username"), make_column("resource"), make_column("permission")
|
||||
});
|
||||
|
||||
const auto make_resource_filter = [this]()
|
||||
-> std::optional<std::pair<auth::resource, auth::recursive_permissions>> {
|
||||
if (!_resource) {
|
||||
@@ -104,6 +101,7 @@ cql3::statements::list_permissions_statement::execute(
|
||||
};
|
||||
|
||||
const auto& as = *state.get_client_state().get_auth_service();
|
||||
auto metadata = ::make_shared<cql3::metadata>(*get_result_metadata());
|
||||
|
||||
return do_with(make_resource_filter(), [this, &as, metadata = std::move(metadata)](const auto& resource_filter) mutable {
|
||||
return auth::list_filtered_permissions(
|
||||
|
||||
@@ -34,6 +34,8 @@ public:
|
||||
|
||||
std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
|
||||
void validate(query_processor&, const service::client_state&) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
@@ -35,6 +35,8 @@ public:
|
||||
|
||||
std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "cql3/statements/list_service_level_attachments_statement.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "service/query_state.hh"
|
||||
@@ -17,6 +16,15 @@ namespace cql3 {
|
||||
|
||||
namespace statements {
|
||||
|
||||
shared_ptr<const cql3::metadata> list_service_level_attachments_statement::get_result_metadata() const {
|
||||
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({
|
||||
make_column_spec("QOS", "service_levels_attachments", "role", utf8_type),
|
||||
make_column_spec("QOS", "service_levels_attachments", "service_level", utf8_type)
|
||||
});
|
||||
|
||||
return ::make_shared<cql3::metadata>(metadata);
|
||||
}
|
||||
|
||||
list_service_level_attachments_statement::list_service_level_attachments_statement(sstring role_name) :
|
||||
_role_name(role_name), _describe_all(false) {
|
||||
}
|
||||
@@ -40,19 +48,7 @@ list_service_level_attachments_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
|
||||
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
"QOS",
|
||||
"service_levels_attachments",
|
||||
::make_shared<column_identifier>(std::move(name), true),
|
||||
type);
|
||||
};
|
||||
|
||||
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({
|
||||
make_column("role", utf8_type), make_column("service_level", utf8_type)
|
||||
});
|
||||
|
||||
auto metadata = ::make_shared<cql3::metadata>(*get_result_metadata());
|
||||
|
||||
return make_ready_future().then([this, &state] () {
|
||||
if (_describe_all) {
|
||||
@@ -67,7 +63,7 @@ list_service_level_attachments_statement::execute(query_processor& qp,
|
||||
});
|
||||
|
||||
}
|
||||
}).then([] (std::unordered_map<sstring, sstring> roles_to_att_val) {
|
||||
}).then([metadata = std::move(metadata)] (std::unordered_map<sstring, sstring> roles_to_att_val) {
|
||||
|
||||
auto rs = std::make_unique<result_set>(metadata);
|
||||
for (auto&& role_to_sl : roles_to_att_val) {
|
||||
|
||||
@@ -22,6 +22,7 @@ public:
|
||||
list_service_level_attachments_statement(sstring role_name);
|
||||
list_service_level_attachments_statement();
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "cql3/statements/list_service_level_statement.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
@@ -19,6 +18,20 @@ namespace cql3 {
|
||||
|
||||
namespace statements {
|
||||
|
||||
shared_ptr<const cql3::metadata> list_service_level_statement::get_result_metadata() const {
|
||||
std::vector<lw_shared_ptr<column_specification>> metadata{
|
||||
make_column_spec("QOS", "service_levels", "service_level", utf8_type),
|
||||
make_column_spec("QOS", "service_levels", "timeout", duration_type),
|
||||
make_column_spec("QOS", "service_levels", "workload_type", utf8_type),
|
||||
make_column_spec("QOS", "service_levels", "shares", int32_type),
|
||||
};
|
||||
if (_describe_all) {
|
||||
metadata.push_back(make_column_spec("QOS", "service_levels", "percentage of all service level shares", utf8_type));
|
||||
}
|
||||
|
||||
return ::make_shared<cql3::metadata>(std::move(metadata));
|
||||
}
|
||||
|
||||
list_service_level_statement::list_service_level_statement(sstring service_level, bool describe_all) :
|
||||
_service_level(service_level), _describe_all(describe_all) {
|
||||
}
|
||||
@@ -38,23 +51,7 @@ list_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
|
||||
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
"QOS",
|
||||
"service_levels",
|
||||
::make_shared<column_identifier>(std::move(name), true),
|
||||
type);
|
||||
};
|
||||
|
||||
std::vector<lw_shared_ptr<column_specification>> metadata({make_column("service_level", utf8_type),
|
||||
make_column("timeout", duration_type),
|
||||
make_column("workload_type", utf8_type),
|
||||
make_column("shares", int32_type),
|
||||
});
|
||||
if (_describe_all) {
|
||||
metadata.push_back(make_column("percentage of all service level shares", utf8_type));
|
||||
}
|
||||
auto metadata = ::make_shared<cql3::metadata>(*get_result_metadata());
|
||||
|
||||
return make_ready_future().then([this, &state] () {
|
||||
if (_describe_all) {
|
||||
|
||||
@@ -21,6 +21,7 @@ class list_service_level_statement final : public service_level_statement {
|
||||
public:
|
||||
list_service_level_statement(sstring service_level, bool describe_all);
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
@@ -12,10 +12,17 @@
|
||||
#include "list_users_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
shared_ptr<const cql3::metadata> cql3::statements::list_users_statement::get_result_metadata() const {
|
||||
return ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>>{
|
||||
cql3::make_column_spec(db::system_keyspace::NAME, "users", "name", utf8_type),
|
||||
cql3::make_column_spec(db::system_keyspace::NAME, "users", "super", boolean_type)});
|
||||
}
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement> cql3::statements::list_users_statement::prepare(
|
||||
data_dictionary::database db, cql_stats& stats) {
|
||||
return std::make_unique<prepared_statement>(audit_info(), ::make_shared<list_users_statement>(*this));
|
||||
@@ -28,20 +35,7 @@ future<> cql3::statements::list_users_statement::check_access(query_processor& q
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
static const sstring virtual_table_name("users");
|
||||
|
||||
const auto make_column_spec = [auth_ks = auth::get_auth_ks_name(qp)](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
|
||||
return make_lw_shared<column_specification>(
|
||||
auth_ks,
|
||||
virtual_table_name,
|
||||
::make_shared<column_identifier>(name, true),
|
||||
ty);
|
||||
};
|
||||
|
||||
auto metadata = ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<column_specification>>{
|
||||
make_column_spec("name", utf8_type),
|
||||
make_column_spec("super", boolean_type)});
|
||||
auto metadata = ::make_shared<cql3::metadata>(*get_result_metadata());
|
||||
|
||||
auto make_results = [metadata = std::move(metadata)](const auth::service& as, std::unordered_set<sstring>&& roles) mutable {
|
||||
using cql_transport::messages::result_message;
|
||||
|
||||
@@ -23,6 +23,8 @@ public:
|
||||
|
||||
std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor&
|
||||
, service::query_state&
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "cql3/statements/list_roles_statement.hh"
|
||||
#include "cql3/statements/revoke_role_statement.hh"
|
||||
#include "cql3/statements/request_validations.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
@@ -347,6 +348,17 @@ std::unique_ptr<prepared_statement> list_roles_statement::prepare(
|
||||
return std::make_unique<prepared_statement>(audit_info(), ::make_shared<list_roles_statement>(*this));
|
||||
}
|
||||
|
||||
shared_ptr<const cql3::metadata> list_roles_statement::get_result_metadata() const {
|
||||
static const thread_local auto custom_options_type = map_type_impl::get_instance(utf8_type, utf8_type, true);
|
||||
|
||||
return ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<column_specification>>{
|
||||
make_column_spec(db::system_keyspace::NAME, "roles", "role", utf8_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "roles", "super", boolean_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "roles", "login", boolean_type),
|
||||
make_column_spec(db::system_keyspace::NAME, "roles", "options", custom_options_type)});
|
||||
}
|
||||
|
||||
future<> list_roles_statement::check_access(query_processor& qp, const service::client_state& state) const {
|
||||
state.ensure_not_anonymous();
|
||||
|
||||
@@ -376,24 +388,8 @@ future<> list_roles_statement::check_access(query_processor& qp, const service::
|
||||
|
||||
future<result_message_ptr>
|
||||
list_roles_statement::execute(query_processor& qp, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
|
||||
static const sstring virtual_table_name("roles");
|
||||
|
||||
const auto make_column_spec = [auth_ks = auth::get_auth_ks_name(qp)](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
|
||||
return make_lw_shared<column_specification>(
|
||||
auth_ks,
|
||||
virtual_table_name,
|
||||
::make_shared<column_identifier>(name, true),
|
||||
ty);
|
||||
};
|
||||
|
||||
static const thread_local auto custom_options_type = map_type_impl::get_instance(utf8_type, utf8_type, true);
|
||||
|
||||
auto metadata = ::make_shared<cql3::metadata>(
|
||||
std::vector<lw_shared_ptr<column_specification>>{
|
||||
make_column_spec("role", utf8_type),
|
||||
make_column_spec("super", boolean_type),
|
||||
make_column_spec("login", boolean_type),
|
||||
make_column_spec("options", custom_options_type)});
|
||||
auto metadata = ::make_shared<cql3::metadata>(*get_result_metadata());
|
||||
|
||||
auto make_results = [metadata = std::move(metadata)](
|
||||
auth::role_manager& rm,
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
|
||||
#include "transport/request.hh"
|
||||
#include "transport/response.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "utils/memory_data_sink.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
@@ -164,3 +165,38 @@ SEASTAR_THREAD_TEST_CASE(test_response_request_reader) {
|
||||
BOOST_CHECK_EQUAL(req.read_short().value(), 1);
|
||||
BOOST_CHECK_EQUAL(req.read_string().value(), "zed");
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_response_metadata_changed_for_empty_request_metadata_id) {
|
||||
auto col = make_lw_shared<cql3::column_specification>(
|
||||
"ks", "cf", ::make_shared<cql3::column_identifier>("v", true), utf8_type);
|
||||
cql3::metadata m({col});
|
||||
auto calculated_metadata_id = m.calculate_metadata_id();
|
||||
auto expected_metadata_id = bytes(calculated_metadata_id._metadata_id);
|
||||
|
||||
auto res = cql_transport::response(0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr());
|
||||
res.write(m, cql_transport::cql_metadata_id_wrapper(
|
||||
cql3::cql_metadata_id_type(bytes{}),
|
||||
cql3::cql_metadata_id_type(bytes(expected_metadata_id))), true);
|
||||
|
||||
memory_data_sink_buffers buffers;
|
||||
{
|
||||
output_stream<char> out(data_sink(std::make_unique<memory_data_sink>(buffers)));
|
||||
res.write_message(out, 4, cql_transport::cql_compression::none, deleter()).get();
|
||||
}
|
||||
auto total_length = buffers.size();
|
||||
auto fbufs = fragmented_temporary_buffer(buffers.buffers() | std::views::as_rvalue | std::ranges::to<std::vector>(), total_length);
|
||||
|
||||
bytes_ostream linearization_buffer;
|
||||
auto req = cql_transport::request_reader(fbufs.get_istream(), linearization_buffer);
|
||||
BOOST_REQUIRE(req.read_byte());
|
||||
BOOST_REQUIRE(req.read_byte());
|
||||
BOOST_REQUIRE(req.read_short());
|
||||
BOOST_REQUIRE(req.read_byte());
|
||||
BOOST_REQUIRE(req.read_int());
|
||||
|
||||
auto flags = req.read_int().value();
|
||||
BOOST_CHECK(flags & cql3::metadata::flag_enum_set::mask_for<cql3::metadata::flag::METADATA_CHANGED>());
|
||||
BOOST_CHECK(!(flags & cql3::metadata::flag_enum_set::mask_for<cql3::metadata::flag::NO_METADATA>()));
|
||||
BOOST_CHECK_EQUAL(req.read_int().value(), 1);
|
||||
BOOST_CHECK_EQUAL(req.read_short_bytes().value(), expected_metadata_id);
|
||||
}
|
||||
|
||||
173
test/cluster/auth_cluster/test_prepared_metadata_id.py
Normal file
173
test/cluster/auth_cluster/test_prepared_metadata_id.py
Normal file
@@ -0,0 +1,173 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import ctypes
|
||||
import hashlib
|
||||
from collections.abc import Sequence
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from cassandra import ProtocolVersion
|
||||
from cassandra.application_info import ApplicationInfoBase
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from cassandra.protocol import ResultMessage
|
||||
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import unique_name
|
||||
|
||||
|
||||
class _UseMetadataId(ApplicationInfoBase):
|
||||
def add_startup_options(self, options: dict) -> None:
|
||||
options["SCYLLA_USE_METADATA_ID"] = ""
|
||||
|
||||
|
||||
_SIZE_T_BYTES = ctypes.sizeof(ctypes.c_size_t)
|
||||
_UTF8_TYPE = "org.apache.cassandra.db.marshal.UTF8Type"
|
||||
_BOOLEAN_TYPE = "org.apache.cassandra.db.marshal.BooleanType"
|
||||
_DURATION_TYPE = "org.apache.cassandra.db.marshal.DurationType"
|
||||
_INT32_TYPE = "org.apache.cassandra.db.marshal.Int32Type"
|
||||
_TEXT_MAP_TYPE = f"org.apache.cassandra.db.marshal.MapType({_UTF8_TYPE},{_UTF8_TYPE})"
|
||||
|
||||
# Keep these schemas in sync with the corresponding get_result_metadata()
|
||||
# implementations in cql3/statements/*.cc.
|
||||
_LIST_ROLES_SCHEMA = (("role", _UTF8_TYPE),
|
||||
("super", _BOOLEAN_TYPE),
|
||||
("login", _BOOLEAN_TYPE),
|
||||
("options", _TEXT_MAP_TYPE))
|
||||
|
||||
# cql3/statements/list_users_statement.cc
|
||||
_LIST_USERS_SCHEMA = (("name", _UTF8_TYPE),
|
||||
("super", _BOOLEAN_TYPE))
|
||||
|
||||
# cql3/statements/list_permissions_statement.cc
|
||||
_LIST_PERMISSIONS_SCHEMA = (("role", _UTF8_TYPE),
|
||||
("username", _UTF8_TYPE),
|
||||
("resource", _UTF8_TYPE),
|
||||
("permission", _UTF8_TYPE))
|
||||
|
||||
# cql3/statements/list_service_level_statement.cc for LIST SERVICE LEVEL <name>
|
||||
_LIST_SERVICE_LEVEL_SCHEMA = (("service_level", _UTF8_TYPE),
|
||||
("timeout", _DURATION_TYPE),
|
||||
("workload_type", _UTF8_TYPE),
|
||||
("shares", _INT32_TYPE))
|
||||
|
||||
# cql3/statements/list_service_level_statement.cc for LIST ALL SERVICE LEVELS
|
||||
_LIST_ALL_SERVICE_LEVELS_SCHEMA = (*_LIST_SERVICE_LEVEL_SCHEMA,
|
||||
("percentage of all service level shares", _UTF8_TYPE))
|
||||
|
||||
# cql3/statements/list_service_level_attachments_statement.cc
|
||||
_LIST_ATTACHED_SERVICE_LEVEL_SCHEMA = (("role", _UTF8_TYPE),
|
||||
("service_level", _UTF8_TYPE))
|
||||
|
||||
# cql3/statements/list_effective_service_level_statement.cc
|
||||
_LIST_EFFECTIVE_SERVICE_LEVEL_SCHEMA = (("service_level_option", _UTF8_TYPE),
|
||||
("effective_service_level", _UTF8_TYPE),
|
||||
("value", _UTF8_TYPE))
|
||||
|
||||
|
||||
def _feed_string_for_metadata_id(hasher, value: str) -> None:
|
||||
encoded = value.encode("utf-8")
|
||||
hasher.update(
|
||||
len(encoded).to_bytes(_SIZE_T_BYTES, byteorder="little", signed=False)
|
||||
)
|
||||
hasher.update(encoded)
|
||||
|
||||
|
||||
def _calculate_metadata_id(columns: Sequence[tuple[str, str]]) -> bytes:
|
||||
# Match cql3::metadata::calculate_metadata_id() and appending_hash<std::string>.
|
||||
hasher = hashlib.sha256()
|
||||
for column_name, type_name in columns:
|
||||
_feed_string_for_metadata_id(hasher, column_name)
|
||||
_feed_string_for_metadata_id(hasher, type_name)
|
||||
return hasher.digest()[:16]
|
||||
|
||||
|
||||
_LIST_METADATA_CASES: list[tuple[str, bytes]] = [
|
||||
("LIST ROLES OF {role}",
|
||||
_calculate_metadata_id(_LIST_ROLES_SCHEMA)),
|
||||
("LIST USERS",
|
||||
_calculate_metadata_id(_LIST_USERS_SCHEMA)),
|
||||
("LIST ALL PERMISSIONS",
|
||||
_calculate_metadata_id(_LIST_PERMISSIONS_SCHEMA)),
|
||||
("LIST SERVICE LEVEL {service_level}",
|
||||
_calculate_metadata_id(_LIST_SERVICE_LEVEL_SCHEMA)),
|
||||
("LIST ALL SERVICE LEVELS",
|
||||
_calculate_metadata_id(_LIST_ALL_SERVICE_LEVELS_SCHEMA)),
|
||||
("LIST ATTACHED SERVICE LEVEL OF {role}",
|
||||
_calculate_metadata_id(_LIST_ATTACHED_SERVICE_LEVEL_SCHEMA)),
|
||||
("LIST EFFECTIVE SERVICE LEVEL OF {role}",
|
||||
_calculate_metadata_id(_LIST_EFFECTIVE_SERVICE_LEVEL_SCHEMA))
|
||||
]
|
||||
|
||||
|
||||
def _prepare_and_execute(host: str, query: str) -> tuple[bytes, bool, int]:
|
||||
captured = {"metadata_id": None, "metadata_changed": False}
|
||||
original_recv = ResultMessage.recv_results_metadata
|
||||
|
||||
def _capturing_recv(self: ResultMessage, f, user_type_map) -> None:
|
||||
original_recv(self, f, user_type_map)
|
||||
metadata_id = getattr(self, "result_metadata_id", None)
|
||||
if metadata_id is not None:
|
||||
captured["metadata_id"] = metadata_id
|
||||
captured["metadata_changed"] = True
|
||||
|
||||
with mock.patch.object(
|
||||
ProtocolVersion, "uses_prepared_metadata", staticmethod(lambda _: True)
|
||||
):
|
||||
cluster = Cluster(
|
||||
contact_points=[host],
|
||||
port=9042,
|
||||
protocol_version=4,
|
||||
auth_provider=PlainTextAuthProvider("cassandra", "cassandra"),
|
||||
application_info=_UseMetadataId(),
|
||||
load_balancing_policy=WhiteListRoundRobinPolicy([host]))
|
||||
session = cluster.connect()
|
||||
try:
|
||||
prepared = session.prepare(query)
|
||||
prepared_metadata_id = prepared.result_metadata_id
|
||||
assert prepared_metadata_id is not None
|
||||
with mock.patch.object(ResultMessage, "recv_results_metadata", _capturing_recv):
|
||||
rows = list(session.execute(prepared))
|
||||
return prepared_metadata_id, captured["metadata_changed"], len(rows)
|
||||
finally:
|
||||
session.shutdown()
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepared_list_metadata_ids(manager: ManagerClient) -> None:
|
||||
servers = await manager.running_servers()
|
||||
if servers:
|
||||
server = servers[0]
|
||||
else:
|
||||
server = await manager.server_add(config=auth_config)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
role = "r" + unique_name()
|
||||
service_level = "sl" + unique_name()
|
||||
|
||||
try:
|
||||
await cql.run_async(f"CREATE ROLE {role} WITH PASSWORD = '{role}' AND LOGIN = true")
|
||||
await cql.run_async(f"GRANT SELECT ON ALL KEYSPACES TO {role}")
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {service_level} WITH TIMEOUT = 10s AND WORKLOAD_TYPE = 'batch' AND SHARES = 100")
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL {service_level} TO {role}")
|
||||
|
||||
for query_template, expected_metadata_id in _LIST_METADATA_CASES:
|
||||
query = query_template.format(role=role, service_level=service_level)
|
||||
|
||||
# _prepare_and_execute() uses the synchronous Python driver, so run it in
|
||||
# a worker thread instead of blocking the asyncio-based test harness.
|
||||
prepared_metadata_id, metadata_changed, row_count = await asyncio.to_thread(_prepare_and_execute, server.ip_addr, query)
|
||||
assert row_count > 0, query
|
||||
assert prepared_metadata_id == expected_metadata_id, query
|
||||
assert not metadata_changed, query
|
||||
finally:
|
||||
await cql.run_async(f"DETACH SERVICE LEVEL FROM {role}")
|
||||
await cql.run_async(f"DROP SERVICE LEVEL IF EXISTS {service_level}")
|
||||
await cql.run_async(f"DROP ROLE IF EXISTS {role}")
|
||||
@@ -2240,7 +2240,6 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
|
||||
flags.set<cql3::metadata::flag::NO_METADATA>();
|
||||
}
|
||||
|
||||
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
|
||||
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
|
||||
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
|
||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||
|
||||
Reference in New Issue
Block a user