Compare commits

...

10 Commits

Author SHA1 Message Date
Piotr Sarna
fcb349b026 tests: add tests for per-role timeouts
The test cases verify that setting timeout parameters per-role
works and is validated.
2020-11-27 12:43:53 +01:00
Piotr Sarna
28c558af95 docs: add a paragaph about per-role parameters
This paragraph is also the first one in newly crated roles.md,
which should be later filled with more information about roles.
2020-11-27 12:43:53 +01:00
Piotr Sarna
83b47ae394 cql3: add validating per-role timeout options
Per-role timeout options are now validated when set:
 - they should represent a valid duration
 - the duration should have millisecond granularity,
   since the timeout clock does not support micro/nanoseconds.
2020-11-27 12:37:27 +01:00
Piotr Sarna
391d1f2b21 client_state: add updating per-role params
Per-role parameters (currently: read_timeout and write_timeout)
are now updated when a new connection is established.
Also, the changes are immediately propagated for the connection
which sent the CREATE ROLE/ALTER ROLE statement.
The other connections which have the changed role are currently
not immediately reloaded.
It can be done in the future if needed, but all sessions with
given roles should be tracked, or, alternatively, all sessions
should be iterated and changed.
2020-11-27 12:37:27 +01:00
Piotr Sarna
137a8a0161 auth: add options support to password authenticator
Custom options will be used later to provide per-role timeouts
and other useful parameters.
2020-11-27 12:37:17 +01:00
Piotr Sarna
c473cb4a2d treewide: remove timeout config from query options
Timeout config is now stored in each connection, so there's no point
in tracking it inside each query as well. This patch removes
timeout_config from query_options and follows by removing now
unnecessary parameters of many functions and constructors.
2020-11-26 17:56:55 +01:00
Piotr Sarna
98fac66361 cql3: use timeout config from client state instead of query options
... in batch statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
2cbeb3678f cql3: use timeout config from client state instead of query options
... in modification statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
d61e1fd174 cql3: use timeout config from client state instead of query options
... in select statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
f31ac0a8ca service: add timeout config to client state
Future patches will use this per-connection timeout config
to allow setting different timeouts for each session,
based on roles.
2020-11-26 17:55:14 +01:00
41 changed files with 427 additions and 212 deletions

View File

@@ -129,8 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name); auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name);
auto cl = auth::password_authenticator::consistency_for_user(username); auto cl = auth::password_authenticator::consistency_for_user(username);
auto& timeout = auth::internal_distributed_timeout_config(); return qp.execute_internal(query, cl, auth::internal_distributed_query_state(), {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
auto res = f.get0(); auto res = f.get0();
auto salted_hash = std::optional<sstring>(); auto salted_hash = std::optional<sstring>();
if (res->empty()) { if (res->empty()) {

View File

@@ -2845,7 +2845,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit)); auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
command->slice.options.set<query::partition_slice::option::allow_short_read>(); command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{}); auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state)); query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr); auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);

View File

@@ -108,7 +108,7 @@ future<> wait_for_schema_agreement(::service::migration_manager& mm, const datab
}); });
} }
const timeout_config& internal_distributed_timeout_config() noexcept { ::service::query_state& internal_distributed_query_state() noexcept {
#ifdef DEBUG #ifdef DEBUG
// Give the much slower debug tests more headroom for completing auth queries. // Give the much slower debug tests more headroom for completing auth queries.
static const auto t = 30s; static const auto t = 30s;
@@ -116,7 +116,9 @@ const timeout_config& internal_distributed_timeout_config() noexcept {
static const auto t = 5s; static const auto t = 5s;
#endif #endif
static const timeout_config tc{t, t, t, t, t, t, t}; static const timeout_config tc{t, t, t, t, t, t, t};
return tc; static thread_local ::service::client_state cs(::service::client_state::internal_tag{}, tc);
static thread_local ::service::query_state qs(cs, empty_service_permit());
return qs;
} }
} }

View File

@@ -35,6 +35,7 @@
#include "log.hh" #include "log.hh"
#include "seastarx.hh" #include "seastarx.hh"
#include "utils/exponential_backoff_retry.hh" #include "utils/exponential_backoff_retry.hh"
#include "service/query_state.hh"
using namespace std::chrono_literals; using namespace std::chrono_literals;
@@ -87,6 +88,6 @@ future<> wait_for_schema_agreement(::service::migration_manager&, const database
/// ///
/// Time-outs for internal, non-local CQL queries. /// Time-outs for internal, non-local CQL queries.
/// ///
const timeout_config& internal_distributed_timeout_config() noexcept; ::service::query_state& internal_distributed_query_state() noexcept;
} }

View File

