Compare commits

...

10 Commits

Author SHA1 Message Date
Piotr Sarna
fcb349b026 tests: add tests for per-role timeouts
The test cases verify that setting timeout parameters per-role
works and is validated.
2020-11-27 12:43:53 +01:00
Piotr Sarna
28c558af95 docs: add a paragaph about per-role parameters
This paragraph is also the first one in newly crated roles.md,
which should be later filled with more information about roles.
2020-11-27 12:43:53 +01:00
Piotr Sarna
83b47ae394 cql3: add validating per-role timeout options
Per-role timeout options are now validated when set:
 - they should represent a valid duration
 - the duration should have millisecond granularity,
   since the timeout clock does not support micro/nanoseconds.
2020-11-27 12:37:27 +01:00
Piotr Sarna
391d1f2b21 client_state: add updating per-role params
Per-role parameters (currently: read_timeout and write_timeout)
are now updated when a new connection is established.
Also, the changes are immediately propagated for the connection
which sent the CREATE ROLE/ALTER ROLE statement.
The other connections which have the changed role are currently
not immediately reloaded.
It can be done in the future if needed, but all sessions with
given roles should be tracked, or, alternatively, all sessions
should be iterated and changed.
2020-11-27 12:37:27 +01:00
Piotr Sarna
137a8a0161 auth: add options support to password authenticator
Custom options will be used later to provide per-role timeouts
and other useful parameters.
2020-11-27 12:37:17 +01:00
Piotr Sarna
c473cb4a2d treewide: remove timeout config from query options
Timeout config is now stored in each connection, so there's no point
in tracking it inside each query as well. This patch removes
timeout_config from query_options and follows by removing now
unnecessary parameters of many functions and constructors.
2020-11-26 17:56:55 +01:00
Piotr Sarna
98fac66361 cql3: use timeout config from client state instead of query options
... in batch statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
2cbeb3678f cql3: use timeout config from client state instead of query options
... in modification statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
d61e1fd174 cql3: use timeout config from client state instead of query options
... in select statement, in order to be able to remove the timeout
from query options later.
2020-11-26 17:55:29 +01:00
Piotr Sarna
f31ac0a8ca service: add timeout config to client state
Future patches will use this per-connection timeout config
to allow setting different timeouts for each session,
based on roles.
2020-11-26 17:55:14 +01:00
41 changed files with 427 additions and 212 deletions

View File

