diff --git a/alternator/auth.cc b/alternator/auth.cc index fbd52a6369..2dd4d63907 100644 --- a/alternator/auth.cc +++ b/alternator/auth.cc @@ -129,7 +129,7 @@ future get_key_from_roles(cql3::query_processor& qp, std::string us auto cl = auth::password_authenticator::consistency_for_user(username); auto timeout = auth::internal_distributed_timeout_config(); - return qp.process(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr> f) { + return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr> f) { auto res = f.get0(); auto salted_hash = std::optional(); if (res->empty()) { diff --git a/auth/default_authorizer.cc b/auth/default_authorizer.cc index 28453b8a82..fbf971be3d 100644 --- a/auth/default_authorizer.cc +++ b/auth/default_authorizer.cc @@ -101,7 +101,7 @@ bool default_authorizer::legacy_metadata_exists() const { future default_authorizer::any_granted() const { static const sstring query = format("SELECT * FROM {}.{} LIMIT 1", meta::AUTH_KS, PERMISSIONS_CF); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, infinite_timeout_config, @@ -115,7 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const { alogger.info("Starting migration of legacy permissions metadata."); static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, infinite_timeout_config).then([this](::shared_ptr results) { @@ -195,7 +195,7 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc ROLE_NAME, RESOURCE_NAME); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, infinite_timeout_config, @@ -224,7 +224,7 @@ default_authorizer::modify( ROLE_NAME, RESOURCE_NAME), [this, &role_name, set, &resource](const auto& query) { - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::ONE, internal_distributed_timeout_config(), @@ -249,7 +249,7 @@ future> default_authorizer::list_all() const { meta::AUTH_KS, PERMISSIONS_CF); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::ONE, internal_distributed_timeout_config(), @@ -276,7 +276,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const { PERMISSIONS_CF, ROLE_NAME); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::ONE, internal_distributed_timeout_config(), @@ -296,7 +296,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const { PERMISSIONS_CF, RESOURCE_NAME); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, infinite_timeout_config, @@ -313,7 +313,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const { ROLE_NAME, RESOURCE_NAME); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, infinite_timeout_config, diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index 99c9857d4c..f64a56f956 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -111,7 +111,7 @@ future<> password_authenticator::migrate_legacy_metadata() const { plogger.info("Starting migration of legacy authentication metadata."); static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config()).then([this](::shared_ptr results) { @@ -119,7 +119,7 @@ future<> password_authenticator::migrate_legacy_metadata() const { auto username = row.get_as("username"); auto salted_hash = row.get_as(SALTED_HASH); - return _qp.process( + return _qp.execute_internal( update_row_query, consistency_for_user(username), internal_distributed_timeout_config(), @@ -136,7 +136,7 @@ future<> password_authenticator::migrate_legacy_metadata() const { future<> password_authenticator::create_default_if_missing() const { return default_role_row_satisfies(_qp, &has_salted_hash).then([this](bool exists) { if (!exists) { - return _qp.process( + return _qp.execute_internal( update_row_query, db::consistency_level::QUORUM, internal_distributed_timeout_config(), @@ -233,7 +233,7 @@ future password_authenticator::authenticate( meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_user(username), internal_distributed_timeout_config(), @@ -267,7 +267,7 @@ future<> password_authenticator::create(std::string_view role_name, const authen return make_ready_future<>(); } - return _qp.process( + return _qp.execute_internal( update_row_query, consistency_for_user(role_name), internal_distributed_timeout_config(), @@ -284,7 +284,7 @@ future<> password_authenticator::alter(std::string_view role_name, const authent SALTED_HASH, meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_user(role_name), internal_distributed_timeout_config(), @@ -297,7 +297,7 @@ future<> password_authenticator::drop(std::string_view name) const { meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_user(name), internal_distributed_timeout_config(), {sstring(name)}).discard_result(); diff --git a/auth/roles-metadata.cc b/auth/roles-metadata.cc index 5eac88b9ce..960708e4f2 100644 --- a/auth/roles-metadata.cc +++ b/auth/roles-metadata.cc @@ -68,14 +68,14 @@ future default_role_row_satisfies( meta::roles_table::role_col_name); return do_with(std::move(p), [&qp](const auto& p) { - return qp.process( + return qp.execute_internal( query, db::consistency_level::ONE, infinite_timeout_config, {meta::DEFAULT_SUPERUSER_NAME}, true).then([&qp, &p](::shared_ptr results) { if (results->empty()) { - return qp.process( + return qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config(), @@ -100,7 +100,7 @@ future any_nondefault_role_row_satisfies( static const sstring query = format("SELECT * FROM {}", meta::roles_table::qualified_name()); return do_with(std::move(p), [&qp](const auto& p) { - return qp.process( + return qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config()).then([&p](::shared_ptr results) { diff --git a/auth/service.cc b/auth/service.cc index a324c2e45b..cb391e1416 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -217,7 +217,7 @@ future service::has_existing_legacy_users() const { // This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we // can potentially avoid doing a range query with a high consistency level. - return _qp.process( + return _qp.execute_internal( default_user_query, db::consistency_level::ONE, infinite_timeout_config, @@ -227,7 +227,7 @@ future service::has_existing_legacy_users() const { return make_ready_future(true); } - return _qp.process( + return _qp.execute_internal( default_user_query, db::consistency_level::QUORUM, infinite_timeout_config, @@ -237,7 +237,7 @@ future service::has_existing_legacy_users() const { return make_ready_future(true); } - return _qp.process( + return _qp.execute_internal( all_users_query, db::consistency_level::QUORUM, infinite_timeout_config).then([](auto results) { diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index 950a7f6e96..f6f5bae795 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -87,7 +87,7 @@ static future> find_record(cql3::query_processor& qp, std: meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return qp.process( + return qp.execute_internal( query, consistency_for_role(role_name), internal_distributed_timeout_config(), @@ -171,7 +171,7 @@ future<> standard_role_manager::create_default_role_if_missing() const { meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config(), @@ -198,7 +198,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const { log.info("Starting migration of legacy user metadata."); static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config()).then([this](::shared_ptr results) { @@ -259,7 +259,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_role(role_name), internal_distributed_timeout_config(), @@ -299,7 +299,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat return make_ready_future<>(); } - return _qp.process( + return _qp.execute_internal( format("UPDATE {} SET {} WHERE {} = ?", meta::roles_table::qualified_name(), build_column_assignments(u), @@ -321,7 +321,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const { static const sstring query = format("SELECT member FROM {} WHERE role = ?", meta::role_members_table::qualified_name()); - return _qp.process( + return _qp.execute_internal( query, consistency_for_role(role_name), internal_distributed_timeout_config(), @@ -360,7 +360,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const { meta::roles_table::qualified_name(), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_role(role_name), internal_distributed_timeout_config(), @@ -387,7 +387,7 @@ standard_role_manager::modify_membership( (ch == membership_change::add ? '+' : '-'), meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, consistency_for_role(grantee_name), internal_distributed_timeout_config(), @@ -397,7 +397,7 @@ standard_role_manager::modify_membership( const auto modify_role_members = [this, role_name, grantee_name, ch] { switch (ch) { case membership_change::add: - return _qp.process( + return _qp.execute_internal( format("INSERT INTO {} (role, member) VALUES (?, ?)", meta::role_members_table::qualified_name()), consistency_for_role(role_name), @@ -405,7 +405,7 @@ standard_role_manager::modify_membership( {sstring(role_name), sstring(grantee_name)}).discard_result(); case membership_change::remove: - return _qp.process( + return _qp.execute_internal( format("DELETE FROM {} WHERE role = ? AND member = ?", meta::role_members_table::qualified_name()), consistency_for_role(role_name), @@ -509,7 +509,7 @@ future standard_role_manager::query_all() const { // To avoid many copies of a view. static const auto role_col_name_string = sstring(meta::roles_table::role_col_name); - return _qp.process( + return _qp.execute_internal( query, db::consistency_level::QUORUM, internal_distributed_timeout_config()).then([](::shared_ptr results) { diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 658647ec90..610f6ee576 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -683,14 +683,6 @@ statements::prepared_statement::checked_weak_ptr query_processor::prepare_intern return p->checked_weak_from_this(); } -future<::shared_ptr> -query_processor::execute_internal(const sstring& query_string, const std::initializer_list& values) { - if (log.is_enabled(logging::log_level::trace)) { - log.trace("execute_internal: \"{}\" ({})", query_string, ::join(", ", values)); - } - return execute_internal(prepare_internal(query_string), values); -} - struct internal_query_state { sstring query_string; std::unique_ptr opts; @@ -817,38 +809,23 @@ query_processor::execute_paged_internal(::shared_ptr state future<::shared_ptr> query_processor::execute_internal( - statements::prepared_statement::checked_weak_ptr p, - const std::initializer_list& values) { - query_options opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config); - return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) { - return p->statement->execute( - _proxy, - *_internal_state, - opts).then([&opts, stmt = p->statement](auto msg) { - return make_ready_future<::shared_ptr>(::make_shared(msg)); - }); - }); -} - -future<::shared_ptr> -query_processor::process( const sstring& query_string, db::consistency_level cl, const timeout_config& timeout_config, const std::initializer_list& values, bool cache) { if (cache) { - return process(prepare_internal(query_string), cl, timeout_config, values); + return execute_with_params(prepare_internal(query_string), cl, timeout_config, values); } else { auto p = parse_statement(query_string)->prepare(_db, _cql_stats); p->statement->validate(_proxy, *_internal_state); auto checked_weak_ptr = p->checked_weak_from_this(); - return process(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {}); + return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {}); } } future<::shared_ptr> -query_processor::process( +query_processor::execute_with_params( statements::prepared_statement::checked_weak_ptr p, db::consistency_level cl, const timeout_config& timeout_config, @@ -862,7 +839,7 @@ query_processor::process( } future<::shared_ptr> -query_processor::process_batch( +query_processor::execute_batch( ::shared_ptr batch, service::query_state& query_state, query_options& options, diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index f69d663768..a3c711b102 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -201,13 +201,13 @@ public: query_options& options); future<::shared_ptr> - execute_internal(const sstring& query_string, const std::initializer_list& = { }); + execute_internal(const sstring& query_string, const std::initializer_list& values = { }) { + return execute_internal(query_string, db::consistency_level::ONE, + infinite_timeout_config, values, true); + } statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query); - future<::shared_ptr> - execute_internal(statements::prepared_statement::checked_weak_ptr p, const std::initializer_list& = { }); - /*! * \brief iterate over all cql results using paging * @@ -284,14 +284,14 @@ public: noncopyable_function(const cql3::untyped_result_set_row&)>&& f); - future<::shared_ptr> process( + future<::shared_ptr> execute_internal( const sstring& query_string, db::consistency_level, const timeout_config& timeout_config, const std::initializer_list& = { }, bool cache = false); - future<::shared_ptr> process( + future<::shared_ptr> execute_with_params( statements::prepared_statement::checked_weak_ptr p, db::consistency_level, const timeout_config& timeout_config, @@ -306,7 +306,7 @@ public: future<> stop(); future<::shared_ptr> - process_batch( + execute_batch( ::shared_ptr, service::query_state& query_state, query_options& options, diff --git a/db/query_context.hh b/db/query_context.hh index e43321af77..d3373c16ca 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -59,7 +59,7 @@ struct query_context { db::timeout_clock::duration::zero(); return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) { - return _qp.local().process(req, + return _qp.local().execute_internal(req, cql3::query_options::DEFAULT.get_consistency(), tcfg, { data_value(std::forward(args))... }, diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 0adc551944..567e9bab2f 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -158,7 +158,7 @@ static const timeout_config internal_distributed_timeout_config = [] { }(); future> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const { - return _qp.process( + return _qp.execute_internal( format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, internal_distributed_timeout_config, @@ -175,7 +175,7 @@ future> system_distributed_keyspace::vi future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const { return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) { - return _qp.process( + return _qp.execute_internal( format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, internal_distributed_timeout_config, @@ -186,7 +186,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const { return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) { - return _qp.process( + return _qp.execute_internal( format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, internal_distributed_timeout_config, @@ -196,7 +196,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring } future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const { - return _qp.process( + return _qp.execute_internal( format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, internal_distributed_timeout_config, @@ -282,7 +282,7 @@ system_distributed_keyspace::insert_cdc_topology_description( db_clock::time_point time, const cdc::topology_description& description, context ctx) { - return _qp.process( + return _qp.execute_internal( format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, @@ -294,7 +294,7 @@ future> system_distributed_keyspace::read_cdc_topology_description( db_clock::time_point time, context ctx) { - return _qp.process( + return _qp.execute_internal( format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, @@ -322,7 +322,7 @@ system_distributed_keyspace::expire_cdc_topology_description( db_clock::time_point streams_ts, db_clock::time_point expiration_time, context ctx) { - return _qp.process( + return _qp.execute_internal( format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, @@ -343,7 +343,7 @@ system_distributed_keyspace::create_cdc_desc( db_clock::time_point time, const std::vector& streams, context ctx) { - return _qp.process( + return _qp.execute_internal( format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, @@ -356,7 +356,7 @@ system_distributed_keyspace::expire_cdc_desc( db_clock::time_point streams_ts, db_clock::time_point expiration_time, context ctx) { - return _qp.process( + return _qp.execute_internal( format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, @@ -368,7 +368,7 @@ future system_distributed_keyspace::cdc_desc_exists( db_clock::time_point streams_ts, context ctx) { - return _qp.process( + return _qp.execute_internal( format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC), quorum_if_many(ctx.num_token_owners), internal_distributed_timeout_config, diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 1be1a29f1c..66287b6d24 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -584,7 +584,7 @@ public: local_qp().get_cql_stats()); auto qs = make_query_state(); auto& lqo = *qo; - return local_qp().process_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo)] {}); + return local_qp().execute_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo)] {}); } }; diff --git a/transport/server.cc b/transport/server.cc index 3c9f57a76d..9576a61eb3 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1043,7 +1043,7 @@ process_batch_internal(service::client_state& client_state, distributed(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), qp.local().get_cql_stats()); - return qp.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries)) + return qp.local().execute_batch(batch, query_state, options, std::move(pending_authorization_entries)) .then([stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) { if (msg->move_to_shard()) { return std::variant>, unsigned>(*msg->move_to_shard());