diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 6b9d26edd0..9dcf00da56 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -49,18 +49,23 @@ static logging::logger logger("paging"); class service::pager::query_pagers::impl : public query_pager { public: impl(schema_ptr s, ::shared_ptr selection, - service::query_state& state, const cql3::query_options& options, - lw_shared_ptr cmd, - std::vector ranges) - : _max(cmd->row_limit), _schema(std::move(s)), _selection( - selection), _state(state), _options(options), _cmd( - std::move(cmd)), _ranges(std::move(ranges)) { - } + service::query_state& state, + const cql3::query_options& options, + lw_shared_ptr cmd, + std::vector ranges) + : _has_clustering_keys(s->clustering_key_size() > 0) + , _max(cmd->row_limit) + , _schema(std::move(s)) + , _selection(selection) + , _state(state) + , _options(options) + , _cmd(std::move(cmd)) + , _ranges(std::move(ranges)) + {} private: future<> fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, db_clock::time_point now) override { auto state = _options.get_paging_state(); - uint32_t extra = 0; if (!_last_pkey && state) { _max = state->get_remaining(); @@ -76,11 +81,13 @@ private: logger.trace("PKey={}, CKey={}, reversed={}", dpk, *_last_ckey, reversed); - bool has_cks = _last_ckey && _schema->clustering_key_size() > 0; - // Note: we're assuming both that the ranges are checked // and "cql-compliant", and that storage_proxy will process // the ranges in order + // + // If the original query has singular restrictions like "col in (x, y, z)", + // we will eventually generate an empty range. This is ok, because empty range == nothing, + // which is what we thus mean. auto modify_ranges = [reversed](auto& ranges, auto& lo, bool inclusive, const auto& cmp) { typedef typename std::remove_reference_t::value_type range_type; typedef typename range_type::bound bound_type; @@ -116,11 +123,16 @@ private: logger.trace("Result ranges {}", ranges); }; + // last ck can be empty depending on whether we + // deserialized state or not. This case means "last page ended on + // something-not-bound-by-clustering" (i.e. a static row, alone) + const bool has_ck = _has_clustering_keys && _last_ckey; + // If we have no clustering keys, it should mean we only have one row // per PK. Thus we can just bypass the last one. - modify_ranges(_ranges, lo, has_cks, dht::ring_position_comparator(*_schema)); + modify_ranges(_ranges, lo, has_ck, dht::ring_position_comparator(*_schema)); - if (has_cks) { + if (has_ck) { query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges(); clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema)); clustering_key_prefix::less_compare cmp_rt(*_schema); @@ -134,20 +146,18 @@ private: }); _cmd->slice.set_range(*_last_pkey, row_ranges); - - // If we're doing cluster filtering, we might be at the last CK. - // Next query will operate on PK + a CK that does not give any row. - // The query code still counts this into row_limit (bug?). - // Add an extra row to fetch to deal with this. - extra = 1; } } - auto max_rows = std::min(_max, page_size) + extra; + auto max_rows = std::min(_max, page_size); // We always need PK so we can determine where to start next. _cmd->slice.options.set(); - _cmd->slice.options.set(); + // don't add empty bytes (cks) unless we have to + if (_has_clustering_keys) { + _cmd->slice.options.set< + query::partition_slice::option::send_clustering_key>(); + } _cmd->row_limit = max_rows; logger.debug("Fetching {}, page size={}, max_rows={}", @@ -173,18 +183,6 @@ private: }); }); } - - bool ignore_if_in_last_pk(const clustering_key& key) const { - auto reversed = _cmd->slice.options.contains(); - - if (reversed && !clustering_key::less_compare(*_schema)(key, *_last_ckey)) { - return true; - } - if (!reversed && !clustering_key::less_compare(*_schema)(*_last_ckey, key)) { - return true; - } - return false; - } void handle_result( cql3::selection::result_set_builder& builder, @@ -195,7 +193,6 @@ private: public: impl& _impl; uint32_t page_size; - uint32_t part_skip = 0; uint32_t part_rows = 0; uint32_t included_rows = 0; @@ -203,24 +200,26 @@ private: std::experimental::optional last_pkey; std::experimental::optional last_ckey; - bool _is_prev_last_pkey = false; // just for verbosity uint32_t part_ignored = 0; clustering_key::less_compare _less; - bool include_row(const clustering_key& key) { + bool include_row() { ++total_rows; ++part_rows; if (included_rows >= page_size) { ++part_ignored; return false; } - if (_is_prev_last_pkey && _impl.ignore_if_in_last_pk(key)) { - ++part_skip; + ++included_rows; + return true; + } + + bool include_row(const clustering_key& key) { + if (!include_row()) { return false; } - ++included_rows; if (included_rows == page_size) { last_ckey = key; } @@ -238,10 +237,8 @@ private: } void accept_new_partition(const partition_key& key, uint32_t row_count) { logger.trace("Begin partition: {} ({})", key, row_count); - part_skip = 0; part_rows = 0; part_ignored = 0; - _is_prev_last_pkey = _impl._last_pkey && key.equal(*_impl._schema, *_impl._last_pkey); if (included_rows < page_size) { last_pkey = key; } @@ -260,33 +257,25 @@ private: } void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { - throw std::logic_error("Should not reach!"); + auto ok = include_row(); + if (ok) { + visitor::accept_new_row(static_row, row); + } } void accept_partition_end(const query::result_row_view& static_row) { + // accept_partition_end with row_count == 0 + // means we had an empty partition but live + // static columns, and since the fix, + // no CK restrictions. + // I.e. _row_count == 0 -> add a partially empty row + // So, treat this case as an accept_row variant + if (_row_count > 0 || include_row()) { + visitor::accept_partition_end(static_row); + } logger.trace( - "End partition, skipped={}, included={}, ignored={}", - part_skip, - part_rows - part_skip - part_ignored, + "End partition, included={}, ignored={}", + part_rows - part_ignored, part_ignored); - // Do not call visitor::accept_partition_end. - // It only exists to create a row of static cols in the case - // we optimized a call to have to rows (only stats selected), - // in which row_count in accept_new_row was zero. - // However, we always force clustering keys (even when there are none!) - // so this should never happen. Also we confuse the APE code - // when we've selected a row range for first key that results in - // zero rows. - if (_row_count == 0 && _is_prev_last_pkey) { - return; - } - if (_row_count == 0 && included_rows == page_size) { - return; - } - if (_row_count == 0) { - ++included_rows; - last_ckey = {}; - } - visitor::accept_partition_end(static_row); } }; @@ -304,7 +293,7 @@ private: if (_last_pkey) { logger.debug("Last partition key: {}", *_last_pkey); } - if (_last_ckey) { + if (_has_clustering_keys && _last_ckey) { logger.debug("Last clustering key: {}", *_last_ckey); } } @@ -319,10 +308,14 @@ private: ::shared_ptr state() const override { return _exhausted ? - nullptr : ::make_shared(*_last_pkey, *_last_ckey, _max); + nullptr : + ::make_shared(*_last_pkey, + _last_ckey, _max); } private: + // remember if we use clustering. if not, each partition == one row + const bool _has_clustering_keys; bool _exhausted = false; uint32_t _rem = 0; uint32_t _max;