mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-03 13:37:04 +00:00
storage_proxy: honour partition limit
At the moment the coordinator largely ignores the partition limit. In particular, it does not check whether the result still contains enough partitions after reconciliation. This patch makes the coordinator honour the partition limit and, if necessary, increase it in the retried queries.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -1756,9 +1756,11 @@ class data_read_resolver : public abstract_read_resolver {
|
||||
size_t _total_live_count = 0;
|
||||
uint32_t _max_live_count = 0;
|
||||
uint32_t _short_read_diff = 0;
|
||||
uint32_t _max_partition_live_count = 0;
|
||||
uint32_t _max_per_partition_live_count = 0;
|
||||
uint32_t _partition_count = 0;
|
||||
uint32_t _live_partition_count = 0;
|
||||
bool _increase_per_partition_limit = false;
|
||||
bool _all_reached_end = true;
|
||||
std::vector<reply> _data_results;
|
||||
std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>> _diffs;
|
||||
private:
|
||||
@@ -1776,7 +1778,7 @@ private:
|
||||
});
|
||||
if (any_not_at_end && reconciled_live_rows < limit && limit - reconciled_live_rows > _short_read_diff) {
|
||||
_short_read_diff = limit - reconciled_live_rows;
|
||||
_max_partition_live_count = reconciled_live_rows;
|
||||
_max_per_partition_live_count = reconciled_live_rows;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1894,7 +1896,7 @@ private:
|
||||
}
|
||||
|
||||
bool got_incomplete_information(const schema& s, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit,
|
||||
const std::vector<mutation_and_live_row_count>& rp, const std::vector<std::vector<version>>& versions) {
|
||||
uint32_t original_partition_limit, const std::vector<mutation_and_live_row_count>& rp, const std::vector<std::vector<version>>& versions) {
|
||||
// We need to check whether the reconciled result contains all information from all available
|
||||
// replicas. It is possible that some of the nodes have returned less rows (because the limit
|
||||
// was set and they had some tombstones missing) than the others. In such cases we cannot just
|
||||
@@ -1907,12 +1909,14 @@ private:
|
||||
// asked for (if a replica returned less rows it means it returned everything it has).
|
||||
auto is_reversed = cmd.slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
auto limit = original_row_limit;
|
||||
auto rows_left = original_row_limit;
|
||||
auto partitions_left = original_partition_limit;
|
||||
auto pv = versions.rbegin();
|
||||
for (auto&& m_a_rc : rp | boost::adaptors::reversed) {
|
||||
auto row_count = m_a_rc.live_row_count;
|
||||
if (row_count < limit) {
|
||||
limit -= row_count;
|
||||
if (row_count < rows_left && partitions_left > !!row_count) {
|
||||
rows_left -= row_count;
|
||||
partitions_left -= !!row_count;
|
||||
if (original_per_partition_limit != query::max_rows) {
|
||||
auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, original_per_partition_limit, is_reversed);
|
||||
if (got_incomplete_information_in_partition(s, last_row, *pv, is_reversed)) {
|
||||
@@ -1921,7 +1925,7 @@ private:
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, limit, is_reversed);
|
||||
auto&& last_row = get_last_reconciled_row(s, m_a_rc, cmd, rows_left, is_reversed);
|
||||
return got_incomplete_information_across_partitions(s, last_row, versions, is_reversed);
|
||||
}
|
||||
++pv;
|
||||
@@ -1951,13 +1955,20 @@ public:
|
||||
bool increase_per_partition_limit() const {
|
||||
return _increase_per_partition_limit;
|
||||
}
|
||||
uint32_t max_partition_live_count() const {
|
||||
return _max_partition_live_count;
|
||||
uint32_t max_per_partition_live_count() const {
|
||||
return _max_per_partition_live_count;
|
||||
}
|
||||
uint32_t partition_count() const {
|
||||
return _partition_count;
|
||||
}
|
||||
stdx::optional<reconcilable_result> resolve(schema_ptr schema, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit) {
|
||||
uint32_t live_partition_count() const {
|
||||
return _live_partition_count;
|
||||
}
|
||||
bool all_reached_end() const {
|
||||
return _all_reached_end;
|
||||
}
|
||||
stdx::optional<reconcilable_result> resolve(schema_ptr schema, const query::read_command& cmd, uint32_t original_row_limit, uint32_t original_per_partition_limit,
|
||||
uint32_t original_partition_limit) {
|
||||
assert(_data_results.size());
|
||||
const auto& s = *schema;
|
||||
|
||||
@@ -1984,6 +1995,7 @@ public:
|
||||
|| boost::range::count_if(r.result->partitions(), [] (const partition& p) {
|
||||
return p.row_count();
|
||||
}) < cmd.partition_limit);
|
||||
_all_reached_end = _all_reached_end && r.reached_end;
|
||||
}
|
||||
|
||||
do {
|
||||
@@ -2024,6 +2036,7 @@ public:
|
||||
auto live_row_count = m.live_row_count();
|
||||
_total_live_count += live_row_count;
|
||||
register_live_count(v, live_row_count, original_per_partition_limit);
|
||||
_live_partition_count += !!live_row_count;
|
||||
return mutation_and_live_row_count { std::move(m), live_row_count };
|
||||
});
|
||||
_partition_count = reconciled_partitions.size();
|
||||
@@ -2061,7 +2074,7 @@ public:
|
||||
if (has_diff) {
|
||||
if (_total_live_count >= original_row_limit && !any_partition_short_read()
|
||||
&& got_incomplete_information(*schema, cmd, original_row_limit, original_per_partition_limit,
|
||||
reconciled_partitions, versions)) {
|
||||
original_partition_limit, reconciled_partitions, versions)) {
|
||||
return {};
|
||||
}
|
||||
// filter out partitions with empty diffs
|
||||
@@ -2220,6 +2233,9 @@ protected:
|
||||
uint32_t original_per_partition_row_limit() const {
|
||||
return _cmd->slice.partition_row_limit();
|
||||
}
|
||||
uint32_t original_partition_limit() const {
|
||||
return _cmd->partition_limit;
|
||||
}
|
||||
void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
@@ -2229,12 +2245,13 @@ protected:
|
||||
data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit()); // reconciliation happens here
|
||||
auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here
|
||||
|
||||
// We generate a retry if at least one node reply with count live columns but after merge we have less
|
||||
// than the total number of column we are interested in (which may be < count on a retry).
|
||||
// So in particular, if no host returned count live columns, we know it's not a short read.
|
||||
if (rr_opt && (data_resolver->max_live_count() < cmd->row_limit || rr_opt->row_count() >= original_row_limit())
|
||||
if (rr_opt && (data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit()
|
||||
|| data_resolver->live_partition_count() >= original_partition_limit())
|
||||
&& !data_resolver->any_partition_short_read()) {
|
||||
auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice)));
|
||||
// wait for write to complete before returning result to prevent multiple concurrent read requests to
|
||||
@@ -2265,12 +2282,17 @@ protected:
|
||||
};
|
||||
if (data_resolver->any_partition_short_read() || data_resolver->increase_per_partition_limit()) {
|
||||
// The number of live rows was bounded by the per partition limit.
|
||||
auto new_limit = x(cmd->slice.partition_row_limit(), data_resolver->max_partition_live_count());
|
||||
auto new_limit = x(cmd->slice.partition_row_limit(), data_resolver->max_per_partition_live_count());
|
||||
_retry_cmd->slice.set_partition_row_limit(new_limit);
|
||||
_retry_cmd->row_limit = std::max(cmd->row_limit, data_resolver->partition_count() * new_limit);
|
||||
} else {
|
||||
// The number of live rows was bounded by the total row limit.
|
||||
_retry_cmd->row_limit = x(cmd->row_limit, data_resolver->total_live_count());
|
||||
// The number of live rows was bounded by the total row limit or partition limit.
|
||||
if (cmd->partition_limit != query::max_partitions) {
|
||||
_retry_cmd->partition_limit = x(cmd->partition_limit, data_resolver->live_partition_count());
|
||||
}
|
||||
if (cmd->row_limit != query::max_rows) {
|
||||
_retry_cmd->row_limit = x(cmd->row_limit, data_resolver->total_live_count());
|
||||
}
|
||||
}
|
||||
logger.trace("Retrying query with command {} (previous is {})", *_retry_cmd, *cmd);
|
||||
reconcile(cl, timeout, _retry_cmd);
|
||||
|
||||
Reference in New Issue
Block a user