query_processor: fold one execute_internal() into another.

All internal execution always uses query text as a key in the
cache of internal prepared statements. There is no need
to publish API for executing an internal prepared statement object.

The folded execute_internal() calls an internal prepare() and then
internal execute().
execute_internal(cache=true) does exactly that.
This commit is contained in:
Konstantin Osipov
2020-02-06 21:19:57 +03:00
parent 2e07c76153
commit 93db4d748c
12 changed files with 57 additions and 80 deletions

View File

@@ -129,7 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
auto cl = auth::password_authenticator::consistency_for_user(username);
auto timeout = auth::internal_distributed_timeout_config();
return qp.process(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
auto res = f.get0();
auto salted_hash = std::optional<sstring>();
if (res->empty()) {

View File

@@ -101,7 +101,7 @@ bool default_authorizer::legacy_metadata_exists() const {
future<bool> default_authorizer::any_granted() const {
static const sstring query = format("SELECT * FROM {}.{} LIMIT 1", meta::AUTH_KS, PERMISSIONS_CF);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
@@ -115,7 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const {
alogger.info("Starting migration of legacy permissions metadata.");
static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
@@ -195,7 +195,7 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
ROLE_NAME,
RESOURCE_NAME);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
@@ -224,7 +224,7 @@ default_authorizer::modify(
ROLE_NAME,
RESOURCE_NAME),
[this, &role_name, set, &resource](const auto& query) {
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
@@ -249,7 +249,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
meta::AUTH_KS,
PERMISSIONS_CF);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
@@ -276,7 +276,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const {
PERMISSIONS_CF,
ROLE_NAME);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_timeout_config(),
@@ -296,7 +296,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
PERMISSIONS_CF,
RESOURCE_NAME);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,
@@ -313,7 +313,7 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
ROLE_NAME,
RESOURCE_NAME);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
infinite_timeout_config,

View File

@@ -111,7 +111,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
plogger.info("Starting migration of legacy authentication metadata.");
static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
@@ -119,7 +119,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
auto username = row.get_as<sstring>("username");
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
return _qp.process(
return _qp.execute_internal(
update_row_query,
consistency_for_user(username),
internal_distributed_timeout_config(),
@@ -136,7 +136,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
future<> password_authenticator::create_default_if_missing() const {
return default_role_row_satisfies(_qp, &has_salted_hash).then([this](bool exists) {
if (!exists) {
return _qp.process(
return _qp.execute_internal(
update_row_query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
@@ -233,7 +233,7 @@ future<authenticated_user> password_authenticator::authenticate(
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_user(username),
internal_distributed_timeout_config(),
@@ -267,7 +267,7 @@ future<> password_authenticator::create(std::string_view role_name, const authen
return make_ready_future<>();
}
return _qp.process(
return _qp.execute_internal(
update_row_query,
consistency_for_user(role_name),
internal_distributed_timeout_config(),
@@ -284,7 +284,7 @@ future<> password_authenticator::alter(std::string_view role_name, const authent
SALTED_HASH,
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_user(role_name),
internal_distributed_timeout_config(),
@@ -297,7 +297,7 @@ future<> password_authenticator::drop(std::string_view name) const {
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query, consistency_for_user(name),
internal_distributed_timeout_config(),
{sstring(name)}).discard_result();

View File

@@ -68,14 +68,14 @@ future<bool> default_role_row_satisfies(
meta::roles_table::role_col_name);
return do_with(std::move(p), [&qp](const auto& p) {
return qp.process(
return qp.execute_internal(
query,
db::consistency_level::ONE,
infinite_timeout_config,
{meta::DEFAULT_SUPERUSER_NAME},
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
if (results->empty()) {
return qp.process(
return qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
@@ -100,7 +100,7 @@ future<bool> any_nondefault_role_row_satisfies(
static const sstring query = format("SELECT * FROM {}", meta::roles_table::qualified_name());
return do_with(std::move(p), [&qp](const auto& p) {
return qp.process(
return qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {

View File

@@ -217,7 +217,7 @@ future<bool> service::has_existing_legacy_users() const {
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
// can potentially avoid doing a range query with a high consistency level.
return _qp.process(
return _qp.execute_internal(
default_user_query,
db::consistency_level::ONE,
infinite_timeout_config,
@@ -227,7 +227,7 @@ future<bool> service::has_existing_legacy_users() const {
return make_ready_future<bool>(true);
}
return _qp.process(
return _qp.execute_internal(
default_user_query,
db::consistency_level::QUORUM,
infinite_timeout_config,
@@ -237,7 +237,7 @@ future<bool> service::has_existing_legacy_users() const {
return make_ready_future<bool>(true);
}
return _qp.process(
return _qp.execute_internal(
all_users_query,
db::consistency_level::QUORUM,
infinite_timeout_config).then([](auto results) {

View File

@@ -87,7 +87,7 @@ static future<std::optional<record>> find_record(cql3::query_processor& qp, std:
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return qp.process(
return qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
@@ -171,7 +171,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config(),
@@ -198,7 +198,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
log.info("Starting migration of legacy user metadata.");
static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
@@ -259,7 +259,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
@@ -299,7 +299,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
return make_ready_future<>();
}
return _qp.process(
return _qp.execute_internal(
format("UPDATE {} SET {} WHERE {} = ?",
meta::roles_table::qualified_name(),
build_column_assignments(u),
@@ -321,7 +321,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
static const sstring query = format("SELECT member FROM {} WHERE role = ?",
meta::role_members_table::qualified_name());
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
@@ -360,7 +360,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
meta::roles_table::qualified_name(),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_timeout_config(),
@@ -387,7 +387,7 @@ standard_role_manager::modify_membership(
(ch == membership_change::add ? '+' : '-'),
meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
consistency_for_role(grantee_name),
internal_distributed_timeout_config(),
@@ -397,7 +397,7 @@ standard_role_manager::modify_membership(
const auto modify_role_members = [this, role_name, grantee_name, ch] {
switch (ch) {
case membership_change::add:
return _qp.process(
return _qp.execute_internal(
format("INSERT INTO {} (role, member) VALUES (?, ?)",
meta::role_members_table::qualified_name()),
consistency_for_role(role_name),
@@ -405,7 +405,7 @@ standard_role_manager::modify_membership(
{sstring(role_name), sstring(grantee_name)}).discard_result();
case membership_change::remove:
return _qp.process(
return _qp.execute_internal(
format("DELETE FROM {} WHERE role = ? AND member = ?",
meta::role_members_table::qualified_name()),
consistency_for_role(role_name),
@@ -509,7 +509,7 @@ future<role_set> standard_role_manager::query_all() const {
// To avoid many copies of a view.
static const auto role_col_name_string = sstring(meta::roles_table::role_col_name);
return _qp.process(
return _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_timeout_config()).then([](::shared_ptr<cql3::untyped_result_set> results) {

View File

@@ -683,14 +683,6 @@ statements::prepared_statement::checked_weak_ptr query_processor::prepare_intern
return p->checked_weak_from_this();
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values) {
if (log.is_enabled(logging::log_level::trace)) {
log.trace("execute_internal: \"{}\" ({})", query_string, ::join(", ", values));
}
return execute_internal(prepare_internal(query_string), values);
}
struct internal_query_state {
sstring query_string;
std::unique_ptr<query_options> opts;
@@ -817,38 +809,23 @@ query_processor::execute_paged_internal(::shared_ptr<internal_query_state> state
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(
statements::prepared_statement::checked_weak_ptr p,
const std::initializer_list<data_value>& values) {
query_options opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config);
return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) {
return p->statement->execute(
_proxy,
*_internal_state,
opts).then([&opts, stmt = p->statement](auto msg) {
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
});
});
}
future<::shared_ptr<untyped_result_set>>
query_processor::process(
const sstring& query_string,
db::consistency_level cl,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& values,
bool cache) {
if (cache) {
return process(prepare_internal(query_string), cl, timeout_config, values);
return execute_with_params(prepare_internal(query_string), cl, timeout_config, values);
} else {
auto p = parse_statement(query_string)->prepare(_db, _cql_stats);
p->statement->validate(_proxy, *_internal_state);
auto checked_weak_ptr = p->checked_weak_from_this();
return process(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
}
}
future<::shared_ptr<untyped_result_set>>
query_processor::process(
query_processor::execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level cl,
const timeout_config& timeout_config,
@@ -862,7 +839,7 @@ query_processor::process(
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::process_batch(
query_processor::execute_batch(
::shared_ptr<statements::batch_statement> batch,
service::query_state& query_state,
query_options& options,

View File

@@ -201,13 +201,13 @@ public:
query_options& options);
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& = { });
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
return execute_internal(query_string, db::consistency_level::ONE,
infinite_timeout_config, values, true);
}
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
future<::shared_ptr<untyped_result_set>>
execute_internal(statements::prepared_statement::checked_weak_ptr p, const std::initializer_list<data_value>& = { });
/*!
* \brief iterate over all cql results using paging
*
@@ -284,14 +284,14 @@ public:
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)>&& f);
future<::shared_ptr<untyped_result_set>> process(
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
const timeout_config& timeout_config,
const std::initializer_list<data_value>& = { },
bool cache = false);
future<::shared_ptr<untyped_result_set>> process(
future<::shared_ptr<untyped_result_set>> execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level,
const timeout_config& timeout_config,
@@ -306,7 +306,7 @@ public:
future<> stop();
future<::shared_ptr<cql_transport::messages::result_message>>
process_batch(
execute_batch(
::shared_ptr<statements::batch_statement>,
service::query_state& query_state,
query_options& options,

View File

@@ -59,7 +59,7 @@ struct query_context {
db::timeout_clock::duration::zero();
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) {
return _qp.local().process(req,
return _qp.local().execute_internal(req,
cql3::query_options::DEFAULT.get_consistency(),
tcfg,
{ data_value(std::forward<Args>(args))... },

View File

@@ -158,7 +158,7 @@ static const timeout_config internal_distributed_timeout_config = [] {
}();
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
return _qp.process(
return _qp.execute_internal(
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
@@ -175,7 +175,7 @@ future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::vi
future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const {
return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
return _qp.process(
return _qp.execute_internal(
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
@@ -186,7 +186,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const {
return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
return _qp.process(
return _qp.execute_internal(
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
@@ -196,7 +196,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
}
future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const {
return _qp.process(
return _qp.execute_internal(
format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
internal_distributed_timeout_config,
@@ -282,7 +282,7 @@ system_distributed_keyspace::insert_cdc_topology_description(
db_clock::time_point time,
const cdc::topology_description& description,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
@@ -294,7 +294,7 @@ future<std::optional<cdc::topology_description>>
system_distributed_keyspace::read_cdc_topology_description(
db_clock::time_point time,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
@@ -322,7 +322,7 @@ system_distributed_keyspace::expire_cdc_topology_description(
db_clock::time_point streams_ts,
db_clock::time_point expiration_time,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
@@ -343,7 +343,7 @@ system_distributed_keyspace::create_cdc_desc(
db_clock::time_point time,
const std::vector<cdc::stream_id>& streams,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("INSERT INTO {}.{} (time, streams) VALUES (?,?)", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
@@ -356,7 +356,7 @@ system_distributed_keyspace::expire_cdc_desc(
db_clock::time_point streams_ts,
db_clock::time_point expiration_time,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,
@@ -368,7 +368,7 @@ future<bool>
system_distributed_keyspace::cdc_desc_exists(
db_clock::time_point streams_ts,
context ctx) {
return _qp.process(
return _qp.execute_internal(
format("SELECT time FROM {}.{} WHERE time = ?", NAME, CDC_DESC),
quorum_if_many(ctx.num_token_owners),
internal_distributed_timeout_config,

View File

@@ -584,7 +584,7 @@ public:
local_qp().get_cql_stats());
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().process_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo)] {});
return local_qp().execute_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo)] {});
}
};

View File

@@ -1043,7 +1043,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
}
auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), qp.local().get_cql_stats());
return qp.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries))
return qp.local().execute_batch(batch, query_state, options, std::move(pending_authorization_entries))
.then([stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) {
if (msg->move_to_shard()) {
return std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>(*msg->move_to_shard());