@@ -129,8 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name);
auto cl = auth::password_authenticator::consistency_for_user(username);
auto& timeout = auth::internal_distributed_timeout_config();
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
return qp.execute_internal(query, cl, auth::internal_distributed_query_state(), {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
auto res = f.get0();
auto salted_hash = std::optional<sstring>();
if (res->empty()) {

View File

@@ -2845,7 +2845,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);

View File

@@ -108,7 +108,7 @@ future<> wait_for_schema_agreement(::service::migration_manager& mm, const datab
});
}
const timeout_config& internal_distributed_timeout_config() noexcept {
::service::query_state& internal_distributed_query_state() noexcept {
#ifdef DEBUG
// Give the much slower debug tests more headroom for completing auth queries.
static const auto t = 30s;
@@ -116,7 +116,9 @@ const timeout_config& internal_distributed_timeout_config() noexcept {
static const auto t = 5s;
#endif
static const timeout_config tc{t, t, t, t, t, t, t};
return tc;
static thread_local ::service::client_state cs(::service::client_state::internal_tag{}, tc);
static thread_local ::service::query_state qs(cs, empty_service_permit());
return qs;
}
}

View File

@@ -35,6 +35,7 @@
#include "log.hh"
#include "seastarx.hh"
#include "utils/exponential_backoff_retry.hh"
#include "service/query_state.hh"
using namespace std::chrono_literals;
@@ -87,6 +88,6 @@ future<> wait_for_schema_agreement(::service::migration_manager&, const database
///
/// Time-outs for internal, non-local CQL queries.
///
const timeout_config& internal_distributed_timeout_config() noexcept;
::service::query_state& internal_distributed_query_state() noexcept;
}

View File

@@ -103,7 +103,6 @@ future<bool> default_authorizer::any_granted() const {
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{},
true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return !results->empty();
@@ -116,8 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const {
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
return do_with(
row.get_as<sstring>("username"),
@@ -197,7 +195,6 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return permissions::NONE;
@@ -226,7 +223,7 @@ default_authorizer::modify(
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
});
}
@@ -251,7 +248,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{},
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
std::vector<permission_details> all_details;
@@ -278,7 +275,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const {
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
try {
std::rethrow_exception(ep);
@@ -298,7 +295,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
@@ -315,7 +311,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
[resource](auto ep) {
try {

View File

@@ -66,6 +66,7 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
// name of the hash column.
static constexpr std::string_view SALTED_HASH = "salted_hash";
static constexpr std::string_view OPTIONS = "options";
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
@@ -114,7 +115,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
auto username = row.get_as<sstring>("username");
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
@@ -122,7 +123,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.execute_internal(
update_row_query(),
consistency_for_user(username),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{std::move(salted_hash), username}).discard_result();
}).finally([results] {});
}).then([] {
@@ -139,7 +140,7 @@ future<> password_authenticator::create_default_if_missing() const {
return _qp.execute_internal(
update_row_query(),
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) {
plogger.info("Created default superuser authentication record.");
});
@@ -203,11 +204,11 @@ bool password_authenticator::require_authentication() const {
}
authentication_option_set password_authenticator::supported_options() const {
return authentication_option_set{authentication_option::password};
return authentication_option_set{authentication_option::password, authentication_option::options};
}
authentication_option_set password_authenticator::alterable_options() const {
return authentication_option_set{authentication_option::password};
return authentication_option_set{authentication_option::password, authentication_option::options};
}
future<authenticated_user> password_authenticator::authenticate(
@@ -236,7 +237,7 @@ future<authenticated_user> password_authenticator::authenticate(
return _qp.execute_internal(
query,
consistency_for_user(username),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{username},
true);
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
@@ -262,21 +263,46 @@ future<authenticated_user> password_authenticator::authenticate(
});
}
future<> password_authenticator::maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const {
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
meta::roles_table::qualified_name,
OPTIONS,
meta::roles_table::role_col_name);
if (!options.options) {
return make_ready_future<>();
}
std::vector<std::pair<data_value, data_value>> entries;
for (const auto& entry : *options.options) {
entries.push_back({data_value(entry.first), data_value(entry.second)});
}
auto map_value = make_map_value(map_type_impl::get_instance(utf8_type, utf8_type, false), entries);
return _qp.execute_internal(
query,
consistency_for_user(role_name),
internal_distributed_query_state(),
{std::move(map_value), sstring(role_name)}).discard_result();
}
future<> password_authenticator::create(std::string_view role_name, const authentication_options& options) const {
if (!options.password) {
return make_ready_future<>();
return maybe_update_custom_options(role_name, options);
}
return _qp.execute_internal(
update_row_query(),
consistency_for_user(role_name),
internal_distributed_timeout_config(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
internal_distributed_query_state(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
return maybe_update_custom_options(role_name, options);
});
}
future<> password_authenticator::alter(std::string_view role_name, const authentication_options& options) const {
if (!options.password) {
return make_ready_future<>();
return maybe_update_custom_options(role_name, options);
}
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
@@ -287,8 +313,10 @@ future<> password_authenticator::alter(std::string_view role_name, const authent
return _qp.execute_internal(
query,
consistency_for_user(role_name),
internal_distributed_timeout_config(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
internal_distributed_query_state(),
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
return maybe_update_custom_options(role_name, options);
}).discard_result();
}
future<> password_authenticator::drop(std::string_view name) const {
@@ -299,12 +327,27 @@ future<> password_authenticator::drop(std::string_view name) const {
return _qp.execute_internal(
query, consistency_for_user(name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(name)}).discard_result();
}
future<custom_options> password_authenticator::query_custom_options(std::string_view role_name) const {
return make_ready_future<custom_options>();
static const sstring query = format("SELECT {} FROM {} WHERE {} = ?",
OPTIONS,
meta::roles_table::qualified_name,
meta::roles_table::role_col_name);
return _qp.execute_internal(
query, consistency_for_user(role_name),
internal_distributed_query_state(),
{sstring(role_name)}).then([](::shared_ptr<cql3::untyped_result_set> rs) {
custom_options opts;
const auto& row = rs->one();
if (row.has(OPTIONS)) {
row.get_map_data<sstring, sstring>(OPTIONS, std::inserter(opts, opts.end()), utf8_type, utf8_type);
}
return opts;
});
}
const resource_set& password_authenticator::protected_resources() const {

View File

@@ -94,6 +94,8 @@ public:
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
private:
future<> maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const;
bool legacy_metadata_exists() const;
future<> migrate_legacy_metadata() const;

View File

@@ -43,7 +43,8 @@ std::string_view creation_query() {
" can_login boolean,"
" is_superuser boolean,"
" member_of set<text>,"
" salted_hash text"
" salted_hash text,"
" options frozen<map<text, text>>,"
")",
qualified_name,
role_col_name);
@@ -68,14 +69,13 @@ future<bool> default_role_row_satisfies(
return qp.execute_internal(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{meta::DEFAULT_SUPERUSER_NAME},
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
@@ -100,7 +100,7 @@ future<bool> any_nondefault_role_row_satisfies(
return qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
internal_distributed_query_state()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return false;
}

View File

@@ -210,7 +210,6 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal(
default_user_query,
db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) {
if (!results->empty()) {
@@ -220,7 +219,6 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal(
default_user_query,
db::consistency_level::QUORUM,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) {
if (!results->empty()) {
@@ -229,8 +227,7 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.execute_internal(
all_users_query,
db::consistency_level::QUORUM,
infinite_timeout_config).then([](auto results) {
db::consistency_level::QUORUM).then([](auto results) {
return make_ready_future<bool>(!results->empty());
});
});

View File

@@ -86,7 +86,7 @@ static future<std::optional<record>> find_record(cql3::query_processor& qp, std:
return qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name)},
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
@@ -165,7 +165,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
return make_ready_future<>();
@@ -192,7 +192,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
role_config config;
config.is_superuser = row.get_or<bool>("super", false);
@@ -253,7 +253,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name), c.is_superuser, c.can_login},
true).discard_result();
}
@@ -296,7 +296,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
build_column_assignments(u),
meta::roles_table::role_col_name),
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name)}).discard_result();
});
}
@@ -315,7 +315,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
return parallel_for_each(
members->begin(),
@@ -354,7 +354,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name)}).discard_result();
};
@@ -381,7 +381,7 @@ standard_role_manager::modify_membership(
return _qp.execute_internal(
query,
consistency_for_role(grantee_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
};
@@ -392,7 +392,7 @@ standard_role_manager::modify_membership(
format("INSERT INTO {} (role, member) VALUES (?, ?)",
meta::role_members_table::qualified_name),
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name), sstring(grantee_name)}).discard_result();
case membership_change::remove:
@@ -400,7 +400,7 @@ standard_role_manager::modify_membership(
format("DELETE FROM {} WHERE role = ? AND member = ?",
meta::role_members_table::qualified_name),
consistency_for_role(role_name),
internal_distributed_timeout_config(),
internal_distributed_query_state(),
{sstring(role_name), sstring(grantee_name)}).discard_result();
}
@@ -503,7 +503,7 @@ future<role_set> standard_role_manager::query_all() const {
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([](::shared_ptr<cql3::untyped_result_set> results) {
internal_distributed_query_state()).then([](::shared_ptr<cql3::untyped_result_set> results) {
role_set roles;
std::transform(

View File

@@ -50,12 +50,11 @@ const cql_config default_cql_config;
thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp};
thread_local query_options query_options::DEFAULT{default_cql_config,
db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
db::consistency_level::ONE, std::nullopt,
std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()};
query_options::query_options(const cql_config& cfg,
db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values,
std::vector<cql3::raw_value_view> value_views,
@@ -64,7 +63,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf)
: _cql_config(cfg)
, _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names))
, _values(std::move(values))
, _value_views(value_views)
@@ -76,7 +74,6 @@ query_options::query_options(const cql_config& cfg,
query_options::query_options(const cql_config& cfg,
db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values,
bool skip_metadata,
@@ -84,7 +81,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf)
: _cql_config(cfg)
, _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names))
, _values(std::move(values))
, _value_views()
@@ -97,7 +93,6 @@ query_options::query_options(const cql_config& cfg,
query_options::query_options(const cql_config& cfg,
db::consistency_level consistency,
const ::timeout_config& timeout_config,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value_view> value_views,
bool skip_metadata,
@@ -105,7 +100,6 @@ query_options::query_options(const cql_config& cfg,
cql_serialization_format sf)
: _cql_config(cfg)
, _consistency(consistency)
, _timeout_config(timeout_config)
, _names(std::move(names))
, _values()
, _value_views(std::move(value_views))
@@ -115,12 +109,11 @@ query_options::query_options(const cql_config& cfg,
{
}
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values,
query_options::query_options(db::consistency_level cl, std::vector<cql3::raw_value> values,
specific_options options)
: query_options(
default_cql_config,
cl,
timeout_config,
{},
std::move(values),
false,
@@ -133,7 +126,6 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state)
: query_options(qo->_cql_config,
qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names),
std::move(qo->_values),
std::move(qo->_value_views),
@@ -146,7 +138,6 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
: query_options(qo->_cql_config,
qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names),
std::move(qo->_values),
std::move(qo->_value_views),
@@ -158,7 +149,7 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
query_options::query_options(std::vector<cql3::raw_value> values)
: query_options(
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
db::consistency_level::ONE, std::move(values))
{}
void query_options::prepare(const std::vector<lw_shared_ptr<column_specification>>& specs)

View File

@@ -51,7 +51,6 @@
#include "cql3/column_identifier.hh"
#include "cql3/values.hh"
#include "cql_serialization_format.hh"
#include "timeout_config.hh"
namespace cql3 {
@@ -75,7 +74,6 @@ public:
private:
const cql_config& _cql_config;
const db::consistency_level _consistency;
const timeout_config& _timeout_config;
const std::optional<std::vector<sstring_view>> _names;
std::vector<cql3::raw_value> _values;
std::vector<cql3::raw_value_view> _value_views;
@@ -109,7 +107,6 @@ public:
explicit query_options(const cql_config& cfg,
db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values,
bool skip_metadata,
@@ -117,7 +114,6 @@ public:
cql_serialization_format sf);
explicit query_options(const cql_config& cfg,
db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value> values,
std::vector<cql3::raw_value_view> value_views,
@@ -126,7 +122,6 @@ public:
cql_serialization_format sf);
explicit query_options(const cql_config& cfg,
db::consistency_level consistency,
const timeout_config& timeouts,
std::optional<std::vector<sstring_view>> names,
std::vector<cql3::raw_value_view> value_views,
bool skip_metadata,
@@ -158,13 +153,10 @@ public:
// forInternalUse
explicit query_options(std::vector<cql3::raw_value> values);
explicit query_options(db::consistency_level, const timeout_config& timeouts,
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(db::consistency_level, std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state);
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
const timeout_config& get_timeout_config() const { return _timeout_config; }
db::consistency_level get_consistency() const {
return _consistency;
}
@@ -258,7 +250,7 @@ query_options::query_options(query_options&& o, std::vector<OneMutationDataRange
std::vector<query_options> tmp;
tmp.reserve(values_ranges.size());
std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) {
return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
return query_options(_cql_config, _consistency, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
});
_batch_options = std::move(tmp);
}

View File

@@ -619,7 +619,6 @@ query_options query_processor::make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>& values,
db::consistency_level cl,
const timeout_config& timeout_config,
int32_t page_size) const {
if (p->bound_names.size() != values.size()) {
throw std::invalid_argument(
@@ -643,11 +642,10 @@ query_options query_processor::make_internal_options(
api::timestamp_type ts = api::missing_timestamp;
return query_options(
cl,
timeout_config,
bound_values,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
}
return query_options(cl, timeout_config, bound_values);
return query_options(cl, bound_values);
}
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
@@ -671,7 +669,7 @@ struct internal_query_state {
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
const std::initializer_list<data_value>& values, int32_t page_size) {
auto p = prepare_internal(query_string);
auto opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config, page_size);
auto opts = make_internal_options(p, values, db::consistency_level::ONE, page_size);
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
internal_query_state{
query_string,
@@ -789,7 +787,16 @@ future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
const sstring& query_string,
db::consistency_level cl,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& values,
bool cache) {
return execute_internal(query_string, cl, *_internal_state, values, cache);
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
const sstring& query_string,
db::consistency_level cl,
service::query_state& query_state,
const std::initializer_list<data_value>& values,
bool cache) {
@@ -797,13 +804,13 @@ query_processor::execute_internal(
log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values));
}
if (cache) {
return execute_with_params(prepare_internal(query_string), cl, timeout_config, values);
return execute_with_params(prepare_internal(query_string), cl, query_state, values);
} else {
auto p = parse_statement(query_string)->prepare(_db, _cql_stats);
p->statement->raw_cql_statement = query_string;
p->statement->validate(_proxy, *_internal_state);
auto checked_weak_ptr = p->checked_weak_from_this();
return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
return execute_with_params(std::move(checked_weak_ptr), cl, query_state, values).finally([p = std::move(p)] {});
}
}
@@ -811,11 +818,11 @@ future<::shared_ptr<untyped_result_set>>
query_processor::execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level cl,
const timeout_config& timeout_config,
service::query_state& query_state,
const std::initializer_list<data_value>& values) {
auto opts = make_internal_options(p, values, cl, timeout_config);
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) {
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) {
auto opts = make_internal_options(p, values, cl);
return do_with(std::move(opts), [this, &query_state, p = std::move(p)](auto & opts) {
return p->statement->execute(_proxy, query_state, opts).then([](auto msg) {
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
});
});

View File

@@ -215,8 +215,7 @@ public:
// creating namespaces, etc) is explicitly forbidden via this interface.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
return execute_internal(query_string, db::consistency_level::ONE,
infinite_timeout_config, values, true);
return execute_internal(query_string, db::consistency_level::ONE, values, true);
}
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
@@ -305,14 +304,19 @@ public:
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& = { },
bool cache = false);
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
service::query_state& query_state,
const std::initializer_list<data_value>& = { },
bool cache = false);
future<::shared_ptr<untyped_result_set>> execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level,
const timeout_config& timeout_config,
service::query_state& query_state,
const std::initializer_list<data_value>& = { });
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
@@ -341,7 +345,6 @@ private:
const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>&,
db::consistency_level,
const timeout_config& timeout_config,
int32_t page_size = -1) const;
future<::shared_ptr<cql_transport::messages::result_message>>

View File

@@ -286,7 +286,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
++_stats.batches;
_stats.statements_in_batches += _statements.size();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + query_state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return get_mutations(storage, options, timeout, local, now, query_state).then([this, &storage, &options, timeout, tr_state = query_state.get_trace_state(),
permit = query_state.get_permit()] (std::vector<mutation> ms) mutable {
return execute_without_conditions(storage, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
@@ -343,7 +343,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now();
const timeout_config& cfg = options.get_timeout_config();
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
auto batch_timeout = now + cfg.write_timeout; // Statement timeout.
auto cas_timeout = now + cfg.cas_timeout; // Ballot contention timeout.
auto read_timeout = now + cfg.read_timeout; // Query timeout.

View File

@@ -286,7 +286,7 @@ modification_statement::do_execute(service::storage_proxy& proxy, service::query
future<>
modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) const {
auto cl = options.get_consistency();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + qs.get_client_state().get_timeout_config().*get_timeout_config_selector();
return get_mutations(proxy, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &proxy, &qs] (auto mutations) {
if (mutations.empty()) {
return now();
@@ -302,7 +302,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
auto cl_for_learn = options.get_consistency();
auto cl_for_paxos = options.check_serial_consistency();
db::timeout_clock::time_point now = db::timeout_clock::now();
const timeout_config& cfg = options.get_timeout_config();
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout.
auto cas_timeout = now + cfg.cas_timeout; // When to give up due to contention.

View File

@@ -59,6 +59,7 @@
#include "gms/feature_service.hh"
#include "transport/messages/result_message.hh"
#include "unimplemented.hh"
#include "concrete_types.hh"
namespace cql3 {
@@ -105,6 +106,30 @@ future<> create_role_statement::grant_permissions_to_creator(const service::clie
});
}
static void validate_timeout_options(const auth::authentication_options& auth_options) {
if (!auth_options.options) {
return;
}
const auto& options = *auth_options.options;
auto check_duration = [&] (const sstring& repr) {
data_value v = duration_type->deserialize(duration_type->from_string(repr));
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
if (duration.months || duration.days) {
throw exceptions::invalid_request_exception("Timeout values cannot be longer than 24h");
}
if (duration.nanoseconds % 1'000'000 != 0) {
throw exceptions::invalid_request_exception("Timeout values must be expressed in millisecond granularity");
}
};
for (auto opt : {"read_timeout", "write_timeout"}) {
auto it = options.find(opt);
if (it != options.end()) {
check_duration(it->second);
}
}
}
void create_role_statement::validate(service::storage_proxy& p, const service::client_state&) const {
validate_cluster_support(p);
}
@@ -137,9 +162,12 @@ create_role_statement::execute(service::storage_proxy&,
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
const auto& cs = state.get_client_state();
auto& as = *cs.get_auth_service();
validate_timeout_options(authen_options);
return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
return grant_permissions_to_creator(cs);
}).then([&state] () mutable {
return state.get_client_state().update_per_role_params();
}).then([] {
return void_result_message();
}).handle_exception_type([this](const auth::role_already_exists& e) {
@@ -224,8 +252,9 @@ alter_role_statement::execute(service::storage_proxy&, service::query_state& sta
extract_authentication_options(_options),
[this, &state](const auth::role_config_update& update, const auth::authentication_options& authen_options) {
auto& as = *state.get_client_state().get_auth_service();
return auth::alter_role(as, _role, update, authen_options).then([] {
return auth::alter_role(as, _role, update, authen_options).then([&state] () mutable {
return state.get_client_state().update_per_role_params();
}).then([] {
return void_result_message();
}).handle_exception_type([](const auth::nonexistant_role& e) {
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));

View File

@@ -366,7 +366,7 @@ select_statement::do_execute(service::storage_proxy& proxy,
}
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = options.get_timeout_config().*get_timeout_config_selector();
auto timeout_duration = state.get_client_state().get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + timeout_duration;
auto p = service::pager::query_pagers::pager(_schema, _selection,
state, options, command, std::move(key_ranges), restrictions_need_filtering ? _restrictions : nullptr);
@@ -513,7 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
uint32_t queried_ranges_count = partition_ranges.size();
service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata_ptr(), _schema, std::move(partition_ranges));
@@ -607,7 +607,7 @@ indexed_table_select_statement::do_execute_base_query(
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
struct base_query_state {
query::result_merger merger;
@@ -689,7 +689,7 @@ select_statement::execute(service::storage_proxy& proxy,
// is specified we need to get "limit" rows from each partition since there
// is no way to tell which of these rows belong to the query result before
// doing post-query ordering.
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
if (needs_post_query_ordering() && _limit) {
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd, timeout](auto& prs) {
assert(cmd->partition_limit == query::max_partitions);
@@ -1250,7 +1250,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
{
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, false).then(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
@@ -1291,7 +1291,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
{
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, true).then(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {

View File

@@ -55,10 +55,18 @@ struct query_context {
// let the `storage_proxy` time out the query down the call chain
db::timeout_clock::duration::zero();
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) {
struct timeout_context {
std::unique_ptr<service::client_state> client_state;
service::query_state query_state;
timeout_context(db::timeout_clock::duration d)
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
, query_state(*client_state, empty_service_permit())
{}
};
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
return _qp.local().execute_internal(req,
cql3::query_options::DEFAULT.get_consistency(),
tcfg,
tctx.query_state,
{ data_value(std::forward<Args>(args))... },
true);
});

View File

@@ -3093,7 +3093,6 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
auto cm_fut = qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version}
);
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
@@ -3136,7 +3135,6 @@ future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version ve
return qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version}
).then([] (shared_ptr<cql3::untyped_result_set> results) {
return !results->empty();
@@ -3150,7 +3148,6 @@ future<> drop_column_mapping(utils::UUID table_id, table_schema_version version)
return qctx->qp().execute_internal(
DEL_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{table_id, version}).discard_result();
}

View File

@@ -155,17 +155,20 @@ future<> system_distributed_keyspace::stop() {
return make_ready_future<>();
}
static const timeout_config internal_distributed_timeout_config = [] {
static service::query_state& internal_distributed_query_state() {
using namespace std::chrono_literals;
const auto t = 10s;
return timeout_config{ t, t, t, t, t, t, t };
}();
static timeout_config tc{ t, t, t, t, t, t, t };
static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
static thread_local service::query_state qs(cs, empty_service_permit());
return qs;
};
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
return _qp.execute_internal(
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name) },
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
@@ -182,7 +185,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
return _qp.execute_internal(
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
false).discard_result();
});
@@ -193,7 +196,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
return _qp.execute_internal(
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
false).discard_result();
});
@@ -203,7 +206,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
return _qp.execute_internal(
format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ std::move(ks_name), std::move(view_name) },
false).discard_result();
}
@@ -281,7 +284,7 @@ system_distributed_keyspace::insert_cdc_topology_description(
return _qp.execute_internal(
format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) },
false).discard_result();
}
@@ -293,7 +296,7 @@ system_distributed_keyspace::read_cdc_topology_description(
return _qp.execute_internal(
format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ time },
false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> std::optional<cdc::topology_description> {
@@ -321,7 +324,7 @@ system_distributed_keyspace::expire_cdc_topology_description(
return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ expiration_time, streams_ts },
false).discard_result();
}
@@ -342,7 +345,7 @@ system_distributed_keyspace::create_cdc_desc(
return _qp.execute_internal(
format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ time, make_set_value(cdc_streams_set_type, prepare_cdc_streams(streams)) },
false).discard_result();
}
@@ -355,7 +358,7 @@ system_distributed_keyspace::expire_cdc_desc(
return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ expiration_time, streams_ts },
false).discard_result();
}
@@ -367,7 +370,7 @@ system_distributed_keyspace::cdc_desc_exists(
return _qp.execute_internal(
format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{ streams_ts },
false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> bool {
@@ -380,7 +383,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(context ctx) {
return _qp.execute_internal(
format("SELECT * FROM {}.{}", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
internal_distributed_query_state(),
{},
false
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {

33
docs/roles.md Normal file
View File

@@ -0,0 +1,33 @@
# Per-role parameters
Scylla allows configuring per-role parameters. The current list of parameters includes:
* `read_timeout` - custom timeout for read operations
* `write_timeout` - custom timeout for write operations
## Examples
In order to set up per-role parameters, one should use the already existing CQL API for roles:
```cql
CREATE ROLE example WITH options = {'read_timeout': 50ms}
```
or
```cql
ALTER ROLE example WITH options = {'read_timeout': 1s, 'write_timeout': 500ms}
```
Once a session with given role is established, it will use per-role timeouts instead of globally configured
timeouts, if there are any.
Role options can be viewed with the standard `LIST ROLES` statement:
```cql
LIST ROLES;
role | super | login | options
-----------+-------+-------+-----------------------------------------------------
cassandra | True | True | {'read_timeout': '50ms', 'write_timeout': '1000us'}
```
## Supported authenticators
Roles are currently supported in PasswordAuthenticator.

View File

@@ -60,7 +60,7 @@ public:
,_read_consistency(rcl)
,_write_consistency(wcl)
,_timeout_config(tc)
,_client_state(service::client_state::external_tag{}, auth, addr)
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
,_total_redis_db_count(total_redis_db_count)
{
}
@@ -75,7 +75,7 @@ public:
,_read_consistency(rcl)
,_write_consistency(wcl)
,_timeout_config(tc)
,_client_state(service::client_state::external_tag{}, auth, addr)
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
,_total_redis_db_count(total_redis_db_count)
{
}

View File

@@ -53,6 +53,7 @@
#include "db/system_distributed_keyspace.hh"
#include "database.hh"
#include "cdc/log.hh"
#include "concrete_types.hh"
thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0;
@@ -60,6 +61,56 @@ void service::client_state::set_login(auth::authenticated_user user) {
_user = std::move(user);
}
service::client_state::client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address, bool thrift)
: _is_internal(false)
, _is_thrift(thrift)
, _remote_address(remote_address)
, _auth_service(&auth_service)
, _default_timeout_config(timeout_config)
, _timeout_config(timeout_config) {
if (!auth_service.underlying_authenticator().require_authentication()) {
_user = auth::authenticated_user();
}
}
future<> service::client_state::update_per_role_params() {
if (!_user || auth::is_anonymous(*_user)) {
return make_ready_future<>();
}
//FIXME: replace with a coroutine once they're widely accepted
return seastar::async([this] {
auth::role_set roles = _auth_service->get_roles(*_user->name).get();
db::timeout_clock::duration read_timeout = db::timeout_clock::duration::max();
db::timeout_clock::duration write_timeout = db::timeout_clock::duration::max();
auto get_duration = [&] (const sstring& repr) {
data_value v = duration_type->deserialize(duration_type->from_string(repr));
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
return std::chrono::duration_cast<lowres_clock::duration>(std::chrono::nanoseconds(duration.nanoseconds));
};
for (const auto& role : roles) {
auto options = _auth_service->underlying_authenticator().query_custom_options(role).get();
auto read_timeout_it = options.find("read_timeout");
if (read_timeout_it != options.end()) {
read_timeout = std::min(read_timeout, get_duration(read_timeout_it->second));
}
auto write_timeout_it = options.find("write_timeout");
if (write_timeout_it != options.end()) {
write_timeout = std::min(write_timeout, get_duration(write_timeout_it->second));
}
}
_timeout_config.read_timeout = read_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.read_timeout : read_timeout;
_timeout_config.range_read_timeout = read_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.range_read_timeout : read_timeout;
_timeout_config.write_timeout = write_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.write_timeout : write_timeout;
_timeout_config.counter_write_timeout = write_timeout == db::timeout_clock::duration::max() ?
_default_timeout_config.counter_write_timeout : write_timeout;
});
}
future<> service::client_state::check_user_can_login() {
if (auth::is_anonymous(*_user)) {
return make_ready_future();

View File

@@ -44,6 +44,7 @@
#include "auth/service.hh"
#include "exceptions/exceptions.hh"
#include "unimplemented.hh"
#include "timeout_config.hh"
#include "timestamp.hh"
#include "db_clock.hh"
#include "database_fwd.hh"
@@ -88,10 +89,17 @@ public:
};
private:
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state),
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
_auth_service(auth_service ? &auth_service->local() : nullptr),
_enabled_protocol_extensions(cs->_enabled_protocol_extensions) {}
: _keyspace(cs->_keyspace)
, _user(cs->_user)
, _auth_state(cs->_auth_state)
, _is_internal(cs->_is_internal)
, _is_thrift(cs->_is_thrift)
, _remote_address(cs->_remote_address)
, _auth_service(auth_service ? &auth_service->local() : nullptr)
, _default_timeout_config(cs->_default_timeout_config)
, _timeout_config(cs->_timeout_config)
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
{}
friend client_state_for_another_shard;
private:
sstring _keyspace;
@@ -136,6 +144,10 @@ private:
// Only populated for external client state.
auth::service* _auth_service{nullptr};
// For restoring default values in the timeout config
timeout_config _default_timeout_config;
timeout_config _timeout_config;
public:
struct internal_tag {};
struct external_tag {};
@@ -162,15 +174,7 @@ public:
_driver_version = std::move(driver_version);
}
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
: _is_internal(false)
, _is_thrift(thrift)
, _remote_address(remote_address)
, _auth_service(&auth_service) {
if (!auth_service.underlying_authenticator().require_authentication()) {
_user = auth::authenticated_user();
}
}
client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address = socket_address(), bool thrift = false);
gms::inet_address get_client_address() const {
return gms::inet_address(_remote_address);
@@ -180,10 +184,19 @@ public:
return _remote_address.port();
}
client_state(internal_tag)
const timeout_config& get_timeout_config() const {
return _timeout_config;
}
client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config)
{}
client_state(internal_tag, const timeout_config& config)
: _keyspace("system")
, _is_internal(true)
, _is_thrift(false)
, _default_timeout_config(config)
, _timeout_config(config)
{}
client_state(const client_state&) = delete;
@@ -315,6 +328,7 @@ public:
auth::command_desc::type = auth::command_desc::type::OTHER) const;
future<> has_schema_access(const schema& s, auth::permission p) const;
future<> update_per_role_params();
private:
future<> has_access(const sstring& keyspace, auth::command_desc) const;

View File

@@ -44,6 +44,7 @@
#include "db/config.hh"
#include "cql3/query_processor.hh"
#include "types/map.hh"
SEASTAR_TEST_CASE(test_default_authenticator) {
return do_with_cql_env([](cql_test_env& env) {
@@ -216,3 +217,48 @@ SEASTAR_TEST_CASE(alter_opts_on_system_auth_tables) {
cquery_nofail(env, "ALTER TABLE system_auth.role_permissions WITH min_index_interval = 456");
}, auth_on());
}
SEASTAR_TEST_CASE(test_alter_with_timeouts) {
auto cfg = make_shared<db::config>();
cfg->authenticator(sstring(auth::password_authenticator_name));
return do_with_cql_env_thread([] (cql_test_env& e) {
auth::role_config config {
.can_login = true,
};
auth::authentication_options opts {
.password = "pass"
};
auth::create_role(e.local_auth_service(), "user", config, opts).get();
authenticate(e, "user", "pass").get();
cquery_nofail(e, "CREATE TABLE t (id int PRIMARY KEY, v int)");
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 5ms, 'write_timeout': 1h30m}");
auto my_map_type = map_type_impl::get_instance(utf8_type, utf8_type, false);
auto msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
assert_that(msg).is_rows().with_rows({{
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"read_timeout", "5ms"}, {"write_timeout", "1h30m"}}))),
}});
cquery_nofail(e, "ALTER ROLE user WITH options = {'write_timeout': 35s}");
msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
assert_that(msg).is_rows().with_rows({{
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"write_timeout", "35s"}}))),
}});
// Setting a timeout value of 0 makes little sense, but it's great for testing
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 0s, 'write_timeout': 0s}");
BOOST_REQUIRE_THROW(e.execute_cql("SELECT * FROM t").get(), exceptions::read_timeout_exception);
BOOST_REQUIRE_THROW(e.execute_cql("INSERT INTO t (id, v) VALUES (1,2)").get(), exceptions::mutation_write_failure_exception);
cquery_nofail(e, "ALTER ROLE user WITH options = {}");
cquery_nofail(e, "SELECT * FROM t");
cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)");
// Only valid timeout values are accepted
BOOST_REQUIRE_THROW(e.execute_cql("ALTER ROLE user WITH options = {'read_timeout': 'I am not a valid duration'}").get(), marshal_exception);
}, cfg);
}