@@ -103,7 +103,6 @@ future<bool> default_authorizer::any_granted() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{}, {},
true).then([this](::shared_ptr<cql3::untyped_result_set> results) { true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return !results->empty(); return !results->empty();
@@ -116,8 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
return do_with( return do_with(
row.get_as<sstring>("username"), row.get_as<sstring>("username"),
@@ -197,7 +195,6 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) { {*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) { if (results->empty()) {
return permissions::NONE; return permissions::NONE;
@@ -226,7 +223,7 @@ default_authorizer::modify(
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result(); {permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
}); });
} }
@@ -251,7 +248,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{}, {},
true).then([](::shared_ptr<cql3::untyped_result_set> results) { true).then([](::shared_ptr<cql3::untyped_result_set> results) {
std::vector<permission_details> all_details; std::vector<permission_details> all_details;
@@ -278,7 +275,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) { {sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
try { try {
std::rethrow_exception(ep); std::rethrow_exception(ep);
@@ -298,7 +295,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) { {resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
try { try {
auto res = f.get0(); auto res = f.get0();
@@ -315,7 +311,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception( {r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
[resource](auto ep) { [resource](auto ep) {
try { try {

View File

@@ -66,6 +66,7 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
// name of the hash column. // name of the hash column.
static constexpr std::string_view SALTED_HASH = "salted_hash"; static constexpr std::string_view SALTED_HASH = "salted_hash";
static constexpr std::string_view OPTIONS = "options";
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME; static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME); static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
@@ -114,7 +115,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) { internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
auto username = row.get_as<sstring>("username"); auto username = row.get_as<sstring>("username");
auto salted_hash = row.get_as<sstring>(SALTED_HASH); auto salted_hash = row.get_as<sstring>(SALTED_HASH);
@@ -122,7 +123,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.execute_internal( return _qp.execute_internal(
update_row_query(), update_row_query(),
consistency_for_user(username), consistency_for_user(username),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{std::move(salted_hash), username}).discard_result(); {std::move(salted_hash), username}).discard_result();
}).finally([results] {}); }).finally([results] {});
}).then([] { }).then([] {
@@ -139,7 +140,7 @@ future<> password_authenticator::create_default_if_missing() const {
return _qp.execute_internal( return _qp.execute_internal(
update_row_query(), update_row_query(),
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) { {passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) {
plogger.info("Created default superuser authentication record."); plogger.info("Created default superuser authentication record.");
}); });
@@ -203,11 +204,11 @@ bool password_authenticator::require_authentication() const {
} }
authentication_option_set password_authenticator::supported_options() const { authentication_option_set password_authenticator::supported_options() const {
return authentication_option_set{authentication_option::password}; return authentication_option_set{authentication_option::password, authentication_option::options};
} }
authentication_option_set password_authenticator::alterable_options() const { authentication_option_set password_authenticator::alterable_options() const {
return authentication_option_set{authentication_option::password}; return authentication_option_set{authentication_option::password, authentication_option::options};
} }
future<authenticated_user> password_authenticator::authenticate( future<authenticated_user> password_authenticator::authenticate(
@@ -236,7 +237,7 @@ future<authenticated_user> password_authenticator::authenticate(
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_user(username), consistency_for_user(username),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{username}, {username},
true); true);
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) { }).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
@@ -262,21 +263,46 @@ future<authenticated_user> password_authenticator::authenticate(
}); });
} }
future<> password_authenticator::maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const {
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
meta::roles_table::qualified_name,
OPTIONS,
meta::roles_table::role_col_name);
if (!options.options) {
return make_ready_future<>();
}
std::vector<std::pair<data_value, data_value>> entries;
for (const auto& entry : *options.options) {
entries.push_back({data_value(entry.first), data_value(entry.second)});
}
auto map_value = make_map_value(map_type_impl::get_instance(utf8_type, utf8_type, false), entries);
return _qp.execute_internal(
query,
consistency_for_user(role_name),
internal_distributed_query_state(),
{std::move(map_value), sstring(role_name)}).discard_result();
}
future<> password_authenticator::create(std::string_view role_name, const authentication_options& options) const { future<> password_authenticator::create(std::string_view role_name, const authentication_options& options) const {
if (!options.password) { if (!options.password) {
return make_ready_future<>(); return maybe_update_custom_options(role_name, options);
} }
return _qp.execute_internal( return _qp.execute_internal(
update_row_query(), update_row_query(),
consistency_for_user(role_name), consistency_for_user(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result(); {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
return maybe_update_custom_options(role_name, options);
});
} }
future<> password_authenticator::alter(std::string_view role_name, const authentication_options& options) const { future<> password_authenticator::alter(std::string_view role_name, const authentication_options& options) const {
if (!options.password) { if (!options.password) {
return make_ready_future<>(); return maybe_update_custom_options(role_name, options);
} }
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?", static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
@@ -287,8 +313,10 @@ future<> password_authenticator::alter(std::string_view role_name, const authent
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_user(role_name), consistency_for_user(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result(); {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
return maybe_update_custom_options(role_name, options);
}).discard_result();
} }
future<> password_authenticator::drop(std::string_view name) const { future<> password_authenticator::drop(std::string_view name) const {
@@ -299,12 +327,27 @@ future<> password_authenticator::drop(std::string_view name) const {
return _qp.execute_internal( return _qp.execute_internal(
query, consistency_for_user(name), query, consistency_for_user(name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(name)}).discard_result(); {sstring(name)}).discard_result();
} }
future<custom_options> password_authenticator::query_custom_options(std::string_view role_name) const { future<custom_options> password_authenticator::query_custom_options(std::string_view role_name) const {
return make_ready_future<custom_options>(); static const sstring query = format("SELECT {} FROM {} WHERE {} = ?",
OPTIONS,
meta::roles_table::qualified_name,
meta::roles_table::role_col_name);
return _qp.execute_internal(
query, consistency_for_user(role_name),
internal_distributed_query_state(),
{sstring(role_name)}).then([](::shared_ptr<cql3::untyped_result_set> rs) {
custom_options opts;
const auto& row = rs->one();
if (row.has(OPTIONS)) {
row.get_map_data<sstring, sstring>(OPTIONS, std::inserter(opts, opts.end()), utf8_type, utf8_type);
}
return opts;
});
} }
const resource_set& password_authenticator::protected_resources() const { const resource_set& password_authenticator::protected_resources() const {

View File

@@ -94,6 +94,8 @@ public:
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override; virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
private: private:
future<> maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const;
bool legacy_metadata_exists() const; bool legacy_metadata_exists() const;
future<> migrate_legacy_metadata() const; future<> migrate_legacy_metadata() const;

View File

@@ -43,7 +43,8 @@ std::string_view creation_query() {
" can_login boolean," " can_login boolean,"
" is_superuser boolean," " is_superuser boolean,"
" member_of set<text>," " member_of set<text>,"
" salted_hash text" " salted_hash text,"
" options frozen<map<text, text>>,"
")", ")",
qualified_name, qualified_name,
role_col_name); role_col_name);
@@ -68,14 +69,13 @@ future<bool> default_role_row_satisfies(
return qp.execute_internal( return qp.execute_internal(
query, query,
db::consistency_level::ONE, db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME}, {meta::DEFAULT_SUPERUSER_NAME},
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) { true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) { if (results->empty()) {
return qp.execute_internal( return qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{meta::DEFAULT_SUPERUSER_NAME}, {meta::DEFAULT_SUPERUSER_NAME},
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) { true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) { if (results->empty()) {
@@ -100,7 +100,7 @@ future<bool> any_nondefault_role_row_satisfies(
return qp.execute_internal( return qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([&p](::shared_ptr<cql3::untyped_result_set> results) { internal_distributed_query_state()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) { if (results->empty()) {
return false; return false;
} }

View File

@@ -210,7 +210,6 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal( return _qp.execute_internal(
default_user_query, default_user_query,
db::consistency_level::ONE, db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME}, {meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) { true).then([this](auto results) {
if (!results->empty()) { if (!results->empty()) {
@@ -220,7 +219,6 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal( return _qp.execute_internal(
default_user_query, default_user_query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME}, {meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) { true).then([this](auto results) {
if (!results->empty()) { if (!results->empty()) {
@@ -229,8 +227,7 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal( return _qp.execute_internal(
all_users_query, all_users_query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM).then([](auto results) {
infinite_timeout_config).then([](auto results) {
return make_ready_future<bool>(!results->empty()); return make_ready_future<bool>(!results->empty());
}); });
}); });

View File

@@ -86,7 +86,7 @@ static future<std::optional<record>> find_record(cql3::query_processor& qp, std:
return qp.execute_internal( return qp.execute_internal(
query, query,
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name)}, {sstring(role_name)},
true).then([](::shared_ptr<cql3::untyped_result_set> results) { true).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) { if (results->empty()) {
@@ -165,7 +165,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config(), internal_distributed_query_state(),
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) { {meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME); log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
return make_ready_future<>(); return make_ready_future<>();
@@ -192,7 +192,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) { internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
role_config config; role_config config;
config.is_superuser = row.get_or<bool>("super", false); config.is_superuser = row.get_or<bool>("super", false);
@@ -253,7 +253,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name), c.is_superuser, c.can_login}, {sstring(role_name), c.is_superuser, c.can_login},
true).discard_result(); true).discard_result();
} }
@@ -296,7 +296,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
build_column_assignments(u), build_column_assignments(u),
meta::roles_table::role_col_name), meta::roles_table::role_col_name),
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name)}).discard_result(); {sstring(role_name)}).discard_result();
}); });
} }
@@ -315,7 +315,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) { {sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
return parallel_for_each( return parallel_for_each(
members->begin(), members->begin(),
@@ -354,7 +354,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name)}).discard_result(); {sstring(role_name)}).discard_result();
}; };
@@ -381,7 +381,7 @@ standard_role_manager::modify_membership(
return _qp.execute_internal( return _qp.execute_internal(
query, query,
consistency_for_role(grantee_name), consistency_for_role(grantee_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result(); {role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
}; };
@@ -392,7 +392,7 @@ standard_role_manager::modify_membership(
format("INSERT INTO {} (role, member) VALUES (?, ?)", format("INSERT INTO {} (role, member) VALUES (?, ?)",
meta::role_members_table::qualified_name), meta::role_members_table::qualified_name),
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name), sstring(grantee_name)}).discard_result(); {sstring(role_name), sstring(grantee_name)}).discard_result();
case membership_change::remove: case membership_change::remove:
@@ -400,7 +400,7 @@ standard_role_manager::modify_membership(
format("DELETE FROM {} WHERE role = ? AND member = ?", format("DELETE FROM {} WHERE role = ? AND member = ?",
meta::role_members_table::qualified_name), meta::role_members_table::qualified_name),
consistency_for_role(role_name), consistency_for_role(role_name),
internal_distributed_timeout_config(), internal_distributed_query_state(),
{sstring(role_name), sstring(grantee_name)}).discard_result(); {sstring(role_name), sstring(grantee_name)}).discard_result();
} }
@@ -503,7 +503,7 @@ future<role_set> standard_role_manager::query_all() const {
return _qp.execute_internal( return _qp.execute_internal(
query, query,
db::consistency_level::QUORUM, db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([](::shared_ptr<cql3::untyped_result_set> results) { internal_distributed_query_state()).then([](::shared_ptr<cql3::untyped_result_set> results) {
role_set roles; role_set roles;
std::transform( std::transform(

View File

@@ -50,12 +50,11 @@ const cql_config default_cql_config;
thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp}; thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp};
thread_local query_options query_options::DEFAULT{default_cql_config, thread_local query_options query_options::DEFAULT{default_cql_config,
db::consistency_level::ONE, infinite_timeout_config, std::nullopt, db::consistency_level::ONE, std::nullopt,
std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()}; std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()};
query_options::query_options(const cql_config& cfg, query_options::query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values, std::vector<cql3::raw_value> values,
std::vector<cql3::raw_value_view> value_views, std::vector<cql3::raw_value_view> value_views,
@@ -64,7 +63,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf) cql_serialization_format sf)
: _cql_config(cfg) : _cql_config(cfg)
, _consistency(consistency) , _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names)) , _names(std::move(names))
, _values(std::move(values)) , _values(std::move(values))
, _value_views(value_views) , _value_views(value_views)
@@ -76,7 +74,6 @@ query_options::query_options(const cql_config& cfg,
query_options::query_options(const cql_config& cfg, query_options::query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values, std::vector<cql3::raw_value> values,
bool skip_metadata, bool skip_metadata,
@@ -84,7 +81,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf) cql_serialization_format sf)
: _cql_config(cfg) : _cql_config(cfg)
, _consistency(consistency) , _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names)) , _names(std::move(names))
, _values(std::move(values)) , _values(std::move(values))
, _value_views() , _value_views()
@@ -97,7 +93,6 @@ query_options::query_options(const cql_config& cfg,
query_options::query_options(const cql_config& cfg, query_options::query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value_view> value_views, std::vector<cql3::raw_value_view> value_views,
bool skip_metadata, bool skip_metadata,
@@ -105,7 +100,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf) cql_serialization_format sf)
: _cql_config(cfg) : _cql_config(cfg)
, _consistency(consistency) , _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names)) , _names(std::move(names))
, _values() , _values()
, _value_views(std::move(value_views)) , _value_views(std::move(value_views))
@@ -115,12 +109,11 @@ query_options::query_options(const cql_config& cfg,
{ {
} }
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values, query_options::query_options(db::consistency_level cl, std::vector<cql3::raw_value> values,
specific_options options) specific_options options)
: query_options( : query_options(
default_cql_config, default_cql_config,
cl, cl,
timeout_config,
{}, {},
std::move(values), std::move(values),
false, false,
@@ -133,7 +126,6 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state) query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state)
: query_options(qo->_cql_config, : query_options(qo->_cql_config,
qo->_consistency, qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names), std::move(qo->_names),
std::move(qo->_values), std::move(qo->_values),
std::move(qo->_value_views), std::move(qo->_value_views),
@@ -146,7 +138,6 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size) query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
: query_options(qo->_cql_config, : query_options(qo->_cql_config,
qo->_consistency, qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names), std::move(qo->_names),
std::move(qo->_values), std::move(qo->_values),
std::move(qo->_value_views), std::move(qo->_value_views),
@@ -158,7 +149,7 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
query_options::query_options(std::vector<cql3::raw_value> values) query_options::query_options(std::vector<cql3::raw_value> values)
: query_options( : query_options(
db::consistency_level::ONE, infinite_timeout_config, std::move(values)) db::consistency_level::ONE, std::move(values))
{} {}
void query_options::prepare(const std::vector<lw_shared_ptr<column_specification>>& specs) void query_options::prepare(const std::vector<lw_shared_ptr<column_specification>>& specs)

View File

@@ -51,7 +51,6 @@
#include "cql3/column_identifier.hh" #include "cql3/column_identifier.hh"
#include "cql3/values.hh" #include "cql3/values.hh"
#include "cql_serialization_format.hh" #include "cql_serialization_format.hh"
#include "timeout_config.hh"
namespace cql3 { namespace cql3 {
@@ -75,7 +74,6 @@ public:
private: private:
const cql_config& _cql_config; const cql_config& _cql_config;
const db::consistency_level _consistency; const db::consistency_level _consistency;
const timeout_config& _timeout_config;
const std::optional<std::vector<sstring_view>> _names; const std::optional<std::vector<sstring_view>> _names;
std::vector<cql3::raw_value> _values; std::vector<cql3::raw_value> _values;
std::vector<cql3::raw_value_view> _value_views; std::vector<cql3::raw_value_view> _value_views;
@@ -109,7 +107,6 @@ public:
explicit query_options(const cql_config& cfg, explicit query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values, std::vector<cql3::raw_value> values,
bool skip_metadata, bool skip_metadata,
@@ -117,7 +114,6 @@ public:
cql_serialization_format sf); cql_serialization_format sf);
explicit query_options(const cql_config& cfg, explicit query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values, std::vector<cql3::raw_value> values,
std::vector<cql3::raw_value_view> value_views, std::vector<cql3::raw_value_view> value_views,
@@ -126,7 +122,6 @@ public:
cql_serialization_format sf); cql_serialization_format sf);
explicit query_options(const cql_config& cfg, explicit query_options(const cql_config& cfg,
db::consistency_level consistency, db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names, std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value_view> value_views, std::vector<cql3::raw_value_view> value_views,
bool skip_metadata, bool skip_metadata,
@@ -158,13 +153,10 @@ public:
// forInternalUse // forInternalUse
explicit query_options(std::vector<cql3::raw_value> values); explicit query_options(std::vector<cql3::raw_value> values);
explicit query_options(db::consistency_level, const timeout_config& timeouts, explicit query_options(db::consistency_level, std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state); explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state);
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size); explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
const timeout_config& get_timeout_config() const { return _timeout_config; }
db::consistency_level get_consistency() const { db::consistency_level get_consistency() const {
return _consistency; return _consistency;
} }
@@ -258,7 +250,7 @@ query_options::query_options(query_options&& o, std::vector<OneMutationDataRange
std::vector<query_options> tmp; std::vector<query_options> tmp;
tmp.reserve(values_ranges.size()); tmp.reserve(values_ranges.size());
std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) { std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) {
return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); return query_options(_cql_config, _consistency, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
}); });
_batch_options = std::move(tmp); _batch_options = std::move(tmp);
} }

