diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 92fa5fd0de..8e415e5022 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1756,9 +1756,11 @@ class data_read_resolver : public abstract_read_resolver { size_t _total_live_count = 0; uint32_t _max_live_count = 0; uint32_t _short_read_diff = 0; - uint32_t _max_partition_live_count = 0; + uint32_t _max_per_partition_live_count = 0; uint32_t _partition_count = 0; + uint32_t _live_partition_count = 0; bool _increase_per_partition_limit = false; + bool _all_reached_end = true; std::vector _data_results; std::unordered_map>> _diffs; private: @@ -1776,7 +1778,7 @@ private: }); if (any_not_at_end && reconciled_live_rows < limit && limit - reconciled_live_rows > _short_read_diff) { _short_read_diff = limit - reconciled_live_rows; - _max_partition_live_count = reconciled_live_rows; + _max_per_partition_live_count = reconciled_live_rows; } } @@ -1894,7 +1896,7 @@ private: } bool got_incomplete_information(const schema& s, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit, - const std::vector& rp, const std::vector>& versions) { + uint32_t original_partition_limit, const std::vector& rp, const std::vector>& versions) { // We need to check whether the reconciled result contains all information from all available // replicas. It is possible that some of the nodes have returned less rows (because the limit // was set and they had some tombstones missing) than the others. In such cases we cannot just @@ -1907,12 +1909,14 @@ private: // asked for (if a replica returned less rows it means it returned everything it has). auto is_reversed = cmd.slice.options.contains(query::partition_slice::option::reversed); - auto limit = original_row_limit; + auto rows_left = original_row_limit; + auto partitions_left = original_partition_limit; auto pv = versions.rbegin(); for (auto&& m_a_rc : rp | boost::adaptors::reversed) { auto row_count = m_a_rc.live_row_count; - if (row_count < limit) { - limit -= row_count; + if (row_count < rows_left && partitions_left > !!row_count) { + rows_left -= row_count; + partitions_left -= !!row_count; if (original_per_partition_limit != query::max_rows) { auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, original_per_partition_limit, is_reversed); if (got_incomplete_information_in_partition(s, last_row, *pv, is_reversed)) { @@ -1921,7 +1925,7 @@ private: } } } else { - auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, limit, is_reversed); + auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, rows_left, is_reversed); return got_incomplete_information_across_partitions(s, last_row, versions, is_reversed); } ++pv; @@ -1951,13 +1955,20 @@ public: bool increase_per_partition_limit() const { return _increase_per_partition_limit; } - uint32_t max_partition_live_count() const { - return _max_partition_live_count; + uint32_t max_per_partition_live_count() const { + return _max_per_partition_live_count; } uint32_t partition_count() const { return _partition_count; } - stdx::optional resolve(schema_ptr schema, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit) { + uint32_t live_partition_count() const { + return _live_partition_count; + } + bool all_reached_end() const { + return _all_reached_end; + } + stdx::optional resolve(schema_ptr schema, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit, + uint32_t original_partition_limit) { assert(_data_results.size()); const auto& s = *schema; @@ -1984,6 +1995,7 @@ public: || boost::range::count_if(r.result->partitions(), [] (const partition& p) { return p.row_count(); }) < cmd.partition_limit); + _all_reached_end = _all_reached_end && r.reached_end; } do { @@ -2024,6 +2036,7 @@ public: auto live_row_count = m.live_row_count(); _total_live_count += live_row_count; register_live_count(v, live_row_count, original_per_partition_limit); + _live_partition_count += !!live_row_count; return mutation_and_live_row_count { std::move(m), live_row_count }; }); _partition_count = reconciled_partitions.size(); @@ -2061,7 +2074,7 @@ public: if (has_diff) { if (_total_live_count >= original_row_limit && !any_partition_short_read() && got_incomplete_information(*schema, cmd, original_row_limit, original_per_partition_limit, - reconciled_partitions, versions)) { + original_partition_limit, reconciled_partitions, versions)) { return {}; } // filter out partitions with empty diffs @@ -2220,6 +2233,9 @@ protected: uint32_t original_per_partition_row_limit() const { return _cmd->slice.partition_row_limit(); } + uint32_t original_partition_limit() const { + return _cmd->partition_limit; + } void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr cmd) { data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); @@ -2229,12 +2245,13 @@ protected: data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { try { f.get(); - auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit()); // reconciliation happens here + auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here // We generate a retry if at least one node reply with count live columns but after merge we have less // than the total number of column we are interested in (which may be < count on a retry). // So in particular, if no host returned count live columns, we know it's not a short read. - if (rr_opt && (data_resolver->max_live_count() < cmd->row_limit || rr_opt->row_count() >= original_row_limit()) + if (rr_opt && (data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit() + || data_resolver->live_partition_count() >= original_partition_limit()) && !data_resolver->any_partition_short_read()) { auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice))); // wait for write to complete before returning result to prevent multiple concurrent read requests to @@ -2265,12 +2282,17 @@ protected: }; if (data_resolver->any_partition_short_read() || data_resolver->increase_per_partition_limit()) { // The number of live rows was bounded by the per partition limit. - auto new_limit = x(cmd->slice.partition_row_limit(), data_resolver->max_partition_live_count()); + auto new_limit = x(cmd->slice.partition_row_limit(), data_resolver->max_per_partition_live_count()); _retry_cmd->slice.set_partition_row_limit(new_limit); _retry_cmd->row_limit = std::max(cmd->row_limit, data_resolver->partition_count() * new_limit); } else { - // The number of live rows was bounded by the total row limit. - _retry_cmd->row_limit = x(cmd->row_limit, data_resolver->total_live_count()); + // The number of live rows was bounded by the total row limit or partition limit. + if (cmd->partition_limit != query::max_partitions) { + _retry_cmd->partition_limit = x(cmd->partition_limit, data_resolver->live_partition_count()); + } + if (cmd->row_limit != query::max_rows) { + _retry_cmd->row_limit = x(cmd->row_limit, data_resolver->total_live_count()); + } } logger.trace("Retrying query with command {} (previous is {})", *_retry_cmd, *cmd); reconcile(cl, timeout, _retry_cmd);