View File

@@ -173,7 +173,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception);
auto make_query_options = [] (cql_protocol_version_type version) {
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt,
std::vector<cql3::raw_value_view>(), false,
cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version});
};

View File

@@ -3013,7 +3013,7 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get();
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0();
assert_that(res).is_rows().is_empty();
@@ -4444,7 +4444,6 @@ static std::unique_ptr<cql3::query_options> q_serial_opts(
const auto& so = cql3::query_options::specific_options::DEFAULT;
auto qo = std::make_unique<cql3::query_options>(
cl,
infinite_timeout_config,
values,
// Ensure (optional) serial consistency is always specified.
cql3::query_options::specific_options{
@@ -4634,7 +4633,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
int32_t page_size = is_paged ? 10000 : -1;
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;

View File

@@ -831,14 +831,14 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(6), boolean_type->decompose(false)},
});
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(3), boolean_type->decompose(true)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
@@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(6), boolean_type->decompose(false)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
@@ -857,7 +857,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(2), boolean_type->decompose(false)}
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
@@ -866,7 +866,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
{ int32_type->decompose(4), boolean_type->decompose(false)}
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg);
@@ -877,7 +877,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
// Some pages might be empty and in such case we should continue querying
size_t rows_fetched = 0;
while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg);
@@ -889,7 +889,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
rows_fetched = 0;
while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg);
@@ -905,7 +905,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
rows_fetched = 0;
uint64_t remaining = 1;
while (remaining > 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched += count_rows_fetched(msg);
@@ -964,7 +964,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
{ int32_type->decompose(1), boolean_type->decompose(false)},
});
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
@@ -972,7 +972,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
{ int32_type->decompose(3), boolean_type->decompose(true)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
// Some pages might be empty and in such case we should continue querying
size_t rows_fetched = 0;
while (rows_fetched == 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg);
@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
rows_fetched = 0;
uint64_t remaining = 1;
while (remaining > 0) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()});
sstring query = allow_filtering ?
fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) :