View File

@@ -619,7 +619,6 @@ query_options query_processor::make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p, const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>& values, const std::initializer_list<data_value>& values,
db::consistency_level cl, db::consistency_level cl,
const timeout_config& timeout_config,
int32_t page_size) const { int32_t page_size) const {
if (p->bound_names.size() != values.size()) { if (p->bound_names.size() != values.size()) {
throw std::invalid_argument( throw std::invalid_argument(
@@ -643,11 +642,10 @@ query_options query_processor::make_internal_options(
api::timestamp_type ts = api::missing_timestamp; api::timestamp_type ts = api::missing_timestamp;
return query_options( return query_options(
cl, cl,
timeout_config,
bound_values, bound_values,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}); cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
} }
return query_options(cl, timeout_config, bound_values); return query_options(cl, bound_values);
} }
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) { statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
@@ -671,7 +669,7 @@ struct internal_query_state {
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string, ::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
const std::initializer_list<data_value>& values, int32_t page_size) { const std::initializer_list<data_value>& values, int32_t page_size) {
auto p = prepare_internal(query_string); auto p = prepare_internal(query_string);
auto opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config, page_size); auto opts = make_internal_options(p, values, db::consistency_level::ONE, page_size);
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>( ::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
internal_query_state{ internal_query_state{
query_string, query_string,
@@ -789,7 +787,16 @@ future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal( query_processor::execute_internal(
const sstring& query_string, const sstring& query_string,
db::consistency_level cl, db::consistency_level cl,
const timeout_config& timeout_config, const std::initializer_list<data_value>& values,
bool cache) {
return execute_internal(query_string, cl, *_internal_state, values, cache);
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
const sstring& query_string,
db::consistency_level cl,
service::query_state& query_state,
const std::initializer_list<data_value>& values, const std::initializer_list<data_value>& values,
bool cache) { bool cache) {
@@ -797,13 +804,13 @@ query_processor::execute_internal(
log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values)); log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values));
} }
if (cache) { if (cache) {
return execute_with_params(prepare_internal(query_string), cl, timeout_config, values); return execute_with_params(prepare_internal(query_string), cl, query_state, values);
} else { } else {
auto p = parse_statement(query_string)->prepare(_db, _cql_stats); auto p = parse_statement(query_string)->prepare(_db, _cql_stats);
p->statement->raw_cql_statement = query_string; p->statement->raw_cql_statement = query_string;
p->statement->validate(_proxy, *_internal_state); p->statement->validate(_proxy, *_internal_state);
auto checked_weak_ptr = p->checked_weak_from_this(); auto checked_weak_ptr = p->checked_weak_from_this();
return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {}); return execute_with_params(std::move(checked_weak_ptr), cl, query_state, values).finally([p = std::move(p)] {});
} }
} }
@@ -811,11 +818,11 @@ future<::shared_ptr<untyped_result_set>>
query_processor::execute_with_params( query_processor::execute_with_params(
statements::prepared_statement::checked_weak_ptr p, statements::prepared_statement::checked_weak_ptr p,
db::consistency_level cl, db::consistency_level cl,
const timeout_config& timeout_config, service::query_state& query_state,
const std::initializer_list<data_value>& values) { const std::initializer_list<data_value>& values) {
auto opts = make_internal_options(p, values, cl, timeout_config); auto opts = make_internal_options(p, values, cl);
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) { return do_with(std::move(opts), [this, &query_state, p = std::move(p)](auto & opts) {
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) { return p->statement->execute(_proxy, query_state, opts).then([](auto msg) {
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg)); return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
}); });
}); });

View File

