Merge "flat_mutation_reader: abort reverse reads when size of mutation exceeds limit" from Botond
" Reverse queries work by reading an entire partition into memory, then start emitting its rows in reverse order. It is easy to see how this can lead to disasters combined with large partitions. In fact a handful of such reverse queries on large partitions is enough to bring a node down. To prevent this, abort reverse queries, when we find out that the size of the partition is larger than a limit. This might be annoying to users, but I'm sure it is not as annoying as their nodes going down. The limit is configurable via `max_memory_for_unlimited_query` configuration option, which is 1MB by default. This limit is propagated to each table, system tables having no limit. This limit is planned to be used by other queries capable of consuming unlimited amount of memory, like unpaged queries. Not in this series. The proper solution would be to read the data in reverse (#1413), but that is a major effort. In the meanwhile make sure the unsuspecting user won't bring their nodes down with an innocent looking ordering directive. Note that for calculating the memory footprint of the partition-in-question, only the clustering rows are used. This should be fine, the 1MB limit is conservative enough that an eventual overshoot caused by the omitted range tombstones and the static row would not make a big difference. Fixes: #5804 " * 'limit-reverse-query-memory-consumption/v3' of https://github.com/denesb/scylla: flat_mutation_reader: make_reversing_reader(): add memory limit db/config: add config memory limit of otherwise unlimited queries utils::updateable_value: add operator=(T) flat_mutation_reader: expose reverse reader as a standalone reader
This commit is contained in:
@@ -929,8 +929,10 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
|
||||
// avoid self-reporting
|
||||
if (is_system_table(s)) {
|
||||
cfg.sstables_manager = &db.get_system_sstables_manager();
|
||||
cfg.max_memory_for_unlimited_query = std::numeric_limits<uint64_t>::max();
|
||||
} else {
|
||||
cfg.sstables_manager = &db.get_user_sstables_manager();
|
||||
cfg.max_memory_for_unlimited_query = db_config.max_memory_for_unlimited_query();
|
||||
}
|
||||
|
||||
cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore;
|
||||
@@ -1206,6 +1208,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
cmd.row_limit,
|
||||
cmd.partition_limit,
|
||||
cmd.timestamp,
|
||||
cf.get_config().max_memory_for_unlimited_query,
|
||||
std::move(accounter),
|
||||
std::move(trace_state),
|
||||
timeout,
|
||||
|
||||
@@ -389,6 +389,7 @@ public:
|
||||
db::timeout_semaphore* view_update_concurrency_semaphore;
|
||||
size_t view_update_concurrency_semaphore_limit;
|
||||
db::data_listeners* data_listeners = nullptr;
|
||||
utils::updateable_value<uint64_t> max_memory_for_unlimited_query;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
|
||||
@@ -915,6 +916,10 @@ public:
|
||||
return _config.cf_stats;
|
||||
}
|
||||
|
||||
const config& get_config() const {
|
||||
return _config;
|
||||
}
|
||||
|
||||
compaction_manager& get_compaction_manager() const {
|
||||
return _compaction_manager;
|
||||
}
|
||||
|
||||
@@ -721,6 +721,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100,
|
||||
"Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, "
|
||||
"especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.")
|
||||
, max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20,
|
||||
"Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.")
|
||||
, enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false,
|
||||
"Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version,"
|
||||
" or installed from a later version, leave this set to false. This adjusts the communication protocol to"
|
||||
|
||||
@@ -302,6 +302,7 @@ public:
|
||||
named_value<bool> abort_on_internal_error;
|
||||
named_value<uint32_t> max_partition_key_restrictions_per_query;
|
||||
named_value<uint32_t> max_clustering_key_restrictions_per_query;
|
||||
named_value<uint64_t> max_memory_for_unlimited_query;
|
||||
named_value<bool> enable_3_1_0_compatibility_mode;
|
||||
named_value<bool> enable_user_defined_functions;
|
||||
named_value<unsigned> user_defined_function_time_limit_ms;
|
||||
|
||||
@@ -59,14 +59,14 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
|
||||
_buffer_size = compute_buffer_size(*_schema, _buffer);
|
||||
}
|
||||
|
||||
flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutation_reader::impl& original) {
|
||||
// FIXME: #1413 Full partitions get accumulated in memory.
|
||||
|
||||
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) {
|
||||
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader::impl* _source;
|
||||
flat_mutation_reader* _source;
|
||||
range_tombstone_list _range_tombstones;
|
||||
std::stack<mutation_fragment> _mutation_fragments;
|
||||
mutation_fragment_opt _partition_end;
|
||||
size_t _stack_size = 0;
|
||||
const size_t _max_stack_size;
|
||||
private:
|
||||
stop_iteration emit_partition() {
|
||||
auto emit_range_tombstone = [&] {
|
||||
@@ -76,12 +76,13 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
|
||||
auto rt_owner = alloc_strategy_unique_ptr<range_tombstone>(&rt);
|
||||
push_mutation_fragment(mutation_fragment(std::move(rt)));
|
||||
};
|
||||
position_in_partition::less_compare cmp(*_source->_schema);
|
||||
position_in_partition::less_compare cmp(*_schema);
|
||||
while (!_mutation_fragments.empty() && !is_buffer_full()) {
|
||||
auto& mf = _mutation_fragments.top();
|
||||
if (!_range_tombstones.empty() && !cmp(_range_tombstones.tombstones().rbegin()->end_position(), mf.position())) {
|
||||
emit_range_tombstone();
|
||||
} else {
|
||||
_stack_size -= mf.memory_usage(*_schema);
|
||||
push_mutation_fragment(std::move(mf));
|
||||
_mutation_fragments.pop();
|
||||
}
|
||||
@@ -113,18 +114,35 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
} else if (mf.is_range_tombstone()) {
|
||||
_range_tombstones.apply(*_source->_schema, std::move(mf.as_range_tombstone()));
|
||||
_range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone()));
|
||||
} else {
|
||||
_mutation_fragments.emplace(std::move(mf));
|
||||
_stack_size += _mutation_fragments.top().memory_usage(*_schema);
|
||||
if (_stack_size >= _max_stack_size) {
|
||||
const partition_key* key = nullptr;
|
||||
auto it = buffer().end();
|
||||
--it;
|
||||
if (it->is_partition_start()) {
|
||||
key = &it->as_partition_start().key().key();
|
||||
} else {
|
||||
--it;
|
||||
key = &it->as_partition_start().key().key();
|
||||
}
|
||||
throw std::runtime_error(fmt::format(
|
||||
"Aborting reverse partition read because partition {} is larger than the maximum safe size of {} for reversible partitions.",
|
||||
key->with_schema(*_schema),
|
||||
_max_stack_size));
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future<stop_iteration>(is_buffer_full());
|
||||
}
|
||||
public:
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader::impl& mr)
|
||||
: flat_mutation_reader::impl(mr._schema)
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size)
|
||||
: flat_mutation_reader::impl(mr.schema())
|
||||
, _source(&mr)
|
||||
, _range_tombstones(*mr._schema)
|
||||
, _range_tombstones(*_schema)
|
||||
, _max_stack_size(max_stack_size)
|
||||
{ }
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
@@ -145,6 +163,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty() && !is_end_of_stream()) {
|
||||
while (!_mutation_fragments.empty()) {
|
||||
_stack_size -= _mutation_fragments.top().memory_usage(*_schema);
|
||||
_mutation_fragments.pop();
|
||||
}
|
||||
_range_tombstones.clear();
|
||||
@@ -165,7 +184,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
|
||||
}
|
||||
};
|
||||
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original);
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_memory_consumption);
|
||||
}
|
||||
|
||||
template<typename Source>
|
||||
|
||||
@@ -79,15 +79,6 @@ GCC6_CONCEPT(
|
||||
*/
|
||||
class flat_mutation_reader final {
|
||||
public:
|
||||
// Causes a stream of reversed mutations to be emitted.
|
||||
// 1. Static row is still emitted first.
|
||||
// 2. Range tombstones are ordered by their end position.
|
||||
// 3. Clustered rows and range tombstones are emitted in descending order.
|
||||
// Because of 2 and 3 the guarantee that a range tombstone is emitted before
|
||||
// any mutation fragment affected by it still holds.
|
||||
// Ordering of partitions themselves remains unchanged.
|
||||
using consume_reversed_partitions = seastar::bool_class<class consume_reversed_partitions_tag>;
|
||||
|
||||
class impl {
|
||||
private:
|
||||
circular_buffer<mutation_fragment> _buffer;
|
||||
@@ -122,8 +113,6 @@ public:
|
||||
const circular_buffer<mutation_fragment>& buffer() const {
|
||||
return _buffer;
|
||||
}
|
||||
private:
|
||||
static flat_mutation_reader reverse_partitions(flat_mutation_reader::impl&);
|
||||
public:
|
||||
impl(schema_ptr s) : _schema(std::move(s)) { }
|
||||
virtual ~impl() {}
|
||||
@@ -353,14 +342,7 @@ public:
|
||||
GCC6_CONCEPT(
|
||||
requires FlattenedConsumer<Consumer>()
|
||||
)
|
||||
auto consume(Consumer consumer,
|
||||
db::timeout_clock::time_point timeout,
|
||||
consume_reversed_partitions reversed = consume_reversed_partitions::no) {
|
||||
if (reversed) {
|
||||
return do_with(impl::reverse_partitions(*_impl), [&] (auto& reversed_partition_stream) {
|
||||
return reversed_partition_stream._impl->consume(std::move(consumer), timeout);
|
||||
});
|
||||
}
|
||||
auto consume(Consumer consumer, db::timeout_clock::time_point timeout) {
|
||||
return _impl->consume(std::move(consumer), timeout);
|
||||
}
|
||||
|
||||
@@ -747,6 +729,27 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db:
|
||||
flat_mutation_reader
|
||||
make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment);
|
||||
|
||||
/// A reader that emits partitions in reverse.
|
||||
///
|
||||
/// 1. Static row is still emitted first.
|
||||
/// 2. Range tombstones are ordered by their end position.
|
||||
/// 3. Clustered rows and range tombstones are emitted in descending order.
|
||||
/// Because of 2 and 3 the guarantee that a range tombstone is emitted before
|
||||
/// any mutation fragment affected by it still holds.
|
||||
/// Ordering of partitions themselves remains unchanged.
|
||||
///
|
||||
/// \param original the reader to be reversed, has to be kept alive while the
|
||||
/// reversing reader is in use.
|
||||
/// \param max_memory_consumption the maximum amount of memory the reader is
|
||||
/// allowed to use for reversing. The reverse reader reads entire partitions
|
||||
/// into memory, before reversing them. Since partitions can be larger than
|
||||
/// the available memory, we need to enforce a limit on memory consumption.
|
||||
/// If the read uses more memory then this limit, the read is aborted.
|
||||
///
|
||||
/// FIXME: reversing should be done in the sstable layer, see #1413.
|
||||
flat_mutation_reader
|
||||
make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption);
|
||||
|
||||
/// Low level fragment stream validator.
|
||||
///
|
||||
/// Tracks and validates the monotonicity of the passed in fragment kinds,
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "service/priority_manager.hh"
|
||||
#include "multishard_mutation_query.hh"
|
||||
#include "database.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
|
||||
@@ -220,6 +221,10 @@ public:
|
||||
read_context& operator=(read_context&&) = delete;
|
||||
read_context& operator=(const read_context&) = delete;
|
||||
|
||||
distributed<database>& db() {
|
||||
return _db;
|
||||
}
|
||||
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
const dht::partition_range& pr,
|
||||
@@ -604,8 +609,9 @@ static future<reconcilable_result> do_query_mutations(
|
||||
return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] (
|
||||
flat_mutation_reader& reader, lw_shared_ptr<compact_for_mutation_query_state>& compaction_state) mutable {
|
||||
auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter));
|
||||
auto& table = ctx->db().local().find_column_family(reader.schema());
|
||||
return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.row_limit, cmd.partition_limit, cmd.timestamp,
|
||||
timeout).then([&] (consume_result&& result) mutable {
|
||||
timeout, table.get_config().max_memory_for_unlimited_query).then([&] (consume_result&& result) mutable {
|
||||
return make_ready_future<page_consume_result>(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)));
|
||||
});
|
||||
}).then_wrapped([&ctx] (future<page_consume_result>&& result_fut) {
|
||||
|
||||
@@ -2168,6 +2168,7 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
@@ -2181,9 +2182,10 @@ future<> data_query(
|
||||
? std::move(*querier_opt)
|
||||
: query::data_querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
|
||||
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
|
||||
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr),
|
||||
cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
|
||||
auto qrb = query_result_builder(*s, builder);
|
||||
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout).then(
|
||||
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
|
||||
[=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable {
|
||||
if (q.are_limits_reached() || builder.is_short_read()) {
|
||||
cache_ctx.insert(std::move(q), std::move(trace_ptr));
|
||||
@@ -2261,6 +2263,7 @@ static do_mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
@@ -2278,7 +2281,7 @@ static do_mutation_query(schema_ptr s,
|
||||
return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (
|
||||
query::mutation_querier& q) mutable {
|
||||
auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter));
|
||||
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout).then(
|
||||
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
|
||||
[=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable {
|
||||
if (q.are_limits_reached() || r.is_short_read()) {
|
||||
cache_ctx.insert(std::move(q), std::move(trace_ptr));
|
||||
@@ -2300,13 +2303,14 @@ mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
{
|
||||
return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice),
|
||||
row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
|
||||
row_limit, partition_limit, query_time, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
|
||||
}
|
||||
|
||||
deletable_row::deletable_row(clustering_row&& cr)
|
||||
@@ -2528,7 +2532,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no);
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout);
|
||||
return f.finally([r_a_r = std::move(r_a_r)] { });
|
||||
}
|
||||
|
||||
|
||||
@@ -161,6 +161,7 @@ future<reconcilable_result> mutation_query(
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter = { },
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
@@ -175,6 +176,7 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
query::querier_cache_context cache_ctx = { });
|
||||
@@ -189,6 +191,7 @@ class mutation_query_stage {
|
||||
uint32_t,
|
||||
uint32_t,
|
||||
gc_clock::time_point,
|
||||
uint64_t,
|
||||
query::result_memory_accounter&&,
|
||||
tracing::trace_state_ptr,
|
||||
db::timeout_clock::time_point,
|
||||
|
||||
@@ -52,7 +52,7 @@ static sstring cannot_use_reason(can_use cu)
|
||||
|
||||
static bool ring_position_matches(const schema& s, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
const position_view& pos) {
|
||||
const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed));
|
||||
const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
const auto expected_start = dht::ring_position_view(*pos.partition_key);
|
||||
// If there are no clustering columns or the select is distinct we don't
|
||||
@@ -93,7 +93,7 @@ static bool clustering_position_matches(const schema& s, const query::partition_
|
||||
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed));
|
||||
const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
// If the page ended mid-partition the first partition range should start
|
||||
// with the last clustering key (exclusive).
|
||||
|
||||
23
querier.hh
23
querier.hh
@@ -84,7 +84,8 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
// FIXME: #3158
|
||||
// consumer cannot be moved after consume_new_partition() is called
|
||||
// on it because it stores references to some of it's own members.
|
||||
@@ -95,15 +96,22 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end;
|
||||
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer);
|
||||
|
||||
const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(
|
||||
slice.options.contains(query::partition_slice::option::reversed));
|
||||
|
||||
auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
|
||||
auto reader_consumer = make_stable_flattened_mutations_consumer<compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>>(
|
||||
compaction_state,
|
||||
clustering_position_tracker(std::move(consumer), last_ckey));
|
||||
|
||||
return reader.consume(std::move(reader_consumer), timeout, is_reversed).then([last_ckey] (auto&&... results) mutable {
|
||||
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, reverse_read_max_memory] () mutable {
|
||||
if (slice.options.contains(query::partition_slice::option::reversed)) {
|
||||
return do_with(make_reversing_reader(reader, reverse_read_max_memory),
|
||||
[reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable {
|
||||
return reversing_reader.consume(std::move(reader_consumer), timeout);
|
||||
});
|
||||
}
|
||||
return reader.consume(std::move(reader_consumer), timeout);
|
||||
};
|
||||
|
||||
return consume().then([last_ckey] (auto&&... results) mutable {
|
||||
static_assert(sizeof...(results) <= 1);
|
||||
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
|
||||
});
|
||||
@@ -176,9 +184,10 @@ public:
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time,
|
||||
timeout).then([this] (auto&& results) {
|
||||
timeout, reverse_read_max_memory).then([this] (auto&& results) {
|
||||
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
|
||||
constexpr auto size = std::tuple_size<std::decay_t<decltype(results)>>::value;
|
||||
static_assert(size <= 2);
|
||||
|
||||
2
table.cc
2
table.cc
@@ -2397,7 +2397,7 @@ table::query(schema_ptr s,
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, timeout, cache_ctx);
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, _config.max_memory_for_unlimited_query, trace_state, timeout, cache_ctx);
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
|
||||
@@ -589,8 +589,11 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
assert(bool(!reversed));
|
||||
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
|
||||
} else {
|
||||
auto reversed_flag = flat_mutation_reader::consume_reversed_partitions(bool(reversed));
|
||||
return fmr.consume(std::move(fsc), db::no_timeout, reversed_flag).get0();
|
||||
if (reversed) {
|
||||
auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20);
|
||||
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
|
||||
}
|
||||
return fmr.consume(std::move(fsc), db::no_timeout).get0();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -784,3 +787,49 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_fragments_as_mutation_source)
|
||||
};
|
||||
run_mutation_source_tests(populate);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
simple_schema schema;
|
||||
|
||||
struct phony_consumer {
|
||||
void consume_new_partition(const dht::decorated_key&) { }
|
||||
void consume(tombstone) { }
|
||||
stop_iteration consume(static_row&&) { return stop_iteration::no; }
|
||||
stop_iteration consume(clustering_row&&) { return stop_iteration::no; }
|
||||
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
|
||||
stop_iteration consume_end_of_partition() { return stop_iteration::no; }
|
||||
void consume_end_of_stream() { }
|
||||
};
|
||||
|
||||
auto test_with_partition = [&] (bool with_static_row) {
|
||||
BOOST_TEST_MESSAGE(fmt::format("Testing with_static_row={}", with_static_row));
|
||||
auto mut = schema.new_mutation("pk1");
|
||||
const size_t desired_mut_size = 1 * 1024 * 1024;
|
||||
const size_t row_size = 10 * 1024;
|
||||
|
||||
if (with_static_row) {
|
||||
schema.add_static_row(mut, "s1");
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < desired_mut_size / row_size; ++i) {
|
||||
schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0'));
|
||||
}
|
||||
|
||||
auto reader = flat_mutation_reader_from_mutations({mut});
|
||||
auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10);
|
||||
|
||||
try {
|
||||
reverse_reader.consume(phony_consumer{}, db::no_timeout).get();
|
||||
BOOST_FAIL("No exception thrown for reversing overly big partition");
|
||||
} catch (const std::runtime_error& e) {
|
||||
BOOST_TEST_MESSAGE(fmt::format("Got exception with message: {}", e.what()));
|
||||
auto str = sstring(e.what());
|
||||
BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0);
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
};
|
||||
|
||||
test_with_partition(true);
|
||||
test_with_partition(false);
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ static query::partition_slice make_full_slice(const schema& s) {
|
||||
}
|
||||
|
||||
static auto inf32 = std::numeric_limits<unsigned>::max();
|
||||
static const uint64_t max_memory_for_reverse_query = 1 << 20;
|
||||
|
||||
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
|
||||
@@ -100,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
// FIXME: use mutation assertions
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
@@ -123,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -159,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -173,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -206,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -236,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -264,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -284,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -296,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -323,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -347,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -369,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -395,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.is_empty();
|
||||
@@ -445,7 +446,8 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
|
||||
auto query_time = now + std::chrono::seconds(1);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time).get0();
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time,
|
||||
max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
@@ -463,24 +465,29 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(s, partition_key::from_single_value(*s, "key2"));
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
@@ -503,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 10, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -519,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 1, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -541,10 +548,12 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), digest_only_builder, max_memory_for_reverse_query).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), result_and_digest_builder, max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
@@ -2928,7 +2928,7 @@ void run_compaction_data_stream_split_test(const schema& schema, gc_clock::time_
|
||||
survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable),
|
||||
purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable));
|
||||
|
||||
auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no).get0();
|
||||
auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout).get0();
|
||||
|
||||
tlog.info("Survived data: {}", create_stats(survived_partitions));
|
||||
tlog.info("Purged data: {}", create_stats(purged_partitions));
|
||||
|
||||
@@ -213,7 +213,7 @@ public:
|
||||
|
||||
auto querier = make_querier<Querier>(range);
|
||||
auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), db::no_timeout).get0();
|
||||
gc_clock::now(), db::no_timeout, std::numeric_limits<uint64_t>::max()).get0();
|
||||
const auto memory_usage = querier.memory_usage();
|
||||
_cache.insert(cache_key, std::move(querier), nullptr);
|
||||
|
||||
|
||||
@@ -82,6 +82,15 @@ updateable_value_base::operator=(updateable_value_base&& v) noexcept {
|
||||
return *this;
|
||||
}
|
||||
|
||||
updateable_value_base&
|
||||
updateable_value_base::updateable_value_base::operator=(nullptr_t) {
|
||||
if (_source) {
|
||||
_source->del_ref(this);
|
||||
_source = nullptr;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
void
|
||||
updateable_value_source_base::for_each_ref(std::function<void (updateable_value_base* ref)> func) {
|
||||
for (auto ref : _refs) {
|
||||
|
||||
@@ -61,6 +61,7 @@ public:
|
||||
updateable_value_base& operator=(const updateable_value_base&);
|
||||
updateable_value_base(updateable_value_base&&) noexcept;
|
||||
updateable_value_base& operator=(updateable_value_base&&) noexcept;
|
||||
updateable_value_base& operator=(nullptr_t);
|
||||
|
||||
friend class updateable_value_source_base;
|
||||
};
|
||||
@@ -78,6 +79,7 @@ public:
|
||||
explicit updateable_value(T value) : _value(std::move(value)) {}
|
||||
explicit updateable_value(const updateable_value_source<T>& source);
|
||||
updateable_value(const updateable_value& v);
|
||||
updateable_value& operator=(T value);
|
||||
updateable_value& operator=(const updateable_value&);
|
||||
updateable_value(updateable_value&&) noexcept;
|
||||
updateable_value& operator=(updateable_value&&) noexcept;
|
||||
@@ -170,6 +172,13 @@ template <typename T>
|
||||
updateable_value<T>::updateable_value(const updateable_value& v) : updateable_value_base(v), _value(v._value) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>& updateable_value<T>::operator=(T value) {
|
||||
updateable_value_base::operator=(nullptr);
|
||||
_value = std::move(value);
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>& updateable_value<T>::operator=(const updateable_value& v) {
|
||||
if (this != &v) {
|
||||
|
||||
Reference in New Issue
Block a user