View File

@@ -36,7 +36,7 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
}
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(4321);

View File

@@ -63,7 +63,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
}
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("select * from test;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg);
@@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
paging_state->set_remaining(test_remaining);
while (has_more_pages(msg)) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg);
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
}
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0();
auto paging_state = extract_paging_state(msg);
@@ -113,7 +113,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
paging_state->set_remaining(test_remaining);
while (has_more_pages(msg)) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0();
rows_fetched = count_rows_fetched(msg);

View File

@@ -209,8 +209,7 @@ std::unordered_map<sstring, uint64_t> get_query_metrics() {
/// Creates query_options with cl, infinite timeout, and no named values.
auto make_options(clevel cl) {
return std::make_unique<cql3::query_options>(
cl, infinite_timeout_config, std::vector<cql3::raw_value>());
return std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>());
}
} // anonymous namespace

View File

@@ -45,7 +45,7 @@ std::unique_ptr<cql3::query_options> to_options(
static auto& d = cql3::query_options::DEFAULT;
return std::make_unique<cql3::query_options>(
cfg,
d.get_consistency(), d.get_timeout_config(), std::move(names), std::move(values), d.skip_metadata(),
d.get_consistency(), std::move(names), std::move(values), d.skip_metadata(),
d.get_specific_options(), d.get_cql_serialization_format());
}

