Add paging for internal queries

Usually, internal queries are used for short queries. Sometimes though,
like in the case of get compaction history, there could be a large
amount of results. Without paging it will overload the system.

This patch adds the ability to use paging internally.

Paging is used explicitly: all the relevant information is stored in an
internal_query_state, which holds both the paging state and the query,
so that consecutive calls can be made.

To use paging use the query method with a function.

Besides the statement and its parameters, the query method takes a
function that is called for each of the returned rows.

For example if qp is a query_processor:

qp.query("SELECT * from system.compaction_history", [] (const cql3::untyped_result_set::row& row) {
  ....
  // do something with row
  ...
  return stop_iteration::no; // keep on reading
});

Will run the function on each of the compaction history table rows.

To stop the iteration, the function can return stop_iteration::yes.
This commit is contained in:
Amnon Heiman
2017-06-12 13:20:43 +03:00
parent 45b3e8cd11
commit 08c81427b9
3 changed files with 162 additions and 4 deletions

View File

@@ -345,7 +345,8 @@ query_processor::parse_statement(const sstring_view& query)
query_options query_processor::make_internal_options(const statements::prepared_statement::checked_weak_ptr& p,
const std::initializer_list<data_value>& values,
db::consistency_level cl)
db::consistency_level cl,
int32_t page_size)
{
if (p->bound_names.size() != values.size()) {
throw std::invalid_argument(sprint("Invalid number of values. Expecting %d but got %d", p->bound_names.size(), values.size()));
@@ -362,6 +363,12 @@ query_options query_processor::make_internal_options(const statements::prepared_
bound_values.push_back(cql3::raw_value::make_value(n->type->decompose(v)));
}
}
if (page_size > 0) {
::shared_ptr<service::pager::paging_state> paging_state;
db::consistency_level serial_consistency = db::consistency_level::SERIAL;
api::timestamp_type ts = api::missing_timestamp;
return query_options(cl, bound_values, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
}
return query_options(cl, bound_values);
}
@@ -386,13 +393,101 @@ query_processor::execute_internal(const sstring& query_string,
return execute_internal(prepare_internal(query_string), values);
}
/*!
 * \brief state carried between the pages of an internal paged query
 *
 * Created by query_processor::create_paged_state() and updated by
 * query_processor::execute_paged_internal() after every page.
 * The member order matters: create_paged_state() aggregate-initializes it.
 */
struct internal_query_state {
// the CQL text; execute_paged_internal() uses it to call prepare_internal() again before the next page
sstring query_string;
// options for the next execution; replaced after each page by a copy that carries the returned paging state
std::unique_ptr<query_options> opts;
// the prepared statement that is executed for each page
statements::prepared_statement::checked_weak_ptr p;
// set to false once a page reports that no further pages follow
bool more_results = true;
};
/*!
 * \brief build the state object needed to run an internal query page by page
 *
 * Prepares query_string, binds values into query options that use
 * consistency level ONE and the requested page_size, and bundles
 * the query text, the options and the prepared statement into a
 * shared internal_query_state whose more_results flag starts as true.
 */
::shared_ptr<internal_query_state> query_processor::create_paged_state(const sstring& query_string,
        const std::initializer_list<data_value>& values, int32_t page_size) {
    auto prepared = prepare_internal(query_string);
    // The options live on the heap so they can be swapped for a new instance after each page.
    auto options = std::make_unique<cql3::query_options>(
            make_internal_options(prepared, values, db::consistency_level::ONE, page_size));
    return ::make_shared<internal_query_state>(
            internal_query_state{query_string, std::move(options), std::move(prepared), true});
}
/*!
 * \brief tell whether a paged query still has pages to fetch
 *
 * Returns the more_results flag of the state; a null state is
 * reported as having no more results.
 */
bool query_processor::has_more_results(::shared_ptr<cql3::internal_query_state> state) const {
    return state && state->more_results;
}
/*!
 * \brief run a paged internal query and feed every returned row to a callback
 *
 * Pages are fetched one after the other with execute_paged_internal().
 * The iteration ends when a page comes back empty, when the state reports
 * that there are no more results (the rows of that last page are still
 * delivered), or when the callback returns stop_iteration::yes.
 *
 * \param state paging state created by create_paged_state()
 * \param f     called once per row; return stop_iteration::yes to stop early
 * \return a future that resolves when the iteration is over
 */
future<> query_processor::for_each_cql_result(::shared_ptr<cql3::internal_query_state> state,
        std::function<stop_iteration(const cql3::untyped_result_set::row&)>&& f) {
    // do_with() owns the callback and the termination flag until the returned
    // future resolves, so the loop below can refer to them by reference.
    // Keeping a single callback instance (instead of copying it for every page)
    // lets a stateful callback keep its state across pages.
    return do_with(std::move(f), false, [this, state](auto& callback, bool& is_done) {
        auto finished = [&is_done] {
            return is_done;
        };
        auto fetch_page = [this, state, &callback, &is_done] {
            return this->execute_paged_internal(state).then(
                    [this, state, &callback, &is_done](::shared_ptr<cql3::untyped_result_set> msg) {
                // An empty page or a state without further pages makes this the last round.
                if (msg->empty() || !this->has_more_results(state)) {
                    is_done = true;
                }
                for (auto& row : *msg) {
                    if (callback(row) == stop_iteration::yes) {
                        is_done = true;
                        break;
                    }
                }
            });
        };
        return do_until(finished, fetch_page);
    });
}
/*!
 * \brief fetch one page of a paged internal query
 *
 * Executes the prepared statement held by the state with the state's
 * current options, then updates the state from the returned message:
 * a rows result without a paging state, or without the HAS_MORE_PAGES
 * flag, marks the state as finished; otherwise the returned paging state
 * is stored in fresh options and the statement is prepared again for the
 * next call. The message is returned wrapped in an untyped_result_set.
 */
future<::shared_ptr<untyped_result_set>> query_processor::execute_paged_internal(::shared_ptr<internal_query_state> state) {
    // Inspects a rows result and records in the state how to continue paging.
    class paging_visitor : public result_message::visitor_base {
        ::shared_ptr<internal_query_state> _state;
        query_processor& _qp;
    public:
        paging_visitor(::shared_ptr<internal_query_state> state, query_processor& qp) : _state(state), _qp(qp) {
        }
        virtual ~paging_visitor() = default;
        void visit(const result_message::rows& rmrs) override {
            auto& result = rmrs.rs();
            if (!result.get_metadata().paging_state()) {
                // No paging state came back: nothing more to read.
                _state->more_results = false;
                return;
            }
            if (!result.get_metadata().flags().contains<cql3::metadata::flag::HAS_MORE_PAGES>()) {
                _state->more_results = false;
                return;
            }
            // More pages follow: carry the returned paging state into the next execution.
            const service::pager::paging_state& current = *result.get_metadata().paging_state();
            shared_ptr<service::pager::paging_state> next = ::make_shared<service::pager::paging_state>(current);
            _state->opts = std::make_unique<query_options>(std::move(_state->opts), next);
            _state->p = _qp.prepare_internal(_state->query_string);
        }
    };

    return state->p->statement->execute_internal(_proxy, *_internal_state, *state->opts).then(
            [state, this](::shared_ptr<cql_transport::messages::result_message> msg) mutable {
        paging_visitor inspector(state, *this);
        if (msg != nullptr) {
            msg->accept(inspector);
        }
        return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
    });
}
future<::shared_ptr<untyped_result_set>>
query_processor::execute_internal(statements::prepared_statement::checked_weak_ptr p,
const std::initializer_list<data_value>& values)
{
auto opts = make_internal_options(p, values);
query_options opts = make_internal_options(p, values);
return do_with(std::move(opts), [this, p = std::move(p)](auto& opts) {
return p->statement->execute_internal(_proxy, *_internal_state, opts).then([stmt = p->statement](auto msg) {
return p->statement->execute_internal(_proxy, *_internal_state, opts).then([&opts, stmt = p->statement](auto msg) {
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
});
});

View File

@@ -64,6 +64,13 @@ namespace statements {
class batch_statement;
}
/*!
 * \brief state of an internal paged query
 *
 * Holds the internal state that has to be passed from one execution of
 * the statement to the next so that paging can continue.
 * Only forward-declared here; query_processor passes it around through shared pointers.
 */
struct internal_query_state;
class query_processor {
public:
class migration_subscriber;
@@ -340,7 +347,30 @@ public:
}
#endif
private:
query_options make_internal_options(const statements::prepared_statement::checked_weak_ptr& p, const std::initializer_list<data_value>&, db::consistency_level = db::consistency_level::ONE);
query_options make_internal_options(const statements::prepared_statement::checked_weak_ptr& p, const std::initializer_list<data_value>&, db::consistency_level = db::consistency_level::ONE,
int32_t page_size = -1);
/*!
 * \brief create a state object for paging
 *
 * When using paging internally a state object is needed.
 *
 * query_string - the cql string, can contain place holders
 * values - the values bound to the place holders (none by default)
 * page_size - the page size requested for each execution (1000 by default)
 */
::shared_ptr<internal_query_state> create_paged_state(const sstring& query_string, const std::initializer_list<data_value>& = { }, int32_t page_size = 1000);
/*!
 * \brief run a query using paging
 *
 * Executes the statement held by the state once and returns that
 * page of results. The state is updated so that the following call
 * continues with the next page, or is marked as having no more results.
 */
future<::shared_ptr<untyped_result_set>> execute_paged_internal(::shared_ptr<internal_query_state> state);
/*!
 * \brief iterate over all results using paging
 *
 * Calls f for every row of every page, until the results are exhausted
 * or f returns stop_iteration::yes.
 */
future<> for_each_cql_result(::shared_ptr<cql3::internal_query_state> state,
std::function<stop_iteration(const cql3::untyped_result_set::row&)>&& f);
/*!
 * \brief check, based on the state, if there are additional results
 *
 * Users of the paging should not use the internal_query_state directly.
 * A null state is reported as having no additional results.
 */
bool has_more_results(::shared_ptr<cql3::internal_query_state> state) const;
public:
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
@@ -352,6 +382,35 @@ public:
statements::prepared_statement::checked_weak_ptr p,
const std::initializer_list<data_value>& = { });
/*!
 * \brief iterate over all cql results using paging
 *
 * Create a statement with optional parameters and pass a function
 * that goes over the results.
 *
 * The passed function is called for each of the result rows; return
 * stop_iteration::yes from it to stop in the middle of the iteration.
 *
 * For example:
 *
 *   return query("SELECT * from system.compaction_history", [&history] (const cql3::untyped_result_set::row& row) mutable {
 *       ....
 *       ....
 *       return stop_iteration::no;
 *   });
 *
 * The query can contain place holders; their values are taken from args.
 *
 * query_string - the cql string, can contain place holders
 * f - a function to be run on each row of the query result; the iteration
 *     stops when it returns stop_iteration::yes
 * args - arbitrary number of query parameters
 */
template<typename... Args>
future<> query(const sstring& query_string, std::function<stop_iteration(const cql3::untyped_result_set::row&)>&& f, Args&&... args) {
    // The paging state is built synchronously, while the temporary list of bound values is still alive.
    auto paged_state = create_paged_state(query_string, { data_value(std::forward<Args>(args))... });
    return for_each_cql_result(std::move(paged_state), std::move(f));
}
future<::shared_ptr<untyped_result_set>> process(
const sstring& query_string,
db::consistency_level, const std::initializer_list<data_value>& = { }, bool cache = false);

View File

@@ -341,6 +341,10 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
service::query_state& state,
const query_options& options)
{
if (options.get_specific_options().page_size > 0) {
// need page, use regular execute
return do_execute(proxy, state, options);
}
int32_t limit = get_limit(options);
auto now = gc_clock::now();
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(),