Merge 'Make all timeouts explicit' from Avi

"
This patchset makes all users of query_processor specify their timeouts
explicitly, in preparation for the removal of
cql_statement::execute_internal() (whose main function was to override
timeouts).
"

* tag 'cql-explicit-timeouts/v1' of https://github.com/avikivity/scylla:
  query_processor: require clients to specify timeout configuration
  query_processor: un-default consistency level in make_internal_options
This commit is contained in:
Duarte Nunes
2018-05-26 16:10:58 +02:00
9 changed files with 56 additions and 15 deletions

View File

@@ -103,6 +103,7 @@ future<bool> default_authorizer::any_granted() const {
return _qp.process(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{},
true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return !results->empty();
@@ -115,7 +116,8 @@ future<> default_authorizer::migrate_legacy_metadata() const {
return _qp.process(
query,
db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
db::consistency_level::LOCAL_ONE,
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
return do_with(
row.get_as<sstring>("username"),
@@ -196,6 +198,7 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
return _qp.process(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return permissions::NONE;
@@ -225,6 +228,7 @@ default_authorizer::modify(
return _qp.process(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
});
}
@@ -250,6 +254,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
return _qp.process(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{},
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
std::vector<permission_details> all_details;
@@ -277,6 +282,7 @@ future<> default_authorizer::revoke_all(stdx::string_view role_name) const {
return _qp.process(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
try {
std::rethrow_exception(ep);
@@ -297,6 +303,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.process(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
@@ -314,6 +321,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
return _qp.process(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
[resource](auto ep) {
try {

View File

@@ -183,7 +183,8 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.process(
query,
db::consistency_level::QUORUM).then([this](::shared_ptr<cql3::untyped_result_set> results) {
db::consistency_level::QUORUM,
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
auto username = row.get_as<sstring>("username");
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
@@ -191,6 +192,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return _qp.process(
update_row_query,
consistency_for_user(username),
infinite_timeout_config,
{std::move(salted_hash), username}).discard_result();
}).finally([results] {});
}).then([] {
@@ -207,6 +209,7 @@ future<> password_authenticator::create_default_if_missing() const {
return _qp.process(
update_row_query,
db::consistency_level::QUORUM,
infinite_timeout_config,
{hashpw(DEFAULT_USER_PASSWORD), DEFAULT_USER_NAME}).then([](auto&&) {
plogger.info("Created default superuser authentication record.");
});
@@ -306,6 +309,7 @@ future<authenticated_user> password_authenticator::authenticate(
return _qp.process(
query,
consistency_for_user(username),
infinite_timeout_config,
{username},
true);
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
@@ -333,6 +337,7 @@ future<> password_authenticator::create(stdx::string_view role_name, const authe
return _qp.process(
update_row_query,
consistency_for_user(role_name),
infinite_timeout_config,
{hashpw(*options.password), sstring(role_name)}).discard_result();
}
@@ -350,6 +355,7 @@ future<> password_authenticator::alter(stdx::string_view role_name, const authen
return _qp.process(
query,
consistency_for_user(role_name),
infinite_timeout_config,
{hashpw(*options.password), sstring(role_name)}).discard_result();
}
@@ -360,7 +366,7 @@ future<> password_authenticator::drop(stdx::string_view name) const {
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(query, consistency_for_user(name), {sstring(name)}).discard_result();
return _qp.process(query, consistency_for_user(name), infinite_timeout_config, {sstring(name)}).discard_result();
}
future<custom_options> password_authenticator::query_custom_options(stdx::string_view role_name) const {

View File

@@ -72,12 +72,14 @@ future<bool> default_role_row_satisfies(
return qp.process(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return qp.process(
query,
db::consistency_level::QUORUM,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
@@ -101,7 +103,8 @@ future<bool> any_nondefault_role_row_satisfies(
return do_with(std::move(p), [&qp](const auto& p) {
return qp.process(
query,
db::consistency_level::QUORUM).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
db::consistency_level::QUORUM,
infinite_timeout_config).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return false;
}

View File

@@ -223,6 +223,7 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.process(
default_user_query,
db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) {
if (!results->empty()) {
@@ -232,6 +233,7 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.process(
default_user_query,
db::consistency_level::QUORUM,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([this](auto results) {
if (!results->empty()) {
@@ -240,7 +242,8 @@ future<bool> service::has_existing_legacy_users() const {
return _qp.process(
all_users_query,
db::consistency_level::QUORUM).then([](auto results) {
db::consistency_level::QUORUM,
infinite_timeout_config).then([](auto results) {
return make_ready_future<bool>(!results->empty());
});
});

View File

@@ -89,6 +89,7 @@ static future<stdx::optional<record>> find_record(cql3::query_processor& qp, std
return qp.process(
query,
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name)},
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
@@ -173,6 +174,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
return _qp.process(
query,
db::consistency_level::QUORUM,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
return make_ready_future<>();
@@ -198,7 +200,8 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
return _qp.process(
query,
db::consistency_level::QUORUM).then([this](::shared_ptr<cql3::untyped_result_set> results) {
db::consistency_level::QUORUM,
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
role_config config;
config.is_superuser = row.get_as<bool>("super");
@@ -260,6 +263,7 @@ future<> standard_role_manager::create_or_replace(stdx::string_view role_name, c
return _qp.process(
query,
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name), c.is_superuser, c.can_login},
true).discard_result();
}
@@ -303,6 +307,7 @@ standard_role_manager::alter(stdx::string_view role_name, const role_config_upda
build_column_assignments(u),
meta::roles_table::role_col_name),
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name)}).discard_result();
});
}
@@ -322,6 +327,7 @@ future<> standard_role_manager::drop(stdx::string_view role_name) const {
return _qp.process(
query,
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
return parallel_for_each(
members->begin(),
@@ -361,6 +367,7 @@ future<> standard_role_manager::drop(stdx::string_view role_name) const {
return _qp.process(
query,
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name)}).discard_result();
};
@@ -387,6 +394,7 @@ standard_role_manager::modify_membership(
return _qp.process(
query,
consistency_for_role(grantee_name),
infinite_timeout_config,
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
};
@@ -398,6 +406,7 @@ standard_role_manager::modify_membership(
"INSERT INTO %s (role, member) VALUES (?, ?)",
meta::role_members_table::qualified_name()),
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name), sstring(grantee_name)}).discard_result();
case membership_change::remove:
@@ -406,6 +415,7 @@ standard_role_manager::modify_membership(
"DELETE FROM %s WHERE role = ? AND member = ?",
meta::role_members_table::qualified_name()),
consistency_for_role(role_name),
infinite_timeout_config,
{sstring(role_name), sstring(grantee_name)}).discard_result();
}
@@ -506,7 +516,7 @@ future<role_set> standard_role_manager::query_all() const {
// To avoid many copies of a view.
static const auto role_col_name_string = sstring(meta::roles_table::role_col_name);
return _qp.process(query, db::consistency_level::QUORUM).then([](::shared_ptr<cql3::untyped_result_set> results) {
return _qp.process(query, db::consistency_level::QUORUM, infinite_timeout_config).then([](::shared_ptr<cql3::untyped_result_set> results) {
role_set roles;
std::transform(

View File

@@ -414,6 +414,7 @@ query_options query_processor::make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>& values,
db::consistency_level cl,
const timeout_config& timeout_config,
int32_t page_size) {
if (p->bound_names.size() != values.size()) {
throw std::invalid_argument(
@@ -437,11 +438,11 @@ query_options query_processor::make_internal_options(
api::timestamp_type ts = api::missing_timestamp;
return query_options(
cl,
infinite_timeout_config,
timeout_config,
bound_values,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
}
return query_options(cl, infinite_timeout_config, bound_values);
return query_options(cl, timeout_config, bound_values);
}
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
@@ -472,7 +473,7 @@ struct internal_query_state {
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
const std::initializer_list<data_value>& values, int32_t page_size) {
auto p = prepare_internal(query_string);
auto opts = make_internal_options(p, values, db::consistency_level::ONE, page_size);
auto opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config, page_size);
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
internal_query_state{
query_string,
@@ -560,7 +561,7 @@ future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
statements::prepared_statement::checked_weak_ptr p,
const std::initializer_list<data_value>& values) {
query_options opts = make_internal_options(p, values);
query_options opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config);
return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) {
return p->statement->execute_internal(
_proxy,
@@ -575,15 +576,16 @@ future<::shared_ptr<untyped_result_set>>
query_processor::process(
const sstring& query_string,
db::consistency_level cl,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& values,
bool cache) {
if (cache) {
return process(prepare_internal(query_string), cl, values);
return process(prepare_internal(query_string), cl, timeout_config, values);
} else {
auto p = parse_statement(query_string)->prepare(_db.local(), _cql_stats);
p->statement->validate(_proxy, *_internal_state);
auto checked_weak_ptr = p->checked_weak_from_this();
return process(std::move(checked_weak_ptr), cl, values).finally([p = std::move(p)] {});
return process(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
}
}
@@ -591,8 +593,9 @@ future<::shared_ptr<untyped_result_set>>
query_processor::process(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level cl,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& values) {
auto opts = make_internal_options(p, values, cl);
auto opts = make_internal_options(p, values, cl, timeout_config);
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) {
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) {
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));

View File

@@ -240,12 +240,14 @@ public:
future<::shared_ptr<untyped_result_set>> process(
const sstring& query_string,
db::consistency_level,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& = { },
bool cache = false);
future<::shared_ptr<untyped_result_set>> process(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& = { });
/*
@@ -283,7 +285,8 @@ private:
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>&,
db::consistency_level = db::consistency_level::ONE,
db::consistency_level,
const timeout_config& timeout_config,
int32_t page_size = -1);
future<::shared_ptr<cql_transport::messages::result_message>>

View File

@@ -101,6 +101,7 @@ future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::vi
return _qp.process(
sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
infinite_timeout_config,
{ std::move(ks_name), std::move(view_name) },
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
@@ -117,6 +118,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
return _qp.process(
sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
infinite_timeout_config,
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
false).discard_result();
});
@@ -127,6 +129,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
return _qp.process(
sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
infinite_timeout_config,
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
false).discard_result();
});
@@ -136,6 +139,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
return _qp.process(
sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
infinite_timeout_config,
{ std::move(ks_name), std::move(view_name) },
false).discard_result();
}

View File

@@ -167,6 +167,7 @@ SEASTAR_TEST_CASE(test_cassandra_hash) {
// This is extremely whitebox. We'll just go right ahead and know
// what the tables etc are called. Oy wei...
auto f = env.local_qp().process("INSERT into system_auth.roles (role, salted_hash) values (?, ?)", db::consistency_level::ONE,
infinite_timeout_config,
{ username, salted_hash }).discard_result();
return f.then([=, &env] {