Merge 'Make all timeouts explicit' from Avi
" This patchset makes all users of query_processor specify their timeouts explicitly, in preparation for the removal of cql_statement::execute_internal() (whose main function was to override timeouts). " * tag 'cql-explicit-timeouts/v1' of https://github.com/avikivity/scylla: query_processor: require clients to specify timeout configuration query_processor: un-default consistency level in make_internal_options
This commit is contained in:
@@ -103,6 +103,7 @@ future<bool> default_authorizer::any_granted() const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{},
|
||||
true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
@@ -115,7 +116,8 @@ future<> default_authorizer::migrate_legacy_metadata() const {
|
||||
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
return do_with(
|
||||
row.get_as<sstring>("username"),
|
||||
@@ -196,6 +198,7 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return permissions::NONE;
|
||||
@@ -225,6 +228,7 @@ default_authorizer::modify(
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -250,6 +254,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
std::vector<permission_details> all_details;
|
||||
@@ -277,6 +282,7 @@ future<> default_authorizer::revoke_all(stdx::string_view role_name) const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
@@ -297,6 +303,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
try {
|
||||
auto res = f.get0();
|
||||
@@ -314,6 +321,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
|
||||
@@ -183,7 +183,8 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::QUORUM).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
auto username = row.get_as<sstring>("username");
|
||||
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
|
||||
@@ -191,6 +192,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
return _qp.process(
|
||||
update_row_query,
|
||||
consistency_for_user(username),
|
||||
infinite_timeout_config,
|
||||
{std::move(salted_hash), username}).discard_result();
|
||||
}).finally([results] {});
|
||||
}).then([] {
|
||||
@@ -207,6 +209,7 @@ future<> password_authenticator::create_default_if_missing() const {
|
||||
return _qp.process(
|
||||
update_row_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{hashpw(DEFAULT_USER_PASSWORD), DEFAULT_USER_NAME}).then([](auto&&) {
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
});
|
||||
@@ -306,6 +309,7 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_user(username),
|
||||
infinite_timeout_config,
|
||||
{username},
|
||||
true);
|
||||
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
@@ -333,6 +337,7 @@ future<> password_authenticator::create(stdx::string_view role_name, const authe
|
||||
return _qp.process(
|
||||
update_row_query,
|
||||
consistency_for_user(role_name),
|
||||
infinite_timeout_config,
|
||||
{hashpw(*options.password), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -350,6 +355,7 @@ future<> password_authenticator::alter(stdx::string_view role_name, const authen
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_user(role_name),
|
||||
infinite_timeout_config,
|
||||
{hashpw(*options.password), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -360,7 +366,7 @@ future<> password_authenticator::drop(stdx::string_view name) const {
|
||||
meta::roles_table::qualified_name(),
|
||||
meta::roles_table::role_col_name);
|
||||
|
||||
return _qp.process(query, consistency_for_user(name), {sstring(name)}).discard_result();
|
||||
return _qp.process(query, consistency_for_user(name), infinite_timeout_config, {sstring(name)}).discard_result();
|
||||
}
|
||||
|
||||
future<custom_options> password_authenticator::query_custom_options(stdx::string_view role_name) const {
|
||||
|
||||
@@ -72,12 +72,14 @@ future<bool> default_role_row_satisfies(
|
||||
return qp.process(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return qp.process(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -101,7 +103,8 @@ future<bool> any_nondefault_role_row_satisfies(
|
||||
return do_with(std::move(p), [&qp](const auto& p) {
|
||||
return qp.process(
|
||||
query,
|
||||
db::consistency_level::QUORUM).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -223,6 +223,7 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -232,6 +233,7 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.process(
|
||||
default_user_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -240,7 +242,8 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
|
||||
return _qp.process(
|
||||
all_users_query,
|
||||
db::consistency_level::QUORUM).then([](auto results) {
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([](auto results) {
|
||||
return make_ready_future<bool>(!results->empty());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -89,6 +89,7 @@ static future<stdx::optional<record>> find_record(cql3::query_processor& qp, std
|
||||
return qp.process(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name)},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -173,6 +174,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
|
||||
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
|
||||
return make_ready_future<>();
|
||||
@@ -198,7 +200,8 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
|
||||
|
||||
return _qp.process(
|
||||
query,
|
||||
db::consistency_level::QUORUM).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
role_config config;
|
||||
config.is_superuser = row.get_as<bool>("super");
|
||||
@@ -260,6 +263,7 @@ future<> standard_role_manager::create_or_replace(stdx::string_view role_name, c
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name), c.is_superuser, c.can_login},
|
||||
true).discard_result();
|
||||
}
|
||||
@@ -303,6 +307,7 @@ standard_role_manager::alter(stdx::string_view role_name, const role_config_upda
|
||||
build_column_assignments(u),
|
||||
meta::roles_table::role_col_name),
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name)}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -322,6 +327,7 @@ future<> standard_role_manager::drop(stdx::string_view role_name) const {
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
|
||||
return parallel_for_each(
|
||||
members->begin(),
|
||||
@@ -361,6 +367,7 @@ future<> standard_role_manager::drop(stdx::string_view role_name) const {
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -387,6 +394,7 @@ standard_role_manager::modify_membership(
|
||||
return _qp.process(
|
||||
query,
|
||||
consistency_for_role(grantee_name),
|
||||
infinite_timeout_config,
|
||||
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -398,6 +406,7 @@ standard_role_manager::modify_membership(
|
||||
"INSERT INTO %s (role, member) VALUES (?, ?)",
|
||||
meta::role_members_table::qualified_name()),
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
|
||||
case membership_change::remove:
|
||||
@@ -406,6 +415,7 @@ standard_role_manager::modify_membership(
|
||||
"DELETE FROM %s WHERE role = ? AND member = ?",
|
||||
meta::role_members_table::qualified_name()),
|
||||
consistency_for_role(role_name),
|
||||
infinite_timeout_config,
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -506,7 +516,7 @@ future<role_set> standard_role_manager::query_all() const {
|
||||
// To avoid many copies of a view.
|
||||
static const auto role_col_name_string = sstring(meta::roles_table::role_col_name);
|
||||
|
||||
return _qp.process(query, db::consistency_level::QUORUM).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return _qp.process(query, db::consistency_level::QUORUM, infinite_timeout_config).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
role_set roles;
|
||||
|
||||
std::transform(
|
||||
|
||||
@@ -414,6 +414,7 @@ query_options query_processor::make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>& values,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size) {
|
||||
if (p->bound_names.size() != values.size()) {
|
||||
throw std::invalid_argument(
|
||||
@@ -437,11 +438,11 @@ query_options query_processor::make_internal_options(
|
||||
api::timestamp_type ts = api::missing_timestamp;
|
||||
return query_options(
|
||||
cl,
|
||||
infinite_timeout_config,
|
||||
timeout_config,
|
||||
bound_values,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
|
||||
}
|
||||
return query_options(cl, infinite_timeout_config, bound_values);
|
||||
return query_options(cl, timeout_config, bound_values);
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
|
||||
@@ -472,7 +473,7 @@ struct internal_query_state {
|
||||
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
|
||||
const std::initializer_list<data_value>& values, int32_t page_size) {
|
||||
auto p = prepare_internal(query_string);
|
||||
auto opts = make_internal_options(p, values, db::consistency_level::ONE, page_size);
|
||||
auto opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config, page_size);
|
||||
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
|
||||
internal_query_state{
|
||||
query_string,
|
||||
@@ -560,7 +561,7 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_internal(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
const std::initializer_list<data_value>& values) {
|
||||
query_options opts = make_internal_options(p, values);
|
||||
query_options opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config);
|
||||
return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) {
|
||||
return p->statement->execute_internal(
|
||||
_proxy,
|
||||
@@ -575,15 +576,16 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::process(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
bool cache) {
|
||||
if (cache) {
|
||||
return process(prepare_internal(query_string), cl, values);
|
||||
return process(prepare_internal(query_string), cl, timeout_config, values);
|
||||
} else {
|
||||
auto p = parse_statement(query_string)->prepare(_db.local(), _cql_stats);
|
||||
p->statement->validate(_proxy, *_internal_state);
|
||||
auto checked_weak_ptr = p->checked_weak_from_this();
|
||||
return process(std::move(checked_weak_ptr), cl, values).finally([p = std::move(p)] {});
|
||||
return process(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -591,8 +593,9 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::process(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values) {
|
||||
auto opts = make_internal_options(p, values, cl);
|
||||
auto opts = make_internal_options(p, values, cl, timeout_config);
|
||||
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) {
|
||||
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) {
|
||||
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
|
||||
|
||||
@@ -240,12 +240,14 @@ public:
|
||||
future<::shared_ptr<untyped_result_set>> process(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& = { },
|
||||
bool cache = false);
|
||||
|
||||
future<::shared_ptr<untyped_result_set>> process(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& = { });
|
||||
|
||||
/*
|
||||
@@ -283,7 +285,8 @@ private:
|
||||
query_options make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>&,
|
||||
db::consistency_level = db::consistency_level::ONE,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size = -1);
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
|
||||
@@ -101,6 +101,7 @@ future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::vi
|
||||
return _qp.process(
|
||||
sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
|
||||
@@ -117,6 +118,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
|
||||
return _qp.process(
|
||||
sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -127,6 +129,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
|
||||
return _qp.process(
|
||||
sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -136,6 +139,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
return _qp.process(
|
||||
sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).discard_result();
|
||||
}
|
||||
|
||||
@@ -167,6 +167,7 @@ SEASTAR_TEST_CASE(test_cassandra_hash) {
|
||||
// This is extremely whitebox. We'll just go right ahead and know
|
||||
// what the tables etc are called. Oy wei...
|
||||
auto f = env.local_qp().process("INSERT into system_auth.roles (role, salted_hash) values (?, ?)", db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{ username, salted_hash }).discard_result();
|
||||
|
||||
return f.then([=, &env] {
|
||||
|
||||
Reference in New Issue
Block a user