cql3: add paging to read_posting_list

Instead of a single query, paging is used in order to query
an index.
This commit is contained in:
Piotr Sarna
2018-07-25 12:10:41 +02:00
parent b83aa69a2e
commit 7c1e4c2deb

View File

@@ -793,7 +793,8 @@ read_posting_list(service::storage_proxy& proxy,
int32_t limit,
service::query_state& state,
gc_clock::time_point now,
db::timeout_clock::time_point timeout)
db::timeout_clock::time_point timeout,
cql3::cql_stats& stats)
{
dht::partition_range_vector partition_ranges;
// FIXME: there should be only one index restriction for this index!
@@ -854,28 +855,37 @@ read_posting_list(service::storage_proxy& proxy,
query::max_partitions,
utils::UUID(),
options.get_timestamp(state));
return proxy.query(view_schema,
cmd,
std::move(partition_ranges),
options.get_consistency(),
{timeout, state.get_trace_state()})
.then([base_schema, view_schema, now, &options, partition_slice = std::move(partition_slice)](service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : base_schema->partition_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
if constexpr (std::is_same_v<KeyType, clustering_key>) {
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
}
auto selection = selection::selection::for_columns(view_schema, columns);
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
query::result_view::consume(*qr.query_result,
std::move(partition_slice),
cql3::selection::result_set_builder::visitor(builder, *view_schema, *selection));
return ::make_shared<cql_transport::messages::result_message::rows>(std::move(result(builder.build())));
});
std::vector<const column_definition*> columns;
for (const column_definition& cdef : base_schema->partition_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
if constexpr (std::is_same_v<KeyType, clustering_key>) {
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
}
auto selection = selection::selection::for_columns(view_schema, columns);
int32_t page_size = options.get_page_size();
if (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*view_schema, page_size, *cmd, partition_ranges)) {
stats.unpaged_select_queries += 1;
return proxy.query(view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_trace_state()})
.then([base_schema, view_schema, now, &options, selection = std::move(selection), partition_slice = std::move(partition_slice)] (service::storage_proxy::coordinator_query_result qr) {
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
query::result_view::consume(*qr.query_result,
std::move(partition_slice),
cql3::selection::result_set_builder::visitor(builder, *view_schema, *selection));
return ::make_shared<cql_transport::messages::result_message::rows>(std::move(result(builder.build())));
});
}
auto p = service::pager::query_pagers::pager(view_schema, selection,
state, options, cmd, std::move(partition_ranges), stats, nullptr);
return p->fetch_page(options.get_page_size(), now, timeout).then([p, &options, limit, now] (std::unique_ptr<cql3::result_set> rs) {
rs->get_metadata().set_has_more_pages(p->state());
return ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
});
}
// Note: the partitions keys returned by this function are sorted
@@ -888,7 +898,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list<partition_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
return read_posting_list<partition_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout, _stats).then(
[this, now, &options, view] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
dht::partition_range_vector partition_ranges;
@@ -929,7 +939,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list<clustering_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
return read_posting_list<clustering_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout, _stats).then(
[this, now, &options, view] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);