mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-06 23:13:15 +00:00
Merge "Get rid of cql_statement::execute_internal" from Avi
execute_internal() duplicates several code paths, especially in
the select path, for no good reason. It boils down to timeout and
consistency level selection which can be done based on
client_state::is_internal().
This patchset eliminated the duplication and execute_internal(),
simplifying the code.
* github.com:avikivity/scylla cql-no-execute_internal/v2:
cql: schema_altering_statement: make execute() and execute_internal()
equivalent
cql: select_statement: make execute() and execute_internal()
equivalent
cql: query_processor: don't call cql_statement::execute_internal() any
more
cql: cql_statement: remove execute_internal()
This commit is contained in:
@@ -98,14 +98,6 @@ public:
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) = 0;
|
||||
|
||||
/**
|
||||
* Variant of execute used for internal query against the system tables, and thus only query the local node = 0.
|
||||
*
|
||||
* @param state the current query state
|
||||
*/
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) = 0;
|
||||
|
||||
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const = 0;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const = 0;
|
||||
|
||||
@@ -291,12 +291,7 @@ query_processor::process_authorized_statement(const ::shared_ptr<cql_statement>
|
||||
|
||||
statement->validate(_proxy, client_state);
|
||||
|
||||
auto fut = make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
||||
if (client_state.is_internal()) {
|
||||
fut = statement->execute_internal(_proxy, query_state, options);
|
||||
} else {
|
||||
fut = statement->execute(_proxy, query_state, options);
|
||||
}
|
||||
auto fut = statement->execute(_proxy, query_state, options);
|
||||
|
||||
return fut.then([statement] (auto msg) {
|
||||
if (msg) {
|
||||
@@ -522,7 +517,7 @@ future<> query_processor::for_each_cql_result(
|
||||
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_paged_internal(::shared_ptr<internal_query_state> state) {
|
||||
return state->p->statement->execute_internal(_proxy, *_internal_state, *state->opts).then(
|
||||
return state->p->statement->execute(_proxy, *_internal_state, *state->opts).then(
|
||||
[state, this](::shared_ptr<cql_transport::messages::result_message> msg) mutable {
|
||||
class visitor : public result_message::visitor_base {
|
||||
::shared_ptr<internal_query_state> _state;
|
||||
@@ -563,7 +558,7 @@ query_processor::execute_internal(
|
||||
const std::initializer_list<data_value>& values) {
|
||||
query_options opts = make_internal_options(p, values, db::consistency_level::ONE, infinite_timeout_config);
|
||||
return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) {
|
||||
return p->statement->execute_internal(
|
||||
return p->statement->execute(
|
||||
_proxy,
|
||||
*_internal_state,
|
||||
opts).then([&opts, stmt = p->statement](auto msg) {
|
||||
|
||||
@@ -74,10 +74,3 @@ void cql3::statements::authentication_statement::validate(
|
||||
future<> cql3::statements::authentication_statement::check_access(const service::client_state& state) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> cql3::statements::authentication_statement::execute_internal(
|
||||
service::storage_proxy& proxy,
|
||||
service::query_state& state, const query_options& options) {
|
||||
// Internal queries are exclusively on the system keyspace and makes no sense here
|
||||
throw std::runtime_error("unsupported operation");
|
||||
}
|
||||
|
||||
@@ -67,9 +67,6 @@ public:
|
||||
future<> check_access(const service::client_state& state) override;
|
||||
|
||||
void validate(service::storage_proxy&, const service::client_state& state) override;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -75,13 +75,6 @@ future<> cql3::statements::authorization_statement::check_access(const service::
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> cql3::statements::authorization_statement::execute_internal(
|
||||
service::storage_proxy& proxy,
|
||||
service::query_state& state, const query_options& options) {
|
||||
// Internal queries are exclusively on the system keyspace and makes no sense here
|
||||
throw std::runtime_error("unsupported operation");
|
||||
}
|
||||
|
||||
void cql3::statements::authorization_statement::maybe_correct_resource(auth::resource& resource, const service::client_state& state) {
|
||||
if (resource.kind() == auth::resource_kind::data) {
|
||||
const auto data_view = auth::data_resource_view(resource);
|
||||
|
||||
@@ -72,9 +72,6 @@ public:
|
||||
|
||||
void validate(service::storage_proxy&, const service::client_state& state) override;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
|
||||
protected:
|
||||
static void maybe_correct_resource(auth::resource&, const service::client_state&);
|
||||
};
|
||||
|
||||
@@ -403,23 +403,6 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
#endif
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_internal(
|
||||
service::storage_proxy& proxy,
|
||||
service::query_state& query_state, const query_options& options)
|
||||
{
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
#if 0
|
||||
assert !hasConditions;
|
||||
for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
|
||||
{
|
||||
// We don't use counters internally.
|
||||
assert mutation instanceof Mutation;
|
||||
((Mutation) mutation).apply();
|
||||
}
|
||||
return null;
|
||||
#endif
|
||||
}
|
||||
|
||||
namespace raw {
|
||||
|
||||
std::unique_ptr<prepared_statement>
|
||||
|
||||
@@ -154,10 +154,6 @@ private:
|
||||
const query_options& options,
|
||||
service::query_state& state);
|
||||
public:
|
||||
virtual future<shared_ptr<cql_transport::messages::result_message>> execute_internal(
|
||||
service::storage_proxy& proxy,
|
||||
service::query_state& query_state, const query_options& options) override;
|
||||
|
||||
// FIXME: no cql_statement::to_string() yet
|
||||
#if 0
|
||||
sstring to_string() const {
|
||||
|
||||
@@ -438,26 +438,6 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
|
||||
#endif
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
modification_statement::execute_internal(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {
|
||||
if (has_conditions()) {
|
||||
throw exceptions::unsupported_operation_exception();
|
||||
}
|
||||
|
||||
tracing::add_table_name(qs.get_trace_state(), keyspace(), column_family());
|
||||
|
||||
inc_cql_stats();
|
||||
|
||||
return get_mutations(proxy, options, true, options.get_timestamp(qs), qs.get_trace_state()).then(
|
||||
[&proxy] (auto mutations) {
|
||||
return proxy.mutate_locally(std::move(mutations));
|
||||
}).then(
|
||||
[] {
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
|
||||
::shared_ptr<cql_transport::messages::result_message> {});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
modification_statement::process_where_clause(database& db, std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names) {
|
||||
_restrictions = ::make_shared<restrictions::statement_restrictions>(
|
||||
|
||||
@@ -217,9 +217,6 @@ public:
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) override;
|
||||
|
||||
private:
|
||||
future<>
|
||||
execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options);
|
||||
|
||||
@@ -109,18 +109,17 @@ schema_altering_statement::execute0(service::storage_proxy& proxy, service::quer
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
schema_altering_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) {
|
||||
return execute0(proxy, state, options, false).then([this, &state](::shared_ptr<messages::result_message> result) {
|
||||
return grant_permissions_to_creator(state.get_client_state()).then([result = std::move(result)] {
|
||||
bool internal = state.get_client_state().is_internal();
|
||||
return execute0(proxy, state, options, internal).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
auto permissions_granted_fut = internal
|
||||
? make_ready_future<>()
|
||||
: grant_permissions_to_creator(state.get_client_state());
|
||||
return permissions_granted_fut.then([result = std::move(result)] {
|
||||
return result;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
schema_altering_statement::execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) {
|
||||
return execute0(proxy, state, options, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -93,9 +93,6 @@ protected:
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute_internal(service::storage_proxy&, service::query_state& state, const query_options& options) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -532,48 +532,6 @@ select_statement::execute(service::storage_proxy& proxy,
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute_internal(service::storage_proxy& proxy,
|
||||
service::query_state& state,
|
||||
const query_options& options)
|
||||
{
|
||||
if (options.get_specific_options().page_size > 0) {
|
||||
// need page, use regular execute
|
||||
return do_execute(proxy, state, options);
|
||||
}
|
||||
int32_t limit = get_limit(options);
|
||||
auto now = gc_clock::now();
|
||||
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(),
|
||||
make_partition_slice(options), limit, now, std::experimental::nullopt, query::max_partitions, utils::UUID(), options.get_timestamp(state));
|
||||
auto partition_ranges = _restrictions->get_partition_key_ranges(options);
|
||||
|
||||
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
|
||||
|
||||
++_stats.reads;
|
||||
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::move(partition_ranges), [this, &proxy, &state, command] (auto& prs) {
|
||||
assert(command->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(command->row_limit * prs.size(), query::max_partitions);
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto& pr) {
|
||||
dht::partition_range_vector prange { pr };
|
||||
auto cmd = ::make_lw_shared<query::read_command>(*command);
|
||||
return proxy.query(_schema, cmd, std::move(prange), db::consistency_level::ONE, {db::no_timeout, state.get_trace_state(),
|
||||
}).then([] (service::storage_proxy::coordinator_query_result qr) {
|
||||
return std::move(qr.query_result);
|
||||
});
|
||||
}, std::move(merger));
|
||||
}).then([command, this, &options, now] (auto result) {
|
||||
return this->process_results(std::move(result), command, options, now);
|
||||
}).finally([command] { });
|
||||
} else {
|
||||
auto query = proxy.query(_schema, command, std::move(partition_ranges), db::consistency_level::ONE, {db::no_timeout, state.get_trace_state()});
|
||||
return query.then([command, this, &options, now] (service::storage_proxy::coordinator_query_result qr) {
|
||||
return this->process_results(std::move(qr.query_result), command, options, now);
|
||||
}).finally([command] {});
|
||||
}
|
||||
}
|
||||
|
||||
shared_ptr<cql_transport::messages::result_message>
|
||||
select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> results,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
@@ -117,9 +117,6 @@ public:
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(service::storage_proxy& proxy,
|
||||
service::query_state& state, const query_options& options) override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_internal(service::storage_proxy& proxy,
|
||||
service::query_state& state, const query_options& options) override;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(service::storage_proxy& proxy,
|
||||
lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state,
|
||||
const query_options& options, gc_clock::time_point now);
|
||||
|
||||
@@ -106,12 +106,6 @@ truncate_statement::execute(service::storage_proxy& proxy, service::query_state&
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
truncate_statement::execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options)
|
||||
{
|
||||
throw std::runtime_error("unsupported operation");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -70,9 +70,6 @@ public:
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -105,12 +105,6 @@ use_statement::execute(service::storage_proxy& proxy, service::query_state& stat
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(result);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
use_statement::execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) {
|
||||
// Internal queries are exclusively on the system keyspace and 'use' is thus useless
|
||||
throw std::runtime_error("unsupported operation");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -71,9 +71,6 @@ public:
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_internal(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user