View File

@@ -412,7 +412,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
}
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(101);
@@ -424,7 +424,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
});
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_rows({{
@@ -434,7 +434,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
});
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0();
assert_that(res).is_rows().with_rows({{
@@ -470,7 +470,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
};
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
@@ -480,7 +480,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
expect_more_pages(res, true);
@@ -490,7 +490,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
paging_state = extract_paging_state(res);
@@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
try {
expect_more_pages(res, false);
} catch (...) {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(0);
@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
});
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
@@ -524,7 +524,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
@@ -534,7 +534,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
});
{
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
@@ -551,7 +551,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
paging_state->get_rows_fetched_for_last_partition());
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
@@ -563,7 +563,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
// not to return rows (since no row matches an empty partition key)
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
@@ -802,7 +802,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
};
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
@@ -811,7 +811,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
@@ -821,7 +821,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
});
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
@@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
@@ -1158,7 +1158,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
}
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function.
@@ -1173,7 +1173,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({
@@ -1191,7 +1191,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
}
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function
@@ -1199,7 +1199,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
{ int32_type->decompose(row_count * row_count / 4)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({

View File

@@ -125,7 +125,7 @@ private:
service::client_state client_state;
core_local_state(auth::service& auth_service)
: client_state(service::client_state::external_tag{}, auth_service)
: client_state(service::client_state::external_tag{}, auth_service, infinite_timeout_config)
{
client_state.set_login(auth::authenticated_user(testing_superuser));
}
@@ -189,7 +189,7 @@ public:
db::consistency_level cl = db::consistency_level::ONE) override {
const auto& so = cql3::query_options::specific_options::DEFAULT;
auto options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config,
auto options = std::make_unique<cql3::query_options>(cl,
std::move(values), cql3::query_options::specific_options{
so.page_size,
so.state,
@@ -226,7 +226,7 @@ public:
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
}
auto& qo = cql3::query_options::DEFAULT;
auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout;
auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
.finally([qs, modif_stmt = std::move(modif_stmt)] {});

View File

@@ -228,7 +228,7 @@ SEASTAR_TEST_CASE(scan_enormous_table_test) {
std::unique_ptr<cql3::query_options> qo;
uint64_t fetched_rows_log_counter = 1e7;
do {
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()});
msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0();
rows_fetched += count_rows_fetched(msg);

View File

@@ -104,7 +104,6 @@ std::unique_ptr<cql3::query_options> repl_options() {
const auto& so = cql3::query_options::specific_options::DEFAULT;
auto qo = std::make_unique<cql3::query_options>(
db::consistency_level::ONE,
infinite_timeout_config,
std::vector<cql3::raw_value>{},
// Ensure (optional) serial consistency is always specified.
cql3::query_options::specific_options{

View File

@@ -201,9 +201,9 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
::timeout_config _timeout_config;
private:
template <typename Cob, typename Func>
void
@@ -220,9 +220,9 @@ public:
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
: _db(db)
, _query_processor(qp)
, _client_state(service::client_state::external_tag{}, auth_service, socket_address(), true)
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true)
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
{ }
const sstring& current_keyspace() const {
@@ -976,7 +976,7 @@ public:
throw make_exception<InvalidRequestException>("Compressed query strings are not supported");
}
auto& qp = _query_processor.local();
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(),
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::vector<cql3::raw_value_view>(),
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
auto f = qp.execute_direct(query, _query_state, *opts);
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
@@ -1056,7 +1056,7 @@ public:
return cql3::raw_value::make_value(to_bytes(s));
});
auto& qp = _query_processor.local();
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values),
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::move(bytes_values),
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization);
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {

View File

@@ -63,9 +63,13 @@ const sstring trace_keyspace_helper::EVENTS("events");
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log");
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx");
timeout_config tracing_db_timeout_config {
5s, 5s, 5s, 5s, 5s, 5s, 5s,
};
static service::client_state& tracing_client_state() {
static timeout_config tracing_db_timeout_config {
5s, 5s, 5s, 5s, 5s, 5s, 5s,
};
static thread_local service::client_state s(service::client_state::internal_tag{}, tracing_db_timeout_config);
return s;
}
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
int64_t last_nanos = 0;
@@ -75,7 +79,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
: i_tracing_backend_helper(tr)
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit())
, _dummy_query_state(tracing_client_state(), empty_service_permit())
, _sessions(KEYSPACE_NAME, SESSIONS,
sprint("CREATE TABLE IF NOT EXISTS %s.%s ("
"session_id uuid,"
@@ -313,7 +317,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
};
return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
}
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
@@ -331,7 +335,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
};
return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
}
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
@@ -374,7 +378,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
});
return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
}
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
@@ -395,7 +399,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
});
return cql3::query_options(cql3::default_cql_config,
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
}
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
@@ -431,7 +435,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
return do_with(
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
[this] (auto& batch_options, auto& batch) {
return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });

