From c3dd1775c847a3cd3b5ebda12c2bc97985f013bd Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 25 Jul 2018 11:29:53 +0200 Subject: [PATCH] cql3: make read_posting_list return future Instead of returning a coordinator result and making a caller parse it later, read_posting_list now extracts rows by itself. This change is later needed when querying is replaced with a pager. --- cql3/statements/select_statement.cc | 68 ++++++++++++++--------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 1edd5e0c9f..d022507e96 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -779,7 +779,11 @@ get_index_schema(service::storage_proxy& proxy, // Utility function for reading from the index view (get_index_view())) // the posting-list for a particular value of the indexed column. // Remember a secondary index can only be created on a single column. -static future +template +GCC6_CONCEPT( + requires (std::is_same_v || std::is_same_v) +) +static future<::shared_ptr> read_posting_list(service::storage_proxy& proxy, schema_ptr view_schema, schema_ptr base_schema, @@ -839,10 +843,11 @@ read_posting_list(service::storage_proxy& proxy, } } + auto partition_slice = partition_slice_builder.build(); auto cmd = ::make_lw_shared( view_schema->id(), view_schema->version(), - partition_slice_builder.build(), + partition_slice, limit, now, tracing::make_trace_info(state.get_trace_state()), @@ -853,7 +858,24 @@ read_posting_list(service::storage_proxy& proxy, cmd, std::move(partition_ranges), options.get_consistency(), - {timeout, state.get_trace_state()}); + {timeout, state.get_trace_state()}) + .then([base_schema, view_schema, now, &options, partition_slice = std::move(partition_slice)](service::storage_proxy::coordinator_query_result qr) { + std::vector columns; + for (const column_definition& cdef : base_schema->partition_key_columns()) { + columns.emplace_back(view_schema->get_column_definition(cdef.name())); + } + if constexpr (std::is_same_v) { + for (const column_definition& cdef : base_schema->clustering_key_columns()) { + columns.emplace_back(view_schema->get_column_definition(cdef.name())); + } + } + auto selection = selection::selection::for_columns(view_schema, columns); + cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format()); + query::result_view::consume(*qr.query_result, + std::move(partition_slice), + cql3::selection::result_set_builder::visitor(builder, *view_schema, *selection)); + return ::make_shared(std::move(result(builder.build()))); + }); } // Note: the partitions keys returned by this function are sorted @@ -866,21 +888,9 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state()); auto now = gc_clock::now(); auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); - return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then( - [this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) { - std::vector columns; - for (const column_definition& cdef : _schema->partition_key_columns()) { - columns.emplace_back(view->get_column_definition(cdef.name())); - } - auto selection = selection::selection::for_columns(view, columns); - cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format()); - // FIXME: read_posting_list already asks to read primary keys only. - // why do we need to specify this again? - auto slice = partition_slice_builder(*view).build(); - query::result_view::consume(*qr.query_result, - slice, - cql3::selection::result_set_builder::visitor(builder, *view, *selection)); - auto rs = cql3::untyped_result_set(::make_shared(std::move(result(builder.build())))); + return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then( + [this, now, &options, view] (::shared_ptr rows) { + auto rs = cql3::untyped_result_set(rows); dht::partition_range_vector partition_ranges; partition_ranges.reserve(rs.size()); // We are reading the list of primary keys as rows of a single @@ -918,24 +928,10 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state()); auto now = gc_clock::now(); auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); - return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then( - [this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) { - std::vector columns; - for (const column_definition& cdef : _schema->partition_key_columns()) { - columns.emplace_back(view->get_column_definition(cdef.name())); - } - for (const column_definition& cdef : _schema->clustering_key_columns()) { - columns.emplace_back(view->get_column_definition(cdef.name())); - } - auto selection = selection::selection::for_columns(view, columns); - cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format()); - // FIXME: read_posting_list already asks to read primary keys only. - // why do we need to specify this again? - auto slice = partition_slice_builder(*view).build(); - query::result_view::consume(*qr.query_result, - slice, - cql3::selection::result_set_builder::visitor(builder, *view, *selection)); - auto rs = cql3::untyped_result_set(::make_shared(result(builder.build()))); + return read_posting_list(proxy, view, _schema, _index, _restrictions, options, get_limit(options), state, now, timeout).then( + [this, now, &options, view] (::shared_ptr rows) { + + auto rs = cql3::untyped_result_set(rows); std::vector primary_keys; primary_keys.reserve(rs.size()); for (size_t i = 0; i < rs.size(); i++) {