cql3: make read_posting_list return future<rows>

Instead of returning a coordinator result and making the caller parse it
later, read_posting_list now extracts the rows itself.
This change will be needed later, when querying is replaced with a pager.
This commit is contained in:
Piotr Sarna
2018-07-25 11:29:53 +02:00
parent 1d34ef38a8
commit c3dd1775c8

View File

@@ -779,7 +779,11 @@ get_index_schema(service::storage_proxy& proxy,
// Utility function for reading from the index view (get_index_view())
// the posting-list for a particular value of the indexed column.
// Remember a secondary index can only be created on a single column.
static future<service::storage_proxy::coordinator_query_result>
template<typename KeyType>
GCC6_CONCEPT(
requires (std::is_same_v<KeyType, partition_key> || std::is_same_v<KeyType, clustering_key>)
)
static future<::shared_ptr<cql_transport::messages::result_message::rows>>
read_posting_list(service::storage_proxy& proxy,
schema_ptr view_schema,
schema_ptr base_schema,
@@ -839,10 +843,11 @@ read_posting_list(service::storage_proxy& proxy,
}
}
auto partition_slice = partition_slice_builder.build();
auto cmd = ::make_lw_shared<query::read_command>(
view_schema->id(),
view_schema->version(),
partition_slice_builder.build(),
partition_slice,
limit,
now,
tracing::make_trace_info(state.get_trace_state()),
@@ -853,7 +858,24 @@ read_posting_list(service::storage_proxy& proxy,
cmd,
std::move(partition_ranges),
options.get_consistency(),
{timeout, state.get_trace_state()});
{timeout, state.get_trace_state()})
.then([base_schema, view_schema, now, &options, partition_slice = std::move(partition_slice)](service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : base_schema->partition_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
if constexpr (std::is_same_v<KeyType, clustering_key>) {
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
columns.emplace_back(view_schema->get_column_definition(cdef.name()));
}
}
auto selection = selection::selection::for_columns(view_schema, columns);
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
query::result_view::consume(*qr.query_result,
std::move(partition_slice),
cql3::selection::result_set_builder::visitor(builder, *view_schema, *selection));
return ::make_shared<cql_transport::messages::result_message::rows>(std::move(result(builder.build())));
});
}
// Note: the partition keys returned by this function are sorted
@@ -866,21 +888,9 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : _schema->partition_key_columns()) {
columns.emplace_back(view->get_column_definition(cdef.name()));
}
auto selection = selection::selection::for_columns(view, columns);
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
// FIXME: read_posting_list already asks to read primary keys only.
// why do we need to specify this again?
auto slice = partition_slice_builder(*view).build();
query::result_view::consume(*qr.query_result,
slice,
cql3::selection::result_set_builder::visitor(builder, *view, *selection));
auto rs = cql3::untyped_result_set(::make_shared<cql_transport::messages::result_message::rows>(std::move(result(builder.build()))));
return read_posting_list<partition_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
dht::partition_range_vector partition_ranges;
partition_ranges.reserve(rs.size());
// We are reading the list of primary keys as rows of a single
@@ -918,24 +928,10 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : _schema->partition_key_columns()) {
columns.emplace_back(view->get_column_definition(cdef.name()));
}
for (const column_definition& cdef : _schema->clustering_key_columns()) {
columns.emplace_back(view->get_column_definition(cdef.name()));
}
auto selection = selection::selection::for_columns(view, columns);
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
// FIXME: read_posting_list already asks to read primary keys only.
// why do we need to specify this again?
auto slice = partition_slice_builder(*view).build();
query::result_view::consume(*qr.query_result,
slice,
cql3::selection::result_set_builder::visitor(builder, *view, *selection));
auto rs = cql3::untyped_result_set(::make_shared<cql_transport::messages::result_message::rows>(result(builder.build())));
return read_posting_list<clustering_key>(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
std::vector<primary_key> primary_keys;
primary_keys.reserve(rs.size());
for (size_t i = 0; i < rs.size(); i++) {