View File

@@ -219,10 +219,10 @@ private:
options_flag::NAMES_FOR_VALUES
>;
public:
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) {
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const cql3::cql_config& cql_config) {
auto consistency = read_consistency();
if (version == 1) {
return std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{},
return std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::vector<cql3::raw_value_view>{},
false, cql3::query_options::specific_options::DEFAULT, cql_ser_format);
}
@@ -270,11 +270,11 @@ public:
if (!names.empty()) {
onames = std::move(names);
}
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata,
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::move(onames), std::move(values), skip_metadata,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts},
cql_ser_format);
} else {
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata,
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::move(values), skip_metadata,
cql3::query_options::specific_options::DEFAULT, cql_ser_format);
}

View File

@@ -563,7 +563,7 @@ cql_server::connection::connection(cql_server& server, socket_address server_add
, _fd(std::move(fd))
, _read_buf(_fd.input())
, _write_buf(_fd.output())
, _client_state(service::client_state::external_tag{}, server._auth_service, addr)
, _client_state(service::client_state::external_tag{}, server._auth_service, server.timeout_config(), addr)
{
++_server._total_connections;
++_server._current_connections;
@@ -855,6 +855,9 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username)));
});
}
f = f.then([&client_state] {
return client_state.update_per_role_params();
});
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
});
@@ -891,7 +894,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
request_reader in(is, linearization_buffer);
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
server.timeout_config(), /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
/* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
// result here has to be foreign ptr
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
});
@@ -906,7 +909,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
fragmented_temporary_buffer::istream is = in.get_stream();
return process_fn(client_state, _server._query_processor, in, stream,
_version, _cql_serialization_format, _server.timeout_config(), permit, trace_state, true)
_version, _cql_serialization_format, permit, trace_state, true)
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
unsigned* shard = std::get_if<unsigned>(&msg);
@@ -920,12 +923,12 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state,
service_permit permit, tracing::trace_state_ptr trace_state,
bool init_trace) {
auto query = in.read_long_string_view();
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
auto& options = *q_state->options;
auto skip_metadata = options.skip_metadata();
@@ -988,8 +991,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit,
tracing::trace_state_ptr trace_state, bool init_trace) {
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
bool needs_authorization = false;
@@ -1012,10 +1014,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
std::vector<cql3::raw_value_view> values;
in.read_value_view_list(version, values);
auto consistency = in.read_consistency();
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, timeout_config, std::nullopt, values, false,
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, std::nullopt, values, false,
cql3::query_options::specific_options::DEFAULT, serialization_format);
} else {
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
}
auto& options = *q_state->options;
auto skip_metadata = options.skip_metadata();
@@ -1068,8 +1070,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const ::timeout_config& timeout_config, service_permit permit,
tracing::trace_state_ptr trace_state, bool init_trace) {
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
if (version == 1) {
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
}
@@ -1158,7 +1159,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
auto& query_state = q_state->query_state;
// #563. CQL v2 encodes query_options in v1 format for batch requests.
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,
timeout_config, qp.local().get_cql_config())), std::move(values)));
qp.local().get_cql_config())), std::move(values)));
auto& options = *q_state->options;
if (init_trace) {