@@ -215,8 +215,7 @@ public:
// creating namespaces, etc) is explicitly forbidden via this interface. // creating namespaces, etc) is explicitly forbidden via this interface.
future<::shared_ptr<untyped_result_set>> future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) { execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
return execute_internal(query_string, db::consistency_level::ONE, return execute_internal(query_string, db::consistency_level::ONE, values, true);
infinite_timeout_config, values, true);
} }
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query); statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
@@ -305,14 +304,19 @@ public:
future<::shared_ptr<untyped_result_set>> execute_internal( future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string, const sstring& query_string,
db::consistency_level, db::consistency_level,
const timeout_config& timeout_config, const std::initializer_list<data_value>& = { },
bool cache = false);
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
service::query_state& query_state,
const std::initializer_list<data_value>& = { }, const std::initializer_list<data_value>& = { },
bool cache = false); bool cache = false);
future<::shared_ptr<untyped_result_set>> execute_with_params( future<::shared_ptr<untyped_result_set>> execute_with_params(
statements::prepared_statement::checked_weak_ptr p, statements::prepared_statement::checked_weak_ptr p,
db::consistency_level, db::consistency_level,
const timeout_config& timeout_config, service::query_state& query_state,
const std::initializer_list<data_value>& = { }); const std::initializer_list<data_value>& = { });
future<::shared_ptr<cql_transport::messages::result_message::prepared>> future<::shared_ptr<cql_transport::messages::result_message::prepared>>
@@ -341,7 +345,6 @@ private:
const statements::prepared_statement::checked_weak_ptr& p, const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>&, const std::initializer_list<data_value>&,
db::consistency_level, db::consistency_level,
const timeout_config& timeout_config,
int32_t page_size = -1) const; int32_t page_size = -1) const;
future<::shared_ptr<cql_transport::messages::result_message>> future<::shared_ptr<cql_transport::messages::result_message>>

View File

