Compare commits
10 Commits
next-5.0
...
add_alter_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fcb349b026 | ||
|
|
28c558af95 | ||
|
|
83b47ae394 | ||
|
|
391d1f2b21 | ||
|
|
137a8a0161 | ||
|
|
c473cb4a2d | ||
|
|
98fac66361 | ||
|
|
2cbeb3678f | ||
|
|
d61e1fd174 | ||
|
|
f31ac0a8ca |
@@ -129,8 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
|
||||
auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name);
|
||||
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
auto& timeout = auth::internal_distributed_timeout_config();
|
||||
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
return qp.execute_internal(query, cl, auth::internal_distributed_query_state(), {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
auto res = f.get0();
|
||||
auto salted_hash = std::optional<sstring>();
|
||||
if (res->empty()) {
|
||||
|
||||
@@ -2845,7 +2845,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
|
||||
|
||||
command->slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
|
||||
auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
|
||||
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
|
||||
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ future<> wait_for_schema_agreement(::service::migration_manager& mm, const datab
|
||||
});
|
||||
}
|
||||
|
||||
const timeout_config& internal_distributed_timeout_config() noexcept {
|
||||
::service::query_state& internal_distributed_query_state() noexcept {
|
||||
#ifdef DEBUG
|
||||
// Give the much slower debug tests more headroom for completing auth queries.
|
||||
static const auto t = 30s;
|
||||
@@ -116,7 +116,9 @@ const timeout_config& internal_distributed_timeout_config() noexcept {
|
||||
static const auto t = 5s;
|
||||
#endif
|
||||
static const timeout_config tc{t, t, t, t, t, t, t};
|
||||
return tc;
|
||||
static thread_local ::service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
static thread_local ::service::query_state qs(cs, empty_service_permit());
|
||||
return qs;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "log.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "service/query_state.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -87,6 +88,6 @@ future<> wait_for_schema_agreement(::service::migration_manager&, const database
|
||||
///
|
||||
/// Time-outs for internal, non-local CQL queries.
|
||||
///
|
||||
const timeout_config& internal_distributed_timeout_config() noexcept;
|
||||
::service::query_state& internal_distributed_query_state() noexcept;
|
||||
|
||||
}
|
||||
|
||||
@@ -103,7 +103,6 @@ future<bool> default_authorizer::any_granted() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{},
|
||||
true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
@@ -116,8 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
return do_with(
|
||||
row.get_as<sstring>("username"),
|
||||
@@ -197,7 +195,6 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return permissions::NONE;
|
||||
@@ -226,7 +223,7 @@ default_authorizer::modify(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -251,7 +248,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
std::vector<permission_details> all_details;
|
||||
@@ -278,7 +275,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
@@ -298,7 +295,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
try {
|
||||
auto res = f.get0();
|
||||
@@ -315,7 +311,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
|
||||
@@ -66,6 +66,7 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
|
||||
|
||||
// name of the hash column.
|
||||
static constexpr std::string_view SALTED_HASH = "salted_hash";
|
||||
static constexpr std::string_view OPTIONS = "options";
|
||||
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
|
||||
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
|
||||
|
||||
@@ -114,7 +115,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
auto username = row.get_as<sstring>("username");
|
||||
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
|
||||
@@ -122,7 +123,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
consistency_for_user(username),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{std::move(salted_hash), username}).discard_result();
|
||||
}).finally([results] {});
|
||||
}).then([] {
|
||||
@@ -139,7 +140,7 @@ future<> password_authenticator::create_default_if_missing() const {
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) {
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
});
|
||||
@@ -203,11 +204,11 @@ bool password_authenticator::require_authentication() const {
|
||||
}
|
||||
|
||||
authentication_option_set password_authenticator::supported_options() const {
|
||||
return authentication_option_set{authentication_option::password};
|
||||
return authentication_option_set{authentication_option::password, authentication_option::options};
|
||||
}
|
||||
|
||||
authentication_option_set password_authenticator::alterable_options() const {
|
||||
return authentication_option_set{authentication_option::password};
|
||||
return authentication_option_set{authentication_option::password, authentication_option::options};
|
||||
}
|
||||
|
||||
future<authenticated_user> password_authenticator::authenticate(
|
||||
@@ -236,7 +237,7 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_user(username),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{username},
|
||||
true);
|
||||
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
@@ -262,21 +263,46 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
});
|
||||
}
|
||||
|
||||
future<> password_authenticator::maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const {
|
||||
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
|
||||
meta::roles_table::qualified_name,
|
||||
OPTIONS,
|
||||
meta::roles_table::role_col_name);
|
||||
|
||||
if (!options.options) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
std::vector<std::pair<data_value, data_value>> entries;
|
||||
for (const auto& entry : *options.options) {
|
||||
entries.push_back({data_value(entry.first), data_value(entry.second)});
|
||||
}
|
||||
auto map_value = make_map_value(map_type_impl::get_instance(utf8_type, utf8_type, false), entries);
|
||||
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_user(role_name),
|
||||
internal_distributed_query_state(),
|
||||
{std::move(map_value), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
future<> password_authenticator::create(std::string_view role_name, const authentication_options& options) const {
|
||||
if (!options.password) {
|
||||
return make_ready_future<>();
|
||||
return maybe_update_custom_options(role_name, options);
|
||||
}
|
||||
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
consistency_for_user(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
|
||||
return maybe_update_custom_options(role_name, options);
|
||||
});
|
||||
}
|
||||
|
||||
future<> password_authenticator::alter(std::string_view role_name, const authentication_options& options) const {
|
||||
if (!options.password) {
|
||||
return make_ready_future<>();
|
||||
return maybe_update_custom_options(role_name, options);
|
||||
}
|
||||
|
||||
static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
|
||||
@@ -287,8 +313,10 @@ future<> password_authenticator::alter(std::string_view role_name, const authent
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_user(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result().then([this, role_name, &options] {
|
||||
return maybe_update_custom_options(role_name, options);
|
||||
}).discard_result();
|
||||
}
|
||||
|
||||
future<> password_authenticator::drop(std::string_view name) const {
|
||||
@@ -299,12 +327,27 @@ future<> password_authenticator::drop(std::string_view name) const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
query, consistency_for_user(name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(name)}).discard_result();
|
||||
}
|
||||
|
||||
future<custom_options> password_authenticator::query_custom_options(std::string_view role_name) const {
|
||||
return make_ready_future<custom_options>();
|
||||
static const sstring query = format("SELECT {} FROM {} WHERE {} = ?",
|
||||
OPTIONS,
|
||||
meta::roles_table::qualified_name,
|
||||
meta::roles_table::role_col_name);
|
||||
|
||||
return _qp.execute_internal(
|
||||
query, consistency_for_user(role_name),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).then([](::shared_ptr<cql3::untyped_result_set> rs) {
|
||||
custom_options opts;
|
||||
const auto& row = rs->one();
|
||||
if (row.has(OPTIONS)) {
|
||||
row.get_map_data<sstring, sstring>(OPTIONS, std::inserter(opts, opts.end()), utf8_type, utf8_type);
|
||||
}
|
||||
return opts;
|
||||
});
|
||||
}
|
||||
|
||||
const resource_set& password_authenticator::protected_resources() const {
|
||||
|
||||
@@ -94,6 +94,8 @@ public:
|
||||
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
|
||||
|
||||
private:
|
||||
future<> maybe_update_custom_options(std::string_view role_name, const authentication_options& options) const;
|
||||
|
||||
bool legacy_metadata_exists() const;
|
||||
|
||||
future<> migrate_legacy_metadata() const;
|
||||
|
||||
@@ -43,7 +43,8 @@ std::string_view creation_query() {
|
||||
" can_login boolean,"
|
||||
" is_superuser boolean,"
|
||||
" member_of set<text>,"
|
||||
" salted_hash text"
|
||||
" salted_hash text,"
|
||||
" options frozen<map<text, text>>,"
|
||||
")",
|
||||
qualified_name,
|
||||
role_col_name);
|
||||
@@ -68,14 +69,13 @@ future<bool> default_role_row_satisfies(
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -100,7 +100,7 @@ future<bool> any_nondefault_role_row_satisfies(
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -210,7 +210,6 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.execute_internal(
|
||||
default_user_query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -220,7 +219,6 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.execute_internal(
|
||||
default_user_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -229,8 +227,7 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
all_users_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([](auto results) {
|
||||
db::consistency_level::QUORUM).then([](auto results) {
|
||||
return make_ready_future<bool>(!results->empty());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -86,7 +86,7 @@ static future<std::optional<record>> find_record(cql3::query_processor& qp, std:
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -165,7 +165,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
|
||||
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
|
||||
return make_ready_future<>();
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
role_config config;
|
||||
config.is_superuser = row.get_or<bool>("super", false);
|
||||
@@ -253,7 +253,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), c.is_superuser, c.can_login},
|
||||
true).discard_result();
|
||||
}
|
||||
@@ -296,7 +296,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
|
||||
build_column_assignments(u),
|
||||
meta::roles_table::role_col_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -315,7 +315,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
|
||||
return parallel_for_each(
|
||||
members->begin(),
|
||||
@@ -354,7 +354,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -381,7 +381,7 @@ standard_role_manager::modify_membership(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(grantee_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -392,7 +392,7 @@ standard_role_manager::modify_membership(
|
||||
format("INSERT INTO {} (role, member) VALUES (?, ?)",
|
||||
meta::role_members_table::qualified_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
|
||||
case membership_change::remove:
|
||||
@@ -400,7 +400,7 @@ standard_role_manager::modify_membership(
|
||||
format("DELETE FROM {} WHERE role = ? AND member = ?",
|
||||
meta::role_members_table::qualified_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -503,7 +503,7 @@ future<role_set> standard_role_manager::query_all() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
role_set roles;
|
||||
|
||||
std::transform(
|
||||
|
||||
@@ -50,12 +50,11 @@ const cql_config default_cql_config;
|
||||
thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp};
|
||||
|
||||
thread_local query_options query_options::DEFAULT{default_cql_config,
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
db::consistency_level::ONE, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()};
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
@@ -64,7 +63,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _value_views(value_views)
|
||||
@@ -76,7 +74,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
@@ -84,7 +81,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _value_views()
|
||||
@@ -97,7 +93,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
bool skip_metadata,
|
||||
@@ -105,7 +100,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values()
|
||||
, _value_views(std::move(value_views))
|
||||
@@ -115,12 +109,11 @@ query_options::query_options(const cql_config& cfg,
|
||||
{
|
||||
}
|
||||
|
||||
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values,
|
||||
query_options::query_options(db::consistency_level cl, std::vector<cql3::raw_value> values,
|
||||
specific_options options)
|
||||
: query_options(
|
||||
default_cql_config,
|
||||
cl,
|
||||
timeout_config,
|
||||
{},
|
||||
std::move(values),
|
||||
false,
|
||||
@@ -133,7 +126,6 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state)
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
std::move(qo->_value_views),
|
||||
@@ -146,7 +138,6 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
std::move(qo->_value_views),
|
||||
@@ -158,7 +149,7 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
|
||||
|
||||
query_options::query_options(std::vector<cql3::raw_value> values)
|
||||
: query_options(
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
|
||||
db::consistency_level::ONE, std::move(values))
|
||||
{}
|
||||
|
||||
void query_options::prepare(const std::vector<lw_shared_ptr<column_specification>>& specs)
|
||||
|
||||
@@ -51,7 +51,6 @@
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "cql_serialization_format.hh"
|
||||
#include "timeout_config.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -75,7 +74,6 @@ public:
|
||||
private:
|
||||
const cql_config& _cql_config;
|
||||
const db::consistency_level _consistency;
|
||||
const timeout_config& _timeout_config;
|
||||
const std::optional<std::vector<sstring_view>> _names;
|
||||
std::vector<cql3::raw_value> _values;
|
||||
std::vector<cql3::raw_value_view> _value_views;
|
||||
@@ -109,7 +107,6 @@ public:
|
||||
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
@@ -117,7 +114,6 @@ public:
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
@@ -126,7 +122,6 @@ public:
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
bool skip_metadata,
|
||||
@@ -158,13 +153,10 @@ public:
|
||||
|
||||
// forInternalUse
|
||||
explicit query_options(std::vector<cql3::raw_value> values);
|
||||
explicit query_options(db::consistency_level, const timeout_config& timeouts,
|
||||
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
|
||||
explicit query_options(db::consistency_level, std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
|
||||
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state);
|
||||
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
|
||||
|
||||
const timeout_config& get_timeout_config() const { return _timeout_config; }
|
||||
|
||||
db::consistency_level get_consistency() const {
|
||||
return _consistency;
|
||||
}
|
||||
@@ -258,7 +250,7 @@ query_options::query_options(query_options&& o, std::vector<OneMutationDataRange
|
||||
std::vector<query_options> tmp;
|
||||
tmp.reserve(values_ranges.size());
|
||||
std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) {
|
||||
return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
return query_options(_cql_config, _consistency, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
});
|
||||
_batch_options = std::move(tmp);
|
||||
}
|
||||
|
||||
@@ -619,7 +619,6 @@ query_options query_processor::make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>& values,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size) const {
|
||||
if (p->bound_names.size() != values.size()) {
|
||||
throw std::invalid_argument(
|
||||
@@ -643,11 +642,10 @@ query_options query_processor::make_internal_options(
|
||||
api::timestamp_type ts = api::missing_timestamp;
|
||||
return query_options(
|
||||
cl,
|
||||
timeout_config,
|
||||
bound_values,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
|
||||
}
|
||||
return query_options(cl, timeout_config, bound_values);
|
||||
return query_options(cl, bound_values);
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
|
||||
@@ -671,7 +669,7 @@ struct internal_query_state {
|
||||
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
|
||||
const std::initializer_list<data_value>& values, int32_t page_size) {
|
||||
auto p = prepare_internal(query_string);
|
||||
auto opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config, page_size);
|
||||
auto opts = make_internal_options(p, values, db::consistency_level::ONE, page_size);
|
||||
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
|
||||
internal_query_state{
|
||||
query_string,
|
||||
@@ -789,7 +787,16 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
bool cache) {
|
||||
return execute_internal(query_string, cl, *_internal_state, values, cache);
|
||||
}
|
||||
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& values,
|
||||
bool cache) {
|
||||
|
||||
@@ -797,13 +804,13 @@ query_processor::execute_internal(
|
||||
log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values));
|
||||
}
|
||||
if (cache) {
|
||||
return execute_with_params(prepare_internal(query_string), cl, timeout_config, values);
|
||||
return execute_with_params(prepare_internal(query_string), cl, query_state, values);
|
||||
} else {
|
||||
auto p = parse_statement(query_string)->prepare(_db, _cql_stats);
|
||||
p->statement->raw_cql_statement = query_string;
|
||||
p->statement->validate(_proxy, *_internal_state);
|
||||
auto checked_weak_ptr = p->checked_weak_from_this();
|
||||
return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
|
||||
return execute_with_params(std::move(checked_weak_ptr), cl, query_state, values).finally([p = std::move(p)] {});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -811,11 +818,11 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_with_params(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& values) {
|
||||
auto opts = make_internal_options(p, values, cl, timeout_config);
|
||||
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) {
|
||||
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) {
|
||||
auto opts = make_internal_options(p, values, cl);
|
||||
return do_with(std::move(opts), [this, &query_state, p = std::move(p)](auto & opts) {
|
||||
return p->statement->execute(_proxy, query_state, opts).then([](auto msg) {
|
||||
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
|
||||
});
|
||||
});
|
||||
|
||||
@@ -215,8 +215,7 @@ public:
|
||||
// creating namespaces, etc) is explicitly forbidden via this interface.
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
|
||||
return execute_internal(query_string, db::consistency_level::ONE,
|
||||
infinite_timeout_config, values, true);
|
||||
return execute_internal(query_string, db::consistency_level::ONE, values, true);
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
|
||||
@@ -305,14 +304,19 @@ public:
|
||||
future<::shared_ptr<untyped_result_set>> execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& = { },
|
||||
bool cache = false);
|
||||
future<::shared_ptr<untyped_result_set>> execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& = { },
|
||||
bool cache = false);
|
||||
|
||||
future<::shared_ptr<untyped_result_set>> execute_with_params(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& = { });
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
||||
@@ -341,7 +345,6 @@ private:
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>&,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size = -1) const;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
|
||||
@@ -286,7 +286,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
++_stats.batches;
|
||||
_stats.statements_in_batches += _statements.size();
|
||||
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + query_state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
return get_mutations(storage, options, timeout, local, now, query_state).then([this, &storage, &options, timeout, tr_state = query_state.get_trace_state(),
|
||||
permit = query_state.get_permit()] (std::vector<mutation> ms) mutable {
|
||||
return execute_without_conditions(storage, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
|
||||
@@ -343,7 +343,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
schema_ptr schema;
|
||||
|
||||
db::timeout_clock::time_point now = db::timeout_clock::now();
|
||||
const timeout_config& cfg = options.get_timeout_config();
|
||||
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
|
||||
auto batch_timeout = now + cfg.write_timeout; // Statement timeout.
|
||||
auto cas_timeout = now + cfg.cas_timeout; // Ballot contention timeout.
|
||||
auto read_timeout = now + cfg.read_timeout; // Query timeout.
|
||||
|
||||
@@ -286,7 +286,7 @@ modification_statement::do_execute(service::storage_proxy& proxy, service::query
|
||||
future<>
|
||||
modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) const {
|
||||
auto cl = options.get_consistency();
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + qs.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
return get_mutations(proxy, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &proxy, &qs] (auto mutations) {
|
||||
if (mutations.empty()) {
|
||||
return now();
|
||||
@@ -302,7 +302,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
|
||||
auto cl_for_learn = options.get_consistency();
|
||||
auto cl_for_paxos = options.check_serial_consistency();
|
||||
db::timeout_clock::time_point now = db::timeout_clock::now();
|
||||
const timeout_config& cfg = options.get_timeout_config();
|
||||
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
|
||||
|
||||
auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout.
|
||||
auto cas_timeout = now + cfg.cas_timeout; // When to give up due to contention.
|
||||
|
||||
@@ -59,6 +59,7 @@
|
||||
#include "gms/feature_service.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -105,6 +106,30 @@ future<> create_role_statement::grant_permissions_to_creator(const service::clie
|
||||
});
|
||||
}
|
||||
|
||||
static void validate_timeout_options(const auth::authentication_options& auth_options) {
|
||||
if (!auth_options.options) {
|
||||
return;
|
||||
}
|
||||
const auto& options = *auth_options.options;
|
||||
auto check_duration = [&] (const sstring& repr) {
|
||||
data_value v = duration_type->deserialize(duration_type->from_string(repr));
|
||||
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
|
||||
if (duration.months || duration.days) {
|
||||
throw exceptions::invalid_request_exception("Timeout values cannot be longer than 24h");
|
||||
}
|
||||
if (duration.nanoseconds % 1'000'000 != 0) {
|
||||
throw exceptions::invalid_request_exception("Timeout values must be expressed in millisecond granularity");
|
||||
}
|
||||
};
|
||||
|
||||
for (auto opt : {"read_timeout", "write_timeout"}) {
|
||||
auto it = options.find(opt);
|
||||
if (it != options.end()) {
|
||||
check_duration(it->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void create_role_statement::validate(service::storage_proxy& p, const service::client_state&) const {
|
||||
validate_cluster_support(p);
|
||||
}
|
||||
@@ -137,9 +162,12 @@ create_role_statement::execute(service::storage_proxy&,
|
||||
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
|
||||
const auto& cs = state.get_client_state();
|
||||
auto& as = *cs.get_auth_service();
|
||||
validate_timeout_options(authen_options);
|
||||
|
||||
return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
|
||||
return grant_permissions_to_creator(cs);
|
||||
}).then([&state] () mutable {
|
||||
return state.get_client_state().update_per_role_params();
|
||||
}).then([] {
|
||||
return void_result_message();
|
||||
}).handle_exception_type([this](const auth::role_already_exists& e) {
|
||||
@@ -224,8 +252,9 @@ alter_role_statement::execute(service::storage_proxy&, service::query_state& sta
|
||||
extract_authentication_options(_options),
|
||||
[this, &state](const auth::role_config_update& update, const auth::authentication_options& authen_options) {
|
||||
auto& as = *state.get_client_state().get_auth_service();
|
||||
|
||||
return auth::alter_role(as, _role, update, authen_options).then([] {
|
||||
return auth::alter_role(as, _role, update, authen_options).then([&state] () mutable {
|
||||
return state.get_client_state().update_per_role_params();
|
||||
}).then([] {
|
||||
return void_result_message();
|
||||
}).handle_exception_type([](const auth::nonexistant_role& e) {
|
||||
return make_exception_future<result_message_ptr>(exceptions::invalid_request_exception(e.what()));
|
||||
|
||||
@@ -366,7 +366,7 @@ select_statement::do_execute(service::storage_proxy& proxy,
|
||||
}
|
||||
|
||||
command->slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
auto timeout_duration = options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout_duration = state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + timeout_duration;
|
||||
auto p = service::pager::query_pagers::pager(_schema, _selection,
|
||||
state, options, command, std::move(key_ranges), restrictions_need_filtering ? _restrictions : nullptr);
|
||||
@@ -513,7 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
|
||||
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
|
||||
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
uint32_t queried_ranges_count = partition_ranges.size();
|
||||
service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata_ptr(), _schema, std::move(partition_ranges));
|
||||
|
||||
@@ -607,7 +607,7 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
|
||||
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
|
||||
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
|
||||
struct base_query_state {
|
||||
query::result_merger merger;
|
||||
@@ -689,7 +689,7 @@ select_statement::execute(service::storage_proxy& proxy,
|
||||
// is specified we need to get "limit" rows from each partition since there
|
||||
// is no way to tell which of these rows belong to the query result before
|
||||
// doing post-query ordering.
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd, timeout](auto& prs) {
|
||||
assert(cmd->partition_limit == query::max_partitions);
|
||||
@@ -1250,7 +1250,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
|
||||
{
|
||||
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
|
||||
auto now = gc_clock::now();
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, false).then(
|
||||
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
|
||||
auto rs = cql3::untyped_result_set(rows);
|
||||
@@ -1291,7 +1291,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
|
||||
{
|
||||
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
|
||||
auto now = gc_clock::now();
|
||||
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
|
||||
auto timeout = db::timeout_clock::now() + state.get_client_state().get_timeout_config().*get_timeout_config_selector();
|
||||
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, true).then(
|
||||
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
|
||||
|
||||
|
||||
@@ -55,10 +55,18 @@ struct query_context {
|
||||
// let the `storage_proxy` time out the query down the call chain
|
||||
db::timeout_clock::duration::zero();
|
||||
|
||||
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) {
|
||||
struct timeout_context {
|
||||
std::unique_ptr<service::client_state> client_state;
|
||||
service::query_state query_state;
|
||||
timeout_context(db::timeout_clock::duration d)
|
||||
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
|
||||
, query_state(*client_state, empty_service_permit())
|
||||
{}
|
||||
};
|
||||
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
|
||||
return _qp.local().execute_internal(req,
|
||||
cql3::query_options::DEFAULT.get_consistency(),
|
||||
tcfg,
|
||||
tctx.query_state,
|
||||
{ data_value(std::forward<Args>(args))... },
|
||||
true);
|
||||
});
|
||||
|
||||
@@ -3093,7 +3093,6 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
|
||||
auto cm_fut = qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}
|
||||
);
|
||||
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
@@ -3136,7 +3135,6 @@ future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version ve
|
||||
return qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}
|
||||
).then([] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
@@ -3150,7 +3148,6 @@ future<> drop_column_mapping(utils::UUID table_id, table_schema_version version)
|
||||
return qctx->qp().execute_internal(
|
||||
DEL_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}).discard_result();
|
||||
}
|
||||
|
||||
|
||||
@@ -155,17 +155,20 @@ future<> system_distributed_keyspace::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
static const timeout_config internal_distributed_timeout_config = [] {
|
||||
static service::query_state& internal_distributed_query_state() {
|
||||
using namespace std::chrono_literals;
|
||||
const auto t = 10s;
|
||||
return timeout_config{ t, t, t, t, t, t, t };
|
||||
}();
|
||||
static timeout_config tc{ t, t, t, t, t, t, t };
|
||||
static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
|
||||
static thread_local service::query_state qs(cs, empty_service_permit());
|
||||
return qs;
|
||||
};
|
||||
|
||||
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
|
||||
return _qp.execute_internal(
|
||||
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
|
||||
@@ -182,7 +185,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
|
||||
return _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -193,7 +196,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -203,7 +206,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
return _qp.execute_internal(
|
||||
format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -281,7 +284,7 @@ system_distributed_keyspace::insert_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -293,7 +296,7 @@ system_distributed_keyspace::read_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ time },
|
||||
false
|
||||
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> std::optional<cdc::topology_description> {
|
||||
@@ -321,7 +324,7 @@ system_distributed_keyspace::expire_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ expiration_time, streams_ts },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -342,7 +345,7 @@ system_distributed_keyspace::create_cdc_desc(
|
||||
return _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ time, make_set_value(cdc_streams_set_type, prepare_cdc_streams(streams)) },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -355,7 +358,7 @@ system_distributed_keyspace::expire_cdc_desc(
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ expiration_time, streams_ts },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -367,7 +370,7 @@ system_distributed_keyspace::cdc_desc_exists(
|
||||
return _qp.execute_internal(
|
||||
format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ streams_ts },
|
||||
false
|
||||
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> bool {
|
||||
@@ -380,7 +383,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(context ctx) {
|
||||
return _qp.execute_internal(
|
||||
format("SELECT * FROM {}.{}", NAME, CDC_DESC),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{},
|
||||
false
|
||||
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
|
||||
33
docs/roles.md
Normal file
33
docs/roles.md
Normal file
@@ -0,0 +1,33 @@
|
||||
# Per-role parameters
|
||||
|
||||
Scylla allows configuring per-role parameters. The current list of parameters includes:
|
||||
* `read_timeout` - custom timeout for read operations
|
||||
* `write_timeout` - custom timeout for write operations
|
||||
|
||||
## Examples
|
||||
|
||||
In order to set up per-role parameters, one should use the already existing CQL API for roles:
|
||||
```cql
|
||||
CREATE ROLE example WITH options = {'read_timeout': 50ms}
|
||||
```
|
||||
or
|
||||
```cql
|
||||
ALTER ROLE example WITH options = {'read_timeout': 1s, 'write_timeout': 500ms}
|
||||
```
|
||||
|
||||
Once a session with given role is established, it will use per-role timeouts instead of globally configured
|
||||
timeouts, if there are any.
|
||||
|
||||
Role options can be viewed with the standard `LIST ROLES` statement:
|
||||
```cql
|
||||
LIST ROLES;
|
||||
|
||||
role | super | login | options
|
||||
-----------+-------+-------+-----------------------------------------------------
|
||||
cassandra | True | True | {'read_timeout': '50ms', 'write_timeout': '1000us'}
|
||||
|
||||
```
|
||||
|
||||
## Supported authenticators
|
||||
|
||||
Roles are currently supported in PasswordAuthenticator.
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
@@ -75,7 +75,7 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "database.hh"
|
||||
#include "cdc/log.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
thread_local api::timestamp_type service::client_state::_last_timestamp_micros = 0;
|
||||
|
||||
@@ -60,6 +61,56 @@ void service::client_state::set_login(auth::authenticated_user user) {
|
||||
_user = std::move(user);
|
||||
}
|
||||
|
||||
service::client_state::client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address, bool thrift)
|
||||
: _is_internal(false)
|
||||
, _is_thrift(thrift)
|
||||
, _remote_address(remote_address)
|
||||
, _auth_service(&auth_service)
|
||||
, _default_timeout_config(timeout_config)
|
||||
, _timeout_config(timeout_config) {
|
||||
if (!auth_service.underlying_authenticator().require_authentication()) {
|
||||
_user = auth::authenticated_user();
|
||||
}
|
||||
}
|
||||
|
||||
future<> service::client_state::update_per_role_params() {
|
||||
if (!_user || auth::is_anonymous(*_user)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
//FIXME: replace with a coroutine once they're widely accepted
|
||||
return seastar::async([this] {
|
||||
auth::role_set roles = _auth_service->get_roles(*_user->name).get();
|
||||
db::timeout_clock::duration read_timeout = db::timeout_clock::duration::max();
|
||||
db::timeout_clock::duration write_timeout = db::timeout_clock::duration::max();
|
||||
|
||||
auto get_duration = [&] (const sstring& repr) {
|
||||
data_value v = duration_type->deserialize(duration_type->from_string(repr));
|
||||
cql_duration duration = static_pointer_cast<const duration_type_impl>(duration_type)->from_value(v);
|
||||
return std::chrono::duration_cast<lowres_clock::duration>(std::chrono::nanoseconds(duration.nanoseconds));
|
||||
};
|
||||
|
||||
for (const auto& role : roles) {
|
||||
auto options = _auth_service->underlying_authenticator().query_custom_options(role).get();
|
||||
auto read_timeout_it = options.find("read_timeout");
|
||||
if (read_timeout_it != options.end()) {
|
||||
read_timeout = std::min(read_timeout, get_duration(read_timeout_it->second));
|
||||
}
|
||||
auto write_timeout_it = options.find("write_timeout");
|
||||
if (write_timeout_it != options.end()) {
|
||||
write_timeout = std::min(write_timeout, get_duration(write_timeout_it->second));
|
||||
}
|
||||
}
|
||||
_timeout_config.read_timeout = read_timeout == db::timeout_clock::duration::max() ?
|
||||
_default_timeout_config.read_timeout : read_timeout;
|
||||
_timeout_config.range_read_timeout = read_timeout == db::timeout_clock::duration::max() ?
|
||||
_default_timeout_config.range_read_timeout : read_timeout;
|
||||
_timeout_config.write_timeout = write_timeout == db::timeout_clock::duration::max() ?
|
||||
_default_timeout_config.write_timeout : write_timeout;
|
||||
_timeout_config.counter_write_timeout = write_timeout == db::timeout_clock::duration::max() ?
|
||||
_default_timeout_config.counter_write_timeout : write_timeout;
|
||||
});
|
||||
}
|
||||
|
||||
future<> service::client_state::check_user_can_login() {
|
||||
if (auth::is_anonymous(*_user)) {
|
||||
return make_ready_future();
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "auth/service.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "timestamp.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "database_fwd.hh"
|
||||
@@ -88,10 +89,17 @@ public:
|
||||
};
|
||||
private:
|
||||
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
|
||||
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state),
|
||||
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
|
||||
_auth_service(auth_service ? &auth_service->local() : nullptr),
|
||||
_enabled_protocol_extensions(cs->_enabled_protocol_extensions) {}
|
||||
: _keyspace(cs->_keyspace)
|
||||
, _user(cs->_user)
|
||||
, _auth_state(cs->_auth_state)
|
||||
, _is_internal(cs->_is_internal)
|
||||
, _is_thrift(cs->_is_thrift)
|
||||
, _remote_address(cs->_remote_address)
|
||||
, _auth_service(auth_service ? &auth_service->local() : nullptr)
|
||||
, _default_timeout_config(cs->_default_timeout_config)
|
||||
, _timeout_config(cs->_timeout_config)
|
||||
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
|
||||
{}
|
||||
friend client_state_for_another_shard;
|
||||
private:
|
||||
sstring _keyspace;
|
||||
@@ -136,6 +144,10 @@ private:
|
||||
// Only populated for external client state.
|
||||
auth::service* _auth_service{nullptr};
|
||||
|
||||
// For restoring default values in the timeout config
|
||||
timeout_config _default_timeout_config;
|
||||
timeout_config _timeout_config;
|
||||
|
||||
public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
@@ -162,15 +174,7 @@ public:
|
||||
_driver_version = std::move(driver_version);
|
||||
}
|
||||
|
||||
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
: _is_internal(false)
|
||||
, _is_thrift(thrift)
|
||||
, _remote_address(remote_address)
|
||||
, _auth_service(&auth_service) {
|
||||
if (!auth_service.underlying_authenticator().require_authentication()) {
|
||||
_user = auth::authenticated_user();
|
||||
}
|
||||
}
|
||||
client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address = socket_address(), bool thrift = false);
|
||||
|
||||
gms::inet_address get_client_address() const {
|
||||
return gms::inet_address(_remote_address);
|
||||
@@ -180,10 +184,19 @@ public:
|
||||
return _remote_address.port();
|
||||
}
|
||||
|
||||
client_state(internal_tag)
|
||||
const timeout_config& get_timeout_config() const {
|
||||
return _timeout_config;
|
||||
}
|
||||
|
||||
client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config)
|
||||
{}
|
||||
|
||||
client_state(internal_tag, const timeout_config& config)
|
||||
: _keyspace("system")
|
||||
, _is_internal(true)
|
||||
, _is_thrift(false)
|
||||
, _default_timeout_config(config)
|
||||
, _timeout_config(config)
|
||||
{}
|
||||
|
||||
client_state(const client_state&) = delete;
|
||||
@@ -315,6 +328,7 @@ public:
|
||||
auth::command_desc::type = auth::command_desc::type::OTHER) const;
|
||||
future<> has_schema_access(const schema& s, auth::permission p) const;
|
||||
|
||||
future<> update_per_role_params();
|
||||
private:
|
||||
future<> has_access(const sstring& keyspace, auth::command_desc) const;
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "types/map.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_default_authenticator) {
|
||||
return do_with_cql_env([](cql_test_env& env) {
|
||||
@@ -216,3 +217,48 @@ SEASTAR_TEST_CASE(alter_opts_on_system_auth_tables) {
|
||||
cquery_nofail(env, "ALTER TABLE system_auth.role_permissions WITH min_index_interval = 456");
|
||||
}, auth_on());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
cfg->authenticator(sstring(auth::password_authenticator_name));
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auth::role_config config {
|
||||
.can_login = true,
|
||||
};
|
||||
auth::authentication_options opts {
|
||||
.password = "pass"
|
||||
};
|
||||
auth::create_role(e.local_auth_service(), "user", config, opts).get();
|
||||
authenticate(e, "user", "pass").get();
|
||||
|
||||
cquery_nofail(e, "CREATE TABLE t (id int PRIMARY KEY, v int)");
|
||||
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 5ms, 'write_timeout': 1h30m}");
|
||||
|
||||
auto my_map_type = map_type_impl::get_instance(utf8_type, utf8_type, false);
|
||||
|
||||
auto msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"read_timeout", "5ms"}, {"write_timeout", "1h30m"}}))),
|
||||
}});
|
||||
|
||||
cquery_nofail(e, "ALTER ROLE user WITH options = {'write_timeout': 35s}");
|
||||
|
||||
msg = cquery_nofail(e, "SELECT options FROM system_auth.roles WHERE role = 'user'");
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
my_map_type->decompose(make_map_value(my_map_type, map_type_impl::native_type({{"write_timeout", "35s"}}))),
|
||||
}});
|
||||
|
||||
// Setting a timeout value of 0 makes little sense, but it's great for testing
|
||||
cquery_nofail(e, "ALTER ROLE user WITH options = {'read_timeout': 0s, 'write_timeout': 0s}");
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("SELECT * FROM t").get(), exceptions::read_timeout_exception);
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("INSERT INTO t (id, v) VALUES (1,2)").get(), exceptions::mutation_write_failure_exception);
|
||||
|
||||
cquery_nofail(e, "ALTER ROLE user WITH options = {}");
|
||||
cquery_nofail(e, "SELECT * FROM t");
|
||||
cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)");
|
||||
|
||||
// Only valid timeout values are accepted
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("ALTER ROLE user WITH options = {'read_timeout': 'I am not a valid duration'}").get(), marshal_exception);
|
||||
}, cfg);
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception);
|
||||
|
||||
auto make_query_options = [] (cql_protocol_version_type version) {
|
||||
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version});
|
||||
};
|
||||
|
||||
@@ -3013,7 +3013,7 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
|
||||
e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get();
|
||||
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().is_empty();
|
||||
@@ -4444,7 +4444,6 @@ static std::unique_ptr<cql3::query_options> q_serial_opts(
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto qo = std::make_unique<cql3::query_options>(
|
||||
cl,
|
||||
infinite_timeout_config,
|
||||
values,
|
||||
// Ensure (optional) serial consistency is always specified.
|
||||
cql3::query_options::specific_options{
|
||||
@@ -4634,7 +4633,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
|
||||
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
|
||||
|
||||
int32_t page_size = is_paged ? 10000 : -1;
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
|
||||
|
||||
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;
|
||||
|
||||
@@ -831,14 +831,14 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(6), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(3), boolean_type->decompose(true)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(6), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -857,7 +857,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(2), boolean_type->decompose(false)}
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -866,7 +866,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(4), boolean_type->decompose(false)}
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -877,7 +877,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
// Some pages might be empty and in such case we should continue querying
|
||||
size_t rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -889,7 +889,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
|
||||
rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -905,7 +905,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
rows_fetched = 0;
|
||||
uint64_t remaining = 1;
|
||||
while (remaining > 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched += count_rows_fetched(msg);
|
||||
@@ -964,7 +964,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
{ int32_type->decompose(1), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -972,7 +972,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
{ int32_type->decompose(3), boolean_type->decompose(true)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
// Some pages might be empty and in such case we should continue querying
|
||||
size_t rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
rows_fetched = 0;
|
||||
uint64_t remaining = 1;
|
||||
while (remaining > 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()});
|
||||
sstring query = allow_filtering ?
|
||||
fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) :
|
||||
|
||||
@@ -36,7 +36,7 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(4321);
|
||||
|
||||
@@ -63,7 +63,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = e.execute_cql("select * from test;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
|
||||
paging_state->set_remaining(test_remaining);
|
||||
|
||||
while (has_more_pages(msg)) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -113,7 +113,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
|
||||
paging_state->set_remaining(test_remaining);
|
||||
|
||||
while (has_more_pages(msg)) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
|
||||
@@ -209,8 +209,7 @@ std::unordered_map<sstring, uint64_t> get_query_metrics() {
|
||||
|
||||
/// Creates query_options with cl, infinite timeout, and no named values.
|
||||
auto make_options(clevel cl) {
|
||||
return std::make_unique<cql3::query_options>(
|
||||
cl, infinite_timeout_config, std::vector<cql3::raw_value>());
|
||||
return std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>());
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
@@ -45,7 +45,7 @@ std::unique_ptr<cql3::query_options> to_options(
|
||||
static auto& d = cql3::query_options::DEFAULT;
|
||||
return std::make_unique<cql3::query_options>(
|
||||
cfg,
|
||||
d.get_consistency(), d.get_timeout_config(), std::move(names), std::move(values), d.skip_metadata(),
|
||||
d.get_consistency(), std::move(names), std::move(values), d.skip_metadata(),
|
||||
d.get_specific_options(), d.get_cql_serialization_format());
|
||||
}
|
||||
|
||||
|
||||
@@ -412,7 +412,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(101);
|
||||
@@ -424,7 +424,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_rows({{
|
||||
@@ -434,7 +434,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_rows({{
|
||||
@@ -470,7 +470,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
};
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -480,7 +480,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
expect_more_pages(res, true);
|
||||
@@ -490,7 +490,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
paging_state = extract_paging_state(res);
|
||||
@@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
try {
|
||||
expect_more_pages(res, false);
|
||||
} catch (...) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(0);
|
||||
@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -524,7 +524,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -534,7 +534,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
});
|
||||
|
||||
{
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -551,7 +551,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
|
||||
paging_state->get_rows_fetched_for_last_partition());
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -563,7 +563,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
// not to return rows (since no row matches an empty partition key)
|
||||
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
|
||||
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
|
||||
@@ -802,7 +802,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
};
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -811,7 +811,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
|
||||
|
||||
@@ -821,7 +821,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -1158,7 +1158,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function.
|
||||
@@ -1173,7 +1173,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -1191,7 +1191,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function
|
||||
@@ -1199,7 +1199,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
{ int32_type->decompose(row_count * row_count / 4)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
|
||||
@@ -125,7 +125,7 @@ private:
|
||||
service::client_state client_state;
|
||||
|
||||
core_local_state(auth::service& auth_service)
|
||||
: client_state(service::client_state::external_tag{}, auth_service)
|
||||
: client_state(service::client_state::external_tag{}, auth_service, infinite_timeout_config)
|
||||
{
|
||||
client_state.set_login(auth::authenticated_user(testing_superuser));
|
||||
}
|
||||
@@ -189,7 +189,7 @@ public:
|
||||
db::consistency_level cl = db::consistency_level::ONE) override {
|
||||
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config,
|
||||
auto options = std::make_unique<cql3::query_options>(cl,
|
||||
std::move(values), cql3::query_options::specific_options{
|
||||
so.page_size,
|
||||
so.state,
|
||||
@@ -226,7 +226,7 @@ public:
|
||||
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
|
||||
}
|
||||
auto& qo = cql3::query_options::DEFAULT;
|
||||
auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout;
|
||||
auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
|
||||
|
||||
return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
|
||||
.finally([qs, modif_stmt = std::move(modif_stmt)] {});
|
||||
|
||||
@@ -228,7 +228,7 @@ SEASTAR_TEST_CASE(scan_enormous_table_test) {
|
||||
std::unique_ptr<cql3::query_options> qo;
|
||||
uint64_t fetched_rows_log_counter = 1e7;
|
||||
do {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0();
|
||||
rows_fetched += count_rows_fetched(msg);
|
||||
|
||||
@@ -104,7 +104,6 @@ std::unique_ptr<cql3::query_options> repl_options() {
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto qo = std::make_unique<cql3::query_options>(
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
std::vector<cql3::raw_value>{},
|
||||
// Ensure (optional) serial consistency is always specified.
|
||||
cql3::query_options::specific_options{
|
||||
|
||||
@@ -201,9 +201,9 @@ enum class query_order { no, yes };
|
||||
class thrift_handler : public CassandraCobSvIf {
|
||||
distributed<database>& _db;
|
||||
distributed<cql3::query_processor>& _query_processor;
|
||||
::timeout_config _timeout_config;
|
||||
service::client_state _client_state;
|
||||
service::query_state _query_state;
|
||||
::timeout_config _timeout_config;
|
||||
private:
|
||||
template <typename Cob, typename Func>
|
||||
void
|
||||
@@ -220,9 +220,9 @@ public:
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
|
||||
: _db(db)
|
||||
, _query_processor(qp)
|
||||
, _client_state(service::client_state::external_tag{}, auth_service, socket_address(), true)
|
||||
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
|
||||
, _timeout_config(timeout_config)
|
||||
, _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true)
|
||||
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
|
||||
{ }
|
||||
|
||||
const sstring& current_keyspace() const {
|
||||
@@ -976,7 +976,7 @@ public:
|
||||
throw make_exception<InvalidRequestException>("Compressed query strings are not supported");
|
||||
}
|
||||
auto& qp = _query_processor.local();
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = qp.execute_direct(query, _query_state, *opts);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
@@ -1056,7 +1056,7 @@ public:
|
||||
return cql3::raw_value::make_value(to_bytes(s));
|
||||
});
|
||||
auto& qp = _query_processor.local();
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values),
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::move(bytes_values),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
|
||||
@@ -63,9 +63,13 @@ const sstring trace_keyspace_helper::EVENTS("events");
|
||||
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log");
|
||||
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx");
|
||||
|
||||
timeout_config tracing_db_timeout_config {
|
||||
5s, 5s, 5s, 5s, 5s, 5s, 5s,
|
||||
};
|
||||
static service::client_state& tracing_client_state() {
|
||||
static timeout_config tracing_db_timeout_config {
|
||||
5s, 5s, 5s, 5s, 5s, 5s, 5s,
|
||||
};
|
||||
static thread_local service::client_state s(service::client_state::internal_tag{}, tracing_db_timeout_config);
|
||||
return s;
|
||||
}
|
||||
|
||||
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
|
||||
int64_t last_nanos = 0;
|
||||
@@ -75,7 +79,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat
|
||||
|
||||
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
: i_tracing_backend_helper(tr)
|
||||
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit())
|
||||
, _dummy_query_state(tracing_client_state(), empty_service_permit())
|
||||
, _sessions(KEYSPACE_NAME, SESSIONS,
|
||||
sprint("CREATE TABLE IF NOT EXISTS %s.%s ("
|
||||
"session_id uuid,"
|
||||
@@ -313,7 +317,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
|
||||
};
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
|
||||
@@ -331,7 +335,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
|
||||
};
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -374,7 +378,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
});
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -395,7 +399,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
|
||||
});
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
|
||||
@@ -431,7 +435,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
|
||||
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
|
||||
|
||||
return do_with(
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
|
||||
[this] (auto& batch_options, auto& batch) {
|
||||
return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });
|
||||
|
||||
@@ -219,10 +219,10 @@ private:
|
||||
options_flag::NAMES_FOR_VALUES
|
||||
>;
|
||||
public:
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) {
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const cql3::cql_config& cql_config) {
|
||||
auto consistency = read_consistency();
|
||||
if (version == 1) {
|
||||
return std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
return std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
@@ -270,11 +270,11 @@ public:
|
||||
if (!names.empty()) {
|
||||
onames = std::move(names);
|
||||
}
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::move(onames), std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts},
|
||||
cql_ser_format);
|
||||
} else {
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
|
||||
@@ -563,7 +563,7 @@ cql_server::connection::connection(cql_server& server, socket_address server_add
|
||||
, _fd(std::move(fd))
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, addr)
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, server.timeout_config(), addr)
|
||||
{
|
||||
++_server._total_connections;
|
||||
++_server._current_connections;
|
||||
@@ -855,6 +855,9 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
|
||||
std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username)));
|
||||
});
|
||||
}
|
||||
f = f.then([&client_state] {
|
||||
return client_state.update_per_role_params();
|
||||
});
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
|
||||
});
|
||||
@@ -891,7 +894,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
|
||||
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
|
||||
request_reader in(is, linearization_buffer);
|
||||
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
|
||||
server.timeout_config(), /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
|
||||
/* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
|
||||
// result here has to be foreign ptr
|
||||
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
|
||||
});
|
||||
@@ -906,7 +909,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
fragmented_temporary_buffer::istream is = in.get_stream();
|
||||
|
||||
return process_fn(client_state, _server._query_processor, in, stream,
|
||||
_version, _cql_serialization_format, _server.timeout_config(), permit, trace_state, true)
|
||||
_version, _cql_serialization_format, permit, trace_state, true)
|
||||
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
|
||||
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
unsigned* shard = std::get_if<unsigned>(&msg);
|
||||
@@ -920,12 +923,12 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state,
|
||||
service_permit permit, tracing::trace_state_ptr trace_state,
|
||||
bool init_trace) {
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
|
||||
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
@@ -988,8 +991,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
|
||||
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
|
||||
bool needs_authorization = false;
|
||||
@@ -1012,10 +1014,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
in.read_value_view_list(version, values);
|
||||
auto consistency = in.read_consistency();
|
||||
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, timeout_config, std::nullopt, values, false,
|
||||
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, std::nullopt, values, false,
|
||||
cql3::query_options::specific_options::DEFAULT, serialization_format);
|
||||
} else {
|
||||
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
|
||||
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
|
||||
}
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
@@ -1068,8 +1070,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
if (version == 1) {
|
||||
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
|
||||
}
|
||||
@@ -1158,7 +1159,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,
|
||||
timeout_config, qp.local().get_cql_config())), std::move(values)));
|
||||
qp.local().get_cql_config())), std::move(values)));
|
||||
auto& options = *q_state->options;
|
||||
|
||||
if (init_trace) {
|
||||
|
||||
Reference in New Issue
Block a user