@@ -286,7 +286,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
++_stats.batches; ++_stats.batches;
_stats.statements_in_batches += _statements.size(); _stats.statements_in_batches += _statements.size();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + query_state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return get_mutations(storage, options, timeout, local, now, query_state).then([this, &storage, &options, timeout, tr_state = query_state.get_trace_state(), return get_mutations(storage, options, timeout, local, now, query_state).then([this, &storage, &options, timeout, tr_state = query_state.get_trace_state(),
permit = query_state.get_permit()] (std::vector<mutation> ms) mutable { permit = query_state.get_permit()] (std::vector<mutation> ms) mutable {
return execute_without_conditions(storage, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit)); return execute_without_conditions(storage, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
@@ -343,7 +343,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
schema_ptr schema; schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now(); db::timeout_clock::time_point now = db::timeout_clock::now();
const timeout_config& cfg = options.get_timeout_config(); const timeout_config& cfg = qs.get_client_state().get_timeout_config();
auto batch_timeout = now + cfg.write_timeout; // Statement timeout. auto batch_timeout = now + cfg.write_timeout; // Statement timeout.
auto cas_timeout = now + cfg.cas_timeout; // Ballot contention timeout. auto cas_timeout = now + cfg.cas_timeout; // Ballot contention timeout.
auto read_timeout = now + cfg.read_timeout; // Query timeout. auto read_timeout = now + cfg.read_timeout; // Query timeout.

View File

@@ -286,7 +286,7 @@ modification_statement::do_execute(service::storage_proxy& proxy, service::query
future<> future<>
modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) const { modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) const {
auto cl = options.get_consistency(); auto cl = options.get_consistency();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + qs.get_client_state().get_timeout_config().*get_timeout_config_selector();
return get_mutations(proxy, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &proxy, &qs] (auto mutations) { return get_mutations(proxy, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &proxy, &qs] (auto mutations) {
if (mutations.empty()) { if (mutations.empty()) {
return now(); return now();
@@ -302,7 +302,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
auto cl_for_learn = options.get_consistency(); auto cl_for_learn = options.get_consistency();
auto cl_for_paxos = options.check_serial_consistency(); auto cl_for_paxos = options.check_serial_consistency();
db::timeout_clock::time_point now = db::timeout_clock::now(); db::timeout_clock::time_point now = db::timeout_clock::now();
const timeout_config& cfg = options.get_timeout_config(); const timeout_config& cfg = qs.get_client_state().get_timeout_config();
auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout. auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout.
auto cas_timeout = now + cfg.cas_timeout; // When to give up due to contention. auto cas_timeout = now + cfg.cas_timeout; // When to give up due to contention.

View File

@@ -59,6 +59,7 @@
#include "gms/feature_service.hh" #include "gms/feature_service.hh"
#include "transport/messages/result_message.hh" #include "transport/messages/result_message.hh"
#include "unimplemented.hh" #include "unimplemented.hh"
#include "concrete_types.hh"
namespace cql3 { namespace cql3 {
@@ -105,6 +106,30 @@ future<> create_role_statement::grant_permissions_to_creator(const service::clie
}); });
} }
static void validate_timeout_options(const auth::authentication_options& auth_options) {
if (!auth_options.options) {
return;
}
const auto& options = *auth_options.options;
auto check_duration = [&] (const sstring& repr) {
data_value v = duration_type->deserialize(duration_type->from_string(repr));
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
if (duration.months || duration.days) {
throw exceptions::invalid_request_exception("Timeout values cannot be longer than 24h");
}
if (duration.nanoseconds % 1'000'000 != 0) {
throw exceptions::invalid_request_exception("Timeout values must be expressed in millisecond granularity");
}
};
for (auto opt : {"read_timeout", "write_timeout"}) {
auto it = options.find(opt);
if (it != options.end()) {
check_duration(it->second);
}
}
}
void create_role_statement::validate(service::storage_proxy& p, const service::client_state&) const { void create_role_statement::validate(service::storage_proxy& p, const service::client_state&) const {
validate_cluster_support(p); validate_cluster_support(p);
} }
@@ -137,9 +162,12 @@ create_role_statement::execute(service::storage_proxy&,
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) { [this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
const auto& cs = state.get_client_state(); const auto& cs = state.get_client_state();
auto& as = *cs.get_auth_service(); auto& as = *cs.get_auth_service();
validate_timeout_options(authen_options);
return auth::create_role(as, _role, config, authen_options).then([this, &cs] { return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
return grant_permissions_to_creator(cs); return grant_permissions_to_creator(cs);
}).then([&state] () mutable {
return state.get_client_state().update_per_role_params();
}).then([] { }).then([] {
return void_result_message(); return void_result_message();
}).handle_exception_type([this](const auth::role_already_exists& e) { }).handle_exception_type([this](const auth::role_already_exists& e) {
@@ -224,8 +252,9 @@ alter_role_statement::execute(service::storage_proxy&, service::query_state& sta
extract_authentication_options(_options), extract_authentication_options(_options),
[this, &state](const auth::role_config_update& update, const auth::authentication_options& authen_options) { [this, &state](const auth::role_config_update& update, const auth::authentication_options& authen_options) {
auto& as = *state.get_client_state().get_auth_service(); auto& as = *state.get_client_state().get_auth_service();
return auth::alter_role(as, _role, update, authen_options).then([&state] () mutable {
return auth::alter_role(as, _role, update, authen_options).then([] { return state.get_client_state().update_per_role_params();
}).then([] {
return void_result_message(); return void_result_message();
}).handle_exception_type([](const auth::nonexistant_role& e) { }).handle_exception_type([](const auth::nonexistant_role& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what())); return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));

View File

@@ -366,7 +366,7 @@ select_statement::do_execute(service::storage_proxy& proxy,
} }
command->slice.options.set<query::partition_slice::option::allow_short_read>(); command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = options.get_timeout_config().*get_timeout_config_selector(); auto timeout_duration = state.get_client_state().get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + timeout_duration; auto timeout = db::timeout_clock::now() + timeout_duration;
auto p = service::pager::query_pagers::pager(_schema, _selection, auto p = service::pager::query_pagers::pager(_schema, _selection,
state, options, command, std::move(key_ranges), restrictions_need_filtering ? _restrictions : nullptr); state, options, command, std::move(key_ranges), restrictions_need_filtering ? _restrictions : nullptr);
@@ -513,7 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
lw_shared_ptr<const service::pager::paging_state> paging_state) const { lw_shared_ptr<const service::pager::paging_state> paging_state) const {
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>; using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state)); auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
uint32_t queried_ranges_count = partition_ranges.size(); uint32_t queried_ranges_count = partition_ranges.size();
service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata_ptr(), _schema, std::move(partition_ranges)); service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata_ptr(), _schema, std::move(partition_ranges));
@@ -607,7 +607,7 @@ indexed_table_select_statement::do_execute_base_query(
lw_shared_ptr<const service::pager::paging_state> paging_state) const { lw_shared_ptr<const service::pager::paging_state> paging_state) const {
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>; using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state)); auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
struct base_query_state { struct base_query_state {
query::result_merger merger; query::result_merger merger;
@@ -689,7 +689,7 @@ select_statement::execute(service::storage_proxy& proxy,
// is specified we need to get "limit" rows from each partition since there // is specified we need to get "limit" rows from each partition since there
// is no way to tell which of these rows belong to the query result before // is no way to tell which of these rows belong to the query result before
// doing post-query ordering. // doing post-query ordering.
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
if (needs_post_query_ordering() && _limit) { if (needs_post_query_ordering() && _limit) {
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd, timeout](auto& prs) { return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd, timeout](auto& prs) {
assert(cmd->partition_limit == query::max_partitions); assert(cmd->partition_limit == query::max_partitions);
@@ -1250,7 +1250,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
{ {
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>; using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now(); auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, false).then( return read_posting_list(proxy, options, get_limit(options), state, now, timeout, false).then(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) { [this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows); auto rs = cql3::untyped_result_set(rows);
@@ -1291,7 +1291,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
{ {
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>; using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now(); auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, true).then( return read_posting_list(proxy, options, get_limit(options), state, now, timeout, true).then(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) { [this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {

View File

@@ -55,10 +55,18 @@ struct query_context {
// let the `storage_proxy` time out the query down the call chain // let the `storage_proxy` time out the query down the call chain
db::timeout_clock::duration::zero(); db::timeout_clock::duration::zero();
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) { struct timeout_context {
std::unique_ptr<service::client_state> client_state;
service::query_state query_state;
timeout_context(db::timeout_clock::duration d)
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
, query_state(*client_state, empty_service_permit())
{}
};
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
return _qp.local().execute_internal(req, return _qp.local().execute_internal(req,
cql3::query_options::DEFAULT.get_consistency(), cql3::query_options::DEFAULT.get_consistency(),
tcfg, tctx.query_state,
{ data_value(std::forward<Args>(args))... }, { data_value(std::forward<Args>(args))... },
true); true);
}); });

View File

@@ -3093,7 +3093,6 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
auto cm_fut = qctx->qp().execute_internal( auto cm_fut = qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY, GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version} {table_id, version}
); );
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) { return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
@@ -3136,7 +3135,6 @@ future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version ve
return qctx->qp().execute_internal( return qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY, GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version} {table_id, version}
).then([] (shared_ptr<cql3::untyped_result_set> results) { ).then([] (shared_ptr<cql3::untyped_result_set> results) {
return !results->empty(); return !results->empty();
@@ -3150,7 +3148,6 @@ future<> drop_column_mapping(utils::UUID table_id, table_schema_version version)
return qctx->qp().execute_internal( return qctx->qp().execute_internal(
DEL_COLUMN_MAPPING_QUERY, DEL_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE, db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version}).discard_result(); {table_id, version}).discard_result();
} }

View File

@@ -155,17 +155,20 @@ future<> system_distributed_keyspace::stop() {
return make_ready_future<>(); return make_ready_future<>();
} }
static const timeout_config internal_distributed_timeout_config = [] { static service::query_state& internal_distributed_query_state() {
using namespace std::chrono_literals; using namespace std::chrono_literals;
const auto t = 10s; const auto t = 10s;
return timeout_config{ t, t, t, t, t, t, t }; static timeout_config tc{ t, t, t, t, t, t, t };
}(); static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
static thread_local service::query_state qs(cs, empty_service_permit());
return qs;
};
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const { future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
return _qp.execute_internal( return _qp.execute_internal(
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config, internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name) }, { std::move(ks_name), std::move(view_name) },
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) { false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
@@ -182,7 +185,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
return _qp.execute_internal( return _qp.execute_internal(
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS), format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config, internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" }, { std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
false).discard_result(); false).discard_result();
}); });
@@ -193,7 +196,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
return _qp.execute_internal( return _qp.execute_internal(
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS), format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config, internal_distributed_query_state(),
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) }, { "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
false).discard_result(); false).discard_result();
}); });
@@ -203,7 +206,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
return _qp.execute_internal( return _qp.execute_internal(
format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE, db::consistency_level::ONE,
internal_distributed_timeout_config, internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name) }, { std::move(ks_name), std::move(view_name) },
false).discard_result(); false).discard_result();
} }
@@ -281,7 +284,7 @@ system_distributed_keyspace::insert_cdc_topology_description(
return _qp.execute_internal( return _qp.execute_internal(
format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION), format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) }, { time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) },
false).discard_result(); false).discard_result();
} }
@@ -293,7 +296,7 @@ system_distributed_keyspace::read_cdc_topology_description(
return _qp.execute_internal( return _qp.execute_internal(
format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ time }, { time },
false false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> std::optional<cdc::topology_description> { ).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> std::optional<cdc::topology_description> {
@@ -321,7 +324,7 @@ system_distributed_keyspace::expire_cdc_topology_description(
return _qp.execute_internal( return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ expiration_time, streams_ts }, { expiration_time, streams_ts },
false).discard_result(); false).discard_result();
} }
@@ -342,7 +345,7 @@ system_distributed_keyspace::create_cdc_desc(
return _qp.execute_internal( return _qp.execute_internal(
format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC), format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ time, make_set_value(cdc_streams_set_type, prepare_cdc_streams(streams)) }, { time, make_set_value(cdc_streams_set_type, prepare_cdc_streams(streams)) },
false).discard_result(); false).discard_result();
} }
@@ -355,7 +358,7 @@ system_distributed_keyspace::expire_cdc_desc(
return _qp.execute_internal( return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC), format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ expiration_time, streams_ts }, { expiration_time, streams_ts },
false).discard_result(); false).discard_result();
} }
@@ -367,7 +370,7 @@ system_distributed_keyspace::cdc_desc_exists(
return _qp.execute_internal( return _qp.execute_internal(
format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC), format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{ streams_ts }, { streams_ts },
false false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> bool { ).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> bool {
@@ -380,7 +383,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(context ctx) {
return _qp.execute_internal( return _qp.execute_internal(
format("SELECT * FROM {}.{}", NAME, CDC_DESC), format("SELECT * FROM {}.{}", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners), quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config, internal_distributed_query_state(),
{}, {},
false false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) { ).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {

33
docs/roles.md Normal file
View File

@@ -0,0 +1,33 @@
# Per-role parameters
Scylla allows configuring per-role parameters. The current list of parameters includes:
* `read_timeout` - custom timeout for read operations
* `write_timeout` - custom timeout for write operations
## Examples
In order to set up per-role parameters, one should use the already existing CQL API for roles:
```cql
CREATE ROLE example WITH options = {'read_timeout': 50ms}
```
or
```cql
ALTER ROLE example WITH options = {'read_timeout': 1s, 'write_timeout': 500ms}
```
Once a session with given role is established, it will use per-role timeouts instead of globally configured
timeouts, if there are any.
Role options can be viewed with the standard `LIST ROLES` statement:
```cql
LIST ROLES;
role | super | login | options
-----------+-------+-------+-----------------------------------------------------
cassandra | True | True | {'read_timeout': '50ms', 'write_timeout': '1000us'}
```
## Supported authenticators
Roles are currently supported in PasswordAuthenticator.

View File

@@ -60,7 +60,7 @@ public:
,_read_consistency(rcl) ,_read_consistency(rcl)
,_write_consistency(wcl) ,_write_consistency(wcl)
,_timeout_config(tc) ,_timeout_config(tc)
,_client_state(service::client_state::external_tag{}, auth, addr) ,_client_state(service::client_state::external_tag{}, auth, tc, addr)
,_total_redis_db_count(total_redis_db_count) ,_total_redis_db_count(total_redis_db_count)
{ {
} }
@@ -75,7 +75,7 @@ public:
,_read_consistency(rcl) ,_read_consistency(rcl)
,_write_consistency(wcl) ,_write_consistency(wcl)
,_timeout_config(tc) ,_timeout_config(tc)
,_client_state(service::client_state::external_tag{}, auth, addr) ,_client_state(service::client_state::external_tag{}, auth, tc, addr)
,_total_redis_db_count(total_redis_db_count) ,_total_redis_db_count(total_redis_db_count)
{ {
} }

View File

@@ -53,6 +53,7 @@
#include "db/system_distributed_keyspace.hh" #include "db/system_distributed_keyspace.hh"
#include "database.hh" #include "database.hh"
#include "cdc/log.hh" #include "cdc/log.hh"
#include "concrete_types.hh"
thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0; thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0;
@@ -60,6 +61,56 @@ void service::client_state::set_login(auth::authenticated_user user) {
_user = std::move(user); _user = std::move(user);
} }
service::client_state::client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address, bool thrift)
: _is_internal(false)
, _is_thrift(thrift)
, _remote_address(remote_address)
, _auth_service(&auth_service)
, _default_timeout_config(timeout_config)
, _timeout_config(timeout_config) {
if (!auth_service.underlying_authenticator().require_authentication()) {
_user = auth::authenticated_user();
}
}
future<> service::client_state::update_per_role_params() {
if (!_user || auth::is_anonymous(*_user)) {
return make_ready_future<>();
}
//FIXME: replace with a coroutine once they're widely accepted
return seastar::async([this] {
auth::role_set roles = _auth_service->get_roles(*_user->name).get();
db::timeout_clock::duration read_timeout = db::timeout_clock::duration::max();
db::timeout_clock::duration write_timeout = db::timeout_clock::duration::max();
auto get_duration = [&] (const sstring& repr) {
data_value v = duration_type->deserialize(duration_type->from_string(repr));
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
return std::chrono::duration_cast<lowres_clock::duration>(std::chrono::nanoseconds(duration.nanoseconds));
};
for (const auto& role : roles) {
auto options = _auth_service->underlying_authenticator().query_custom_options(role).get();
auto read_timeout_it = options.find("read_timeout");
if (read_timeout_it != options.end()) {
read_timeout = std::min(read_timeout, get_duration(read_timeout_it->second));
}
auto write_timeout_it = options.find("write_timeout");
if (write_timeout_it != options.end()) {
write_timeout = std::min(write_timeout, get_duration(write_timeout_it->second));
}
}
_timeout_config.read_timeout = read_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.read_timeout : read_timeout;
_timeout_config.range_read_timeout = read_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.range_read_timeout : read_timeout;
_timeout_config.write_timeout = write_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.write_timeout : write_timeout;
_timeout_config.counter_write_timeout = write_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.counter_write_timeout : write_timeout;
});
}
future<> service::client_state::check_user_can_login() { future<> service::client_state::check_user_can_login() {
if (auth::is_anonymous(*_user)) { if (auth::is_anonymous(*_user)) {
return make_ready_future(); return make_ready_future();

View File

@@ -44,6 +44,7 @@
#include "auth/service.hh" #include "auth/service.hh"
#include "exceptions/exceptions.hh" #include "exceptions/exceptions.hh"
#include "unimplemented.hh" #include "unimplemented.hh"
#include "timeout_config.hh"
#include "timestamp.hh" #include "timestamp.hh"
#include "db_clock.hh" #include "db_clock.hh"
#include "database_fwd.hh" #include "database_fwd.hh"
@@ -88,10 +89,17 @@ public:
}; };
private: private:
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service) client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state), : _keyspace(cs->_keyspace)
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address), , _user(cs->_user)
_auth_service(auth_service ? &auth_service->local() : nullptr), , _auth_state(cs->_auth_state)
_enabled_protocol_extensions(cs->_enabled_protocol_extensions) {} , _is_internal(cs->_is_internal)
, _is_thrift(cs->_is_thrift)
, _remote_address(cs->_remote_address)
, _auth_service(auth_service ? &auth_service->local() : nullptr)
, _default_timeout_config(cs->_default_timeout_config)
, _timeout_config(cs->_timeout_config)
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
{}
friend client_state_for_another_shard; friend client_state_for_another_shard;
private: private:
sstring _keyspace; sstring _keyspace;
@@ -136,6 +144,10 @@ private:
// Only populated for external client state. // Only populated for external client state.
auth::service* _auth_service{nullptr}; auth::service* _auth_service{nullptr};
// For restoring default values in the timeout config
timeout_config _default_timeout_config;
timeout_config _timeout_config;
public: public:
struct internal_tag {}; struct internal_tag {};
struct external_tag {}; struct external_tag {};
@@ -162,15 +174,7 @@ public:
_driver_version = std::move(driver_version); _driver_version = std::move(driver_version);
} }
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false) client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address = socket_address(), bool thrift = false);
: _is_internal(false)
, _is_thrift(thrift)
, _remote_address(remote_address)
, _auth_service(&auth_service) {
if (!auth_service.underlying_authenticator().require_authentication()) {
_user = auth::authenticated_user();
}
}
gms::inet_address get_client_address() const { gms::inet_address get_client_address() const {
return gms::inet_address(_remote_address); return gms::inet_address(_remote_address);
@@ -180,10 +184,19 @@ public:
return _remote_address.port(); return _remote_address.port();
} }
client_state(internal_tag) const timeout_config& get_timeout_config() const {
return _timeout_config;
}
client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config)
{}
client_state(internal_tag, const timeout_config& config)
: _keyspace("system") : _keyspace("system")
, _is_internal(true) , _is_internal(true)
, _is_thrift(false) , _is_thrift(false)
, _default_timeout_config(config)
, _timeout_config(config)
{} {}
client_state(const client_state&) = delete; client_state(const client_state&) = delete;
@@ -315,6 +328,7 @@ public:
auth::command_desc::type = auth::command_desc::type::OTHER) const; auth::command_desc::type = auth::command_desc::type::OTHER) const;
future<> has_schema_access(const schema& s, auth::permission p) const; future<> has_schema_access(const schema& s, auth::permission p) const;
future<> update_per_role_params();
private: private:
future<> has_access(const sstring& keyspace, auth::command_desc) const; future<> has_access(const sstring& keyspace, auth::command_desc) const;

View File

@@ -44,6 +44,7 @@
#include "db/config.hh" #include "db/config.hh"
#include "cql3/query_processor.hh" #include "cql3/query_processor.hh"
#include "types/map.hh"
SEASTAR_TEST_CASE(test_default_authenticator) { SEASTAR_TEST_CASE(test_default_authenticator) {
return do_with_cql_env([](cql_test_env& env) { return do_with_cql_env([](cql_test_env& env) {
@@ -216,3 +217,48 @@ SEASTAR_TEST_CASE(alter_opts_on_system_auth_tables) {
cquery_nofail(env, "ALTER TABLE system_auth.role_permissions WITH min_index_interval = 456"); cquery_nofail(env, "ALTER TABLE system_auth.role_permissions WITH min_index_interval = 456");
}, auth_on()); }, auth_on());
} }
SEASTAR_TEST_CASE(test_alter_with_timeouts) {
auto cfg = make_shared<db::config>();
cfg->authenticator(sstring(auth::password_authenticator_name));
return do_with_cql_env_thread([] (cql_test_env& e) {
auth::role_config config {
.can_login = true,
};
auth::authentication_options opts {
.password = "pass"
};
auth::create_role(e.local_auth_service(), "user", config, opts).get();
authenticate(e, "user", "pass").get();
cquery_nofail(e, "CREATE TABLE t (id int PRIMARY KEY, v int)");
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 5ms, 'write_timeout': 1h30m}");
auto my_map_type = map_type_impl::get_instance(utf8_type, utf8_type, false);
auto msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
assert_that(msg).is_rows().with_rows({{
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"read_timeout", "5ms"}, {"write_timeout", "1h30m"}}))),
}});
cquery_nofail(e, "ALTER ROLE user WITH options = {'write_timeout': 35s}");
msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
assert_that(msg).is_rows().with_rows({{
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"write_timeout", "35s"}}))),
}});
// Setting a timeout value of 0 makes little sense, but it's great for testing
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 0s, 'write_timeout': 0s}");
BOOST_REQUIRE_THROW(e.execute_cql("SELECT * FROM t").get(), exceptions::read_timeout_exception);
BOOST_REQUIRE_THROW(e.execute_cql("INSERT INTO t (id, v) VALUES (1,2)").get(), exceptions::mutation_write_failure_exception);
cquery_nofail(e, "ALTER ROLE user WITH options = {}");
cquery_nofail(e, "SELECT * FROM t");
cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)");
// Only valid timeout values are accepted
BOOST_REQUIRE_THROW(e.execute_cql("ALTER ROLE user WITH options = {'read_timeout': 'I am not a valid duration'}").get(), marshal_exception);
}, cfg);
}

View File

@@ -173,7 +173,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception); BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception);
auto make_query_options = [] (cql_protocol_version_type version) { auto make_query_options = [] (cql_protocol_version_type version) {
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt, return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt,
std::vector<cql3::raw_value_view>(), false, std::vector<cql3::raw_value_view>(), false,
cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version}); cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version});
}; };

View File

@@ -3013,7 +3013,7 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get(); e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get();
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0(); auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0();
assert_that(res).is_rows().is_empty(); assert_that(res).is_rows().is_empty();
@@ -4444,7 +4444,6 @@ static std::unique_ptr<cql3::query_options> q_serial_opts(
const auto& so = cql3::query_options::specific_options::DEFAULT; const auto& so = cql3::query_options::specific_options::DEFAULT;
auto qo = std::make_unique<cql3::query_options>( auto qo = std::make_unique<cql3::query_options>(
cl, cl,
infinite_timeout_config,
values, values,
// Ensure (optional) serial consistency is always specified. // Ensure (optional) serial consistency is always specified.
cql3::query_options::specific_options{ cql3::query_options::specific_options{
@@ -4634,7 +4633,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC"); const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
int32_t page_size = is_paged ? 10000 : -1; int32_t page_size = is_paged ? 10000 : -1;
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows; const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;

View File

@@ -831,14 +831,14 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(6), boolean_type->decompose(false)}, { int32_type->decompose(6), boolean_type->decompose(false)},
}); });
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(3), boolean_type->decompose(true)}, { int32_type->decompose(3), boolean_type->decompose(true)},
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(6), boolean_type->decompose(false)}, { int32_type->decompose(6), boolean_type->decompose(false)},
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -857,7 +857,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(2), boolean_type->decompose(false)} { int32_type->decompose(2), boolean_type->decompose(false)}
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -866,7 +866,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(4), boolean_type->decompose(false)} { int32_type->decompose(4), boolean_type->decompose(false)}
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg); auto paging_state = extract_paging_state(msg);
@@ -877,7 +877,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
// Some pages might be empty and in such case we should continue querying // Some pages might be empty and in such case we should continue querying
size_t rows_fetched = 0; size_t rows_fetched = 0;
while (rows_fetched == 0) { while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg); rows_fetched = count_rows_fetched(msg);
@@ -889,7 +889,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
rows_fetched = 0; rows_fetched = 0;
while (rows_fetched == 0) { while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg); rows_fetched = count_rows_fetched(msg);
@@ -905,7 +905,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
rows_fetched = 0; rows_fetched = 0;
uint64_t remaining = 1; uint64_t remaining = 1;
while (remaining > 0) { while (remaining > 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched += count_rows_fetched(msg); rows_fetched += count_rows_fetched(msg);
@@ -964,7 +964,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
{ int32_type->decompose(1), boolean_type->decompose(false)}, { int32_type->decompose(1), boolean_type->decompose(false)},
}); });
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -972,7 +972,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
{ int32_type->decompose(3), boolean_type->decompose(true)}, { int32_type->decompose(3), boolean_type->decompose(true)},
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
// Some pages might be empty and in such case we should continue querying // Some pages might be empty and in such case we should continue querying
size_t rows_fetched = 0; size_t rows_fetched = 0;
while (rows_fetched == 0) { while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg); rows_fetched = count_rows_fetched(msg);
@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
rows_fetched = 0; rows_fetched = 0;
uint64_t remaining = 1; uint64_t remaining = 1;
while (remaining > 0) { while (remaining > 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()});
sstring query = allow_filtering ? sstring query = allow_filtering ?
fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) : fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) :

View File

@@ -36,7 +36,7 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
} }
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(4321); assert_that(res).is_rows().with_size(4321);

View File

@@ -63,7 +63,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
e.execute_prepared(id, {cql3_pk, cql3_ck}).get(); e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
} }
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("select * from test;", std::move(qo)).get0(); auto msg = e.execute_cql("select * from test;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg); auto paging_state = extract_paging_state(msg);
@@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
paging_state->set_remaining(test_remaining); paging_state->set_remaining(test_remaining);
while (has_more_pages(msg)) { while (has_more_pages(msg)) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0(); msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg); rows_fetched = count_rows_fetched(msg);
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
e.execute_prepared(id, {cql3_pk, cql3_ck}).get(); e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
} }
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0(); auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg); auto paging_state = extract_paging_state(msg);
@@ -113,7 +113,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
paging_state->set_remaining(test_remaining); paging_state->set_remaining(test_remaining);
while (has_more_pages(msg)) { while (has_more_pages(msg)) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0(); msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg); rows_fetched = count_rows_fetched(msg);

View File

@@ -209,8 +209,7 @@ std::unordered_map<sstring, uint64_t> get_query_metrics() {
/// Creates query_options with cl, infinite timeout, and no named values. /// Creates query_options with cl, infinite timeout, and no named values.
auto make_options(clevel cl) { auto make_options(clevel cl) {
return std::make_unique<cql3::query_options>( return std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>());
cl, infinite_timeout_config, std::vector<cql3::raw_value>());
} }
} // anonymous namespace } // anonymous namespace

View File

@@ -45,7 +45,7 @@ std::unique_ptr<cql3::query_options> to_options(
static auto& d = cql3::query_options::DEFAULT; static auto& d = cql3::query_options::DEFAULT;
return std::make_unique<cql3::query_options>( return std::make_unique<cql3::query_options>(
cfg, cfg,
d.get_consistency(), d.get_timeout_config(), std::move(names), std::move(values), d.skip_metadata(), d.get_consistency(), std::move(names), std::move(values), d.skip_metadata(),
d.get_specific_options(), d.get_cql_serialization_format()); d.get_specific_options(), d.get_cql_serialization_format());
} }

View File

@@ -412,7 +412,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
} }
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(101); assert_that(res).is_rows().with_size(101);
@@ -424,7 +424,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
}); });
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_rows({{ assert_that(res).is_rows().with_rows({{
@@ -434,7 +434,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
}); });
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0();
assert_that(res).is_rows().with_rows({{ assert_that(res).is_rows().with_rows({{
@@ -470,7 +470,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
}; };
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
auto paging_state = extract_paging_state(res); auto paging_state = extract_paging_state(res);
@@ -480,7 +480,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
}}); }});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
expect_more_pages(res, true); expect_more_pages(res, true);
@@ -490,7 +490,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}}); }});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
paging_state = extract_paging_state(res); paging_state = extract_paging_state(res);
@@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
try { try {
expect_more_pages(res, false); expect_more_pages(res, false);
} catch (...) { } catch (...) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(0); assert_that(res).is_rows().with_size(0);
@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
}); });
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res); auto paging_state = extract_paging_state(res);
@@ -524,7 +524,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}}); }});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
@@ -534,7 +534,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
}); });
{ {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res); auto paging_state = extract_paging_state(res);
@@ -551,7 +551,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(), paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
paging_state->get_rows_fetched_for_last_partition()); paging_state->get_rows_fetched_for_last_partition());
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
@@ -563,7 +563,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
// not to return rows (since no row matches an empty partition key) // not to return rows (since no row matches an empty partition key)
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt, auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1); 1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
@@ -802,7 +802,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
}; };
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
auto paging_state = extract_paging_state(res); auto paging_state = extract_paging_state(res);
@@ -811,7 +811,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
}}); }});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
@@ -821,7 +821,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
}); });
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0(); auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res); auto paging_state = extract_paging_state(res);
@@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}}); }});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0(); res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
@@ -1158,7 +1158,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
} }
eventually([&] { eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo)); auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function. // Even though we set up paging, we still expect a single result from an aggregation function.
@@ -1173,7 +1173,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)}, { int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo)); msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({
@@ -1191,7 +1191,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str()); cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
} }
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo)); auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function // Even though we set up paging, we still expect a single result from an aggregation function
@@ -1199,7 +1199,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
{ int32_type->decompose(row_count * row_count / 4)}, { int32_type->decompose(row_count * row_count / 4)},
}); });
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo)); msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({ assert_that(msg).is_rows().with_rows({

View File

@@ -125,7 +125,7 @@ private:
service::client_state client_state; service::client_state client_state;
core_local_state(auth::service& auth_service) core_local_state(auth::service& auth_service)
: client_state(service::client_state::external_tag{}, auth_service) : client_state(service::client_state::external_tag{}, auth_service, infinite_timeout_config)
{ {
client_state.set_login(auth::authenticated_user(testing_superuser)); client_state.set_login(auth::authenticated_user(testing_superuser));
} }
@@ -189,7 +189,7 @@ public:
db::consistency_level cl = db::consistency_level::ONE) override { db::consistency_level cl = db::consistency_level::ONE) override {
const auto& so = cql3::query_options::specific_options::DEFAULT; const auto& so = cql3::query_options::specific_options::DEFAULT;
auto options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, auto options = std::make_unique<cql3::query_options>(cl,
std::move(values), cql3::query_options::specific_options{ std::move(values), cql3::query_options::specific_options{
so.page_size, so.page_size,
so.state, so.state,
@@ -226,7 +226,7 @@ public:
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text)); throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
} }
auto& qo = cql3::query_options::DEFAULT; auto& qo = cql3::query_options::DEFAULT;
auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout; auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs) return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
.finally([qs, modif_stmt = std::move(modif_stmt)] {}); .finally([qs, modif_stmt = std::move(modif_stmt)] {});

View File

@@ -228,7 +228,7 @@ SEASTAR_TEST_CASE(scan_enormous_table_test) {
std::unique_ptr<cql3::query_options> qo; std::unique_ptr<cql3::query_options> qo;
uint64_t fetched_rows_log_counter = 1e7; uint64_t fetched_rows_log_counter = 1e7;
do { do {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{}, qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()}); cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0(); msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0();
rows_fetched += count_rows_fetched(msg); rows_fetched += count_rows_fetched(msg);

View File

@@ -104,7 +104,6 @@ std::unique_ptr<cql3::query_options> repl_options() {
const auto& so = cql3::query_options::specific_options::DEFAULT; const auto& so = cql3::query_options::specific_options::DEFAULT;
auto qo = std::make_unique<cql3::query_options>( auto qo = std::make_unique<cql3::query_options>(
db::consistency_level::ONE, db::consistency_level::ONE,
infinite_timeout_config,
std::vector<cql3::raw_value>{}, std::vector<cql3::raw_value>{},
// Ensure (optional) serial consistency is always specified. // Ensure (optional) serial consistency is always specified.
cql3::query_options::specific_options{ cql3::query_options::specific_options{

View File

@@ -201,9 +201,9 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf { class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db; distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor; distributed<cql3::query_processor>& _query_processor;
::timeout_config _timeout_config;
service::client_state _client_state; service::client_state _client_state;
service::query_state _query_state; service::query_state _query_state;
::timeout_config _timeout_config;
private: private:
template <typename Cob, typename Func> template <typename Cob, typename Func>
void void
@@ -220,9 +220,9 @@ public:
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config) explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
: _db(db) : _db(db)
, _query_processor(qp) , _query_processor(qp)
, _client_state(service::client_state::external_tag{}, auth_service, socket_address(), true)
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _timeout_config(timeout_config) , _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true)
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
{ } { }
const sstring& current_keyspace() const { const sstring& current_keyspace() const {
@@ -976,7 +976,7 @@ public:
throw make_exception<InvalidRequestException>("Compressed query strings are not supported"); throw make_exception<InvalidRequestException>("Compressed query strings are not supported");
} }
auto& qp = _query_processor.local(); auto& qp = _query_processor.local();
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(), auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::vector<cql3::raw_value_view>(),
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
auto f = qp.execute_direct(query, _query_state, *opts); auto f = qp.execute_direct(query, _query_state, *opts);
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
@@ -1056,7 +1056,7 @@ public:
return cql3::raw_value::make_value(to_bytes(s)); return cql3::raw_value::make_value(to_bytes(s));
}); });
auto& qp = _query_processor.local(); auto& qp = _query_processor.local();
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::move(bytes_values),
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization); auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization);
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {

View File

@@ -63,9 +63,13 @@ const sstring trace_keyspace_helper::EVENTS("events");
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log"); const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log");
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx"); const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx");
timeout_config tracing_db_timeout_config { static service::client_state& tracing_client_state() {
5s, 5s, 5s, 5s, 5s, 5s, 5s, static timeout_config tracing_db_timeout_config {
}; 5s, 5s, 5s, 5s, 5s, 5s, 5s,
};
static thread_local service::client_state s(service::client_state::internal_tag{}, tracing_db_timeout_config);
return s;
}
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base { struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
int64_t last_nanos = 0; int64_t last_nanos = 0;
@@ -75,7 +79,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat
trace_keyspace_helper::trace_keyspace_helper(tracing& tr) trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
: i_tracing_backend_helper(tr) : i_tracing_backend_helper(tr)
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit()) , _dummy_query_state(tracing_client_state(), empty_service_permit())
, _sessions(KEYSPACE_NAME, SESSIONS, , _sessions(KEYSPACE_NAME, SESSIONS,
sprint("CREATE TABLE IF NOT EXISTS %s.%s (" sprint("CREATE TABLE IF NOT EXISTS %s.%s ("
"session_id uuid," "session_id uuid,"
@@ -313,7 +317,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
}; };
return cql3::query_options(cql3::default_cql_config, return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
} }
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) { cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
@@ -331,7 +335,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
}; };
return cql3::query_options(cql3::default_cql_config, return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
} }
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
@@ -374,7 +378,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
}); });
return cql3::query_options(cql3::default_cql_config, return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
} }
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
@@ -395,7 +399,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
}); });
return cql3::query_options(cql3::default_cql_config, return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
} }
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) { std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
@@ -431,7 +435,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); }); std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
return do_with( return do_with(
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()), cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
[this] (auto& batch_options, auto& batch) { [this] (auto& batch_options, auto& batch) {
return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); }); return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });

View File

@@ -219,10 +219,10 @@ private:
options_flag::NAMES_FOR_VALUES options_flag::NAMES_FOR_VALUES
>; >;
public: public:
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) { std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const cql3::cql_config& cql_config) {
auto consistency = read_consistency(); auto consistency = read_consistency();
if (version == 1) { if (version == 1) {
return std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{}, return std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::vector<cql3::raw_value_view>{},
false, cql3::query_options::specific_options::DEFAULT, cql_ser_format); false, cql3::query_options::specific_options::DEFAULT, cql_ser_format);
} }
@@ -270,11 +270,11 @@ public:
if (!names.empty()) { if (!names.empty()) {
onames = std::move(names); onames = std::move(names);
} }
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata, options = std::make_unique<cql3::query_options>(cql_config, consistency, std::move(onames), std::move(values), skip_metadata,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts},
cql_ser_format); cql_ser_format);
} else { } else {
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata, options = std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::move(values), skip_metadata,
cql3::query_options::specific_options::DEFAULT, cql_ser_format); cql3::query_options::specific_options::DEFAULT, cql_ser_format);
} }

View File

@@ -563,7 +563,7 @@ cql_server::connection::connection(cql_server& server, socket_address server_add
, _fd(std::move(fd)) , _fd(std::move(fd))
, _read_buf(_fd.input()) , _read_buf(_fd.input())
, _write_buf(_fd.output()) , _write_buf(_fd.output())
, _client_state(service::client_state::external_tag{}, server._auth_service, addr) , _client_state(service::client_state::external_tag{}, server._auth_service, server.timeout_config(), addr)
{ {
++_server._total_connections; ++_server._total_connections;
++_server._current_connections; ++_server._current_connections;
@@ -855,6 +855,9 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username))); std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username)));
}); });
} }
f = f.then([&client_state] {
return client_state.update_per_role_params();
});
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable { return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state)); return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
}); });
@@ -891,7 +894,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable { (bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
request_reader in(is, linearization_buffer); request_reader in(is, linearization_buffer);
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
server.timeout_config(), /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) { /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
// result here has to be foreign ptr // result here has to be foreign ptr
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg)); return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
}); });
@@ -906,7 +909,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
fragmented_temporary_buffer::istream is = in.get_stream(); fragmented_temporary_buffer::istream is = in.get_stream();
return process_fn(client_state, _server._query_processor, in, stream, return process_fn(client_state, _server._query_processor, in, stream,
_version, _cql_serialization_format, _server.timeout_config(), permit, trace_state, true) _version, _cql_serialization_format, permit, trace_state, true)
.then([stream, &client_state, this, is, permit, process_fn, trace_state] .then([stream, &client_state, this, is, permit, process_fn, trace_state]
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable { (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
unsigned* shard = std::get_if<unsigned>(&msg); unsigned* shard = std::get_if<unsigned>(&msg);
@@ -920,12 +923,12 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>> static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in, process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state, service_permit permit, tracing::trace_state_ptr trace_state,
bool init_trace) { bool init_trace) {
auto query = in.read_long_string_view(); auto query = in.read_long_string_view();
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit)); auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state; auto& query_state = q_state->query_state;
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config()); q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
auto& options = *q_state->options; auto& options = *q_state->options;
auto skip_metadata = options.skip_metadata(); auto skip_metadata = options.skip_metadata();
@@ -988,8 +991,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>> static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in, process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit, service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
tracing::trace_state_ptr trace_state, bool init_trace) {
cql3::prepared_cache_key_type cache_key(in.read_short_bytes()); cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
bool needs_authorization = false; bool needs_authorization = false;
@@ -1012,10 +1014,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
std::vector<cql3::raw_value_view> values; std::vector<cql3::raw_value_view> values;
in.read_value_view_list(version, values); in.read_value_view_list(version, values);
auto consistency = in.read_consistency(); auto consistency = in.read_consistency();
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, timeout_config, std::nullopt, values, false, q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, std::nullopt, values, false,
cql3::query_options::specific_options::DEFAULT, serialization_format); cql3::query_options::specific_options::DEFAULT, serialization_format);
} else { } else {
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config()); q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
} }
auto& options = *q_state->options; auto& options = *q_state->options;
auto skip_metadata = options.skip_metadata(); auto skip_metadata = options.skip_metadata();
@@ -1068,8 +1070,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>> static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in, process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit, service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
tracing::trace_state_ptr trace_state, bool init_trace) {
if (version == 1) { if (version == 1) {
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol"); throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
} }
@@ -1158,7 +1159,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
auto& query_state = q_state->query_state; auto& query_state = q_state->query_state;
// #563. CQL v2 encodes query_options in v1 format for batch requests. // #563. CQL v2 encodes query_options in v1 format for batch requests.
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format, q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,
timeout_config, qp.local().get_cql_config())), std::move(values))); qp.local().get_cql_config())), std::move(values)));
auto& options = *q_state->options; auto& options = *q_state->options;
if (init_trace) { if (init_trace) {