diff --git a/alternator/executor.cc b/alternator/executor.cc index a9fb41f916..b65d0be76b 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -1225,7 +1225,8 @@ static future> get_previous_item( service_permit permit, alternator::stats& stats); -static lw_shared_ptr previous_item_read_command(schema_ptr schema, +static lw_shared_ptr previous_item_read_command(service::storage_proxy& proxy, + schema_ptr schema, const clustering_key& ck, shared_ptr selection) { std::vector bounds; @@ -1240,7 +1241,7 @@ static lw_shared_ptr previous_item_read_command(schema_ptr auto regular_columns = boost::copy_range( schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - return ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); + return ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice)); } static dht::partition_range_vector to_partition_ranges(const schema& schema, const partition_key& pk) { @@ -1354,7 +1355,7 @@ static future> get_previous_item( { stats.reads_before_write++; auto selection = cql3::selection::selection::wildcard(schema); - auto command = previous_item_read_command(schema, ck, selection); + auto command = previous_item_read_command(proxy, schema, ck, selection); auto cl = db::consistency_level::LOCAL_QUORUM; return proxy.query(schema, command, to_partition_ranges(*schema, pk), cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), std::move(permit), client_state)).then( @@ -1405,7 +1406,7 @@ future rmw_operation::execute(service::storage_pr auto timeout = executor::default_timeout(); auto selection = cql3::selection::selection::wildcard(schema()); auto read_command = needs_read_before_write ? 
- previous_item_read_command(schema(), _ck, selection) : + previous_item_read_command(proxy, schema(), _ck, selection) : nullptr; return proxy.cas(schema(), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk), {timeout, std::move(permit), client_state, trace_state}, @@ -2405,7 +2406,7 @@ future executor::get_item(client_state& client_st auto selection = cql3::selection::selection::wildcard(schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); std::unordered_set used_attribute_names; auto attrs_to_get = calculate_attrs_to_get(request, used_attribute_names); @@ -2475,7 +2476,7 @@ future executor::batch_get_item(client_state& cli rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto selection = cql3::selection::selection::wildcard(rs.schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, query::max_partitions); + auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); future>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { @@ -2728,7 +2729,8 @@ static rjson::value 
encode_paging_state(const schema& schema, const service::pag return last_evaluated_key; } -static future do_query(schema_ptr schema, +static future do_query(service::storage_proxy& proxy, + schema_ptr schema, const rjson::value* exclusive_start_key, dht::partition_range_vector&& partition_ranges, std::vector&& ck_bounds, @@ -2762,7 +2764,7 @@ static future do_query(schema_ptr schema, query::partition_slice::option_set opts = selection->get_query_options(); opts.add(custom_opts); auto partition_slice = query::partition_slice(std::move(ck_bounds), std::move(static_columns), std::move(regular_columns), opts); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice)); auto query_state_ptr = std::make_unique(client_state, trace_state, std::move(permit)); @@ -2883,7 +2885,7 @@ future executor::scan(client_state& client_state, verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "Scan"); verify_all_are_used(request, "ExpressionAttributeValues", used_attribute_values, "Scan"); - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, + return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit)); } @@ -3357,7 +3359,7 @@ future executor::query(client_state& client_state verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "Query"); query::partition_slice::option_set opts; opts.set_if(!forward); - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, + return do_query(_proxy, schema, 
exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit)); } diff --git a/alternator/streams.cc b/alternator/streams.cc index 8fa3e5a8b5..25c854421a 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -717,7 +717,8 @@ future executor::get_records(client_state& client auto partition_slice = query::partition_slice( std::move(bounds) , {}, std::move(regular_columns), selection->get_query_options()); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, limit * 4); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), + query::row_limit(limit * 4)); return _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then( [this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable { diff --git a/cdc/log.cc b/cdc/log.cc index 52040c6bd9..2ae0530b52 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -1356,7 +1356,8 @@ public: opts.set_if(!p.static_row().empty()); auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts)); - auto command = ::make_lw_shared(_schema->id(), _schema->version(), partition_slice, row_limit); + const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice); + auto command = ::make_lw_shared(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), query::row_limit(row_limit)); const auto select_cl = adjust_cl(write_cl); 
diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc index 63f7998a86..bbf6212ad1 100644 --- a/cql3/statements/batch_statement.cc +++ b/cql3/statements/batch_statement.cc @@ -385,7 +385,7 @@ future> batch_statement::exe make_shared(shard)); } - return proxy.cas(schema, request, request->read_command(), request->key(), + return proxy.cas(schema, request, request->read_command(proxy), request->key(), {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()}, cl_for_paxos, cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) { return modification_statement::build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied, request->rows()); diff --git a/cql3/statements/cas_request.cc b/cql3/statements/cas_request.cc index f597bad302..058b312ad4 100644 --- a/cql3/statements/cas_request.cc +++ b/cql3/statements/cas_request.cc @@ -81,7 +81,7 @@ std::optional cas_request::apply_updates(api::timestamp_type ts) const return mutation_set; } -lw_shared_ptr cas_request::read_command() const { +lw_shared_ptr cas_request::read_command(service::storage_proxy& proxy) const { column_set columns_to_read(_schema->all_columns_count()); std::vector ranges; @@ -116,7 +116,7 @@ lw_shared_ptr cas_request::read_command() const { options.set(query::partition_slice::option::always_return_static_content); query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options); ps.set_partition_row_limit(max_rows); - return make_lw_shared(_schema->id(), _schema->version(), std::move(ps)); + return make_lw_shared(_schema->id(), _schema->version(), std::move(ps), proxy.get_max_result_size(ps)); } bool cas_request::applies_to() const { diff --git a/cql3/statements/cas_request.hh b/cql3/statements/cas_request.hh index 48246f83e4..5f254b6d20 100644 --- a/cql3/statements/cas_request.hh +++ b/cql3/statements/cas_request.hh @@ -90,7 +90,7 @@ public: return _rows; } - lw_shared_ptr read_command() const; + 
lw_shared_ptr read_command(service::storage_proxy& proxy) const; void add_row_update(const modification_statement& stmt_arg, std::vector ranges_arg, modification_statement::json_cache_opt json_cache_arg, const query_options& options_arg); diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index c0a062cc5d..bdcc13f0fb 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -168,7 +168,7 @@ modification_statement::get_mutations(service::storage_proxy& proxy, const query } if (requires_read()) { - lw_shared_ptr cmd = read_command(ranges, cl); + lw_shared_ptr cmd = read_command(proxy, ranges, cl); // FIXME: ignoring "local" f = proxy.query(s, cmd, dht::partition_range_vector(keys), cl, {timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()}).then( @@ -246,14 +246,15 @@ std::vector modification_statement::apply_updates( } lw_shared_ptr -modification_statement::read_command(query::clustering_row_ranges ranges, db::consistency_level cl) const { +modification_statement::read_command(service::storage_proxy& proxy, query::clustering_row_ranges ranges, db::consistency_level cl) const { try { validate_for_read(cl); } catch (exceptions::invalid_request_exception& e) { throw exceptions::invalid_request_exception(format("Write operation require a read but consistency {} is not supported on reads", cl)); } query::partition_slice ps(std::move(ranges), *s, columns_to_read(), update_parameters::options); - return make_lw_shared(s->id(), s->version(), std::move(ps)); + const auto max_result_size = proxy.get_max_result_size(ps); + return make_lw_shared(s->id(), s->version(), std::move(ps), query::max_result_size(max_result_size)); } std::vector @@ -351,7 +352,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se make_shared(shard)); } - return proxy.cas(s, request, request->read_command(), request->key(), + return proxy.cas(s, request, 
request->read_command(proxy), request->key(), {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()}, cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request] (bool is_applied) { return build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied, request->rows()); diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index de5dfeaa6e..435e9ce863 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -242,7 +242,7 @@ public: // Build a read_command instance to fetch the previous mutation from storage. The mutation is // fetched if we need to check LWT conditions or apply updates to non-frozen list elements. - lw_shared_ptr read_command(query::clustering_row_ranges ranges, db::consistency_level cl) const; + lw_shared_ptr read_command(service::storage_proxy& proxy, query::clustering_row_ranges ranges, db::consistency_level cl) const; // Create a mutation object for the update operation represented by this modification statement. 
// A single mutation object for lightweight transactions, which can only span one partition, or a vector // of mutations, one per partition key, for statements which affect multiple partition keys, diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 315656d22d..64185cc6be 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -319,8 +319,19 @@ select_statement::do_execute(service::storage_proxy& proxy, _stats.select_partition_range_scan += _range_scan; _stats.select_partition_range_scan_no_bypass_cache += _range_scan_no_bypass_cache; - auto command = ::make_lw_shared(_schema->id(), _schema->version(), - make_partition_slice(options), limit, now, tracing::make_trace_info(state.get_trace_state()), query::max_partitions, utils::UUID(), query::is_first_page::no, options.get_timestamp(state)); + auto slice = make_partition_slice(options); + auto command = ::make_lw_shared( + _schema->id(), + _schema->version(), + std::move(slice), + proxy.get_max_result_size(slice), + query::row_limit(limit), + query::partition_limit(query::max_partitions), + now, + tracing::make_trace_info(state.get_trace_state()), + utils::UUID(), + query::is_first_page::no, + options.get_timestamp(state)); int32_t page_size = options.get_page_size(); @@ -471,25 +482,28 @@ generate_base_key_from_index_pk(const partition_key& index_pk, const std::option } lw_shared_ptr -indexed_table_select_statement::prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging) const { +indexed_table_select_statement::prepare_command_for_base_query(service::storage_proxy& proxy, const query_options& options, + service::query_state& state, gc_clock::time_point now, bool use_paging) const { + auto slice = make_partition_slice(options); + if (use_paging) { + slice.options.set(); + slice.options.set(); + if (_schema->clustering_key_size() > 0) { + slice.options.set(); + } + 
} lw_shared_ptr cmd = ::make_lw_shared( _schema->id(), _schema->version(), - make_partition_slice(options), - get_limit(options), + std::move(slice), + proxy.get_max_result_size(slice), + query::row_limit(get_limit(options)), + query::partition_limit(query::max_partitions), now, tracing::make_trace_info(state.get_trace_state()), - query::max_partitions, utils::UUID(), query::is_first_page::no, options.get_timestamp(state)); - if (use_paging) { - cmd->slice.options.set(); - cmd->slice.options.set(); - if (_schema->clustering_key_size() > 0) { - cmd->slice.options.set(); - } - } return cmd; } @@ -502,7 +516,7 @@ indexed_table_select_statement::do_execute_base_query( gc_clock::time_point now, lw_shared_ptr paging_state) const { using value_type = std::tuple>, lw_shared_ptr>; - auto cmd = prepare_command_for_base_query(options, state, now, bool(paging_state)); + auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state)); auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); uint32_t queried_ranges_count = partition_ranges.size(); service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata(), _schema, std::move(partition_ranges)); @@ -578,7 +592,7 @@ indexed_table_select_statement::do_execute_base_query( gc_clock::time_point now, lw_shared_ptr paging_state) const { using value_type = std::tuple>, lw_shared_ptr>; - auto cmd = prepare_command_for_base_query(options, state, now, bool(paging_state)); + auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state)); auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector(); struct base_query_state { @@ -1146,10 +1160,11 @@ indexed_table_select_statement::read_posting_list(service::storage_proxy& proxy, _view_schema->id(), _view_schema->version(), partition_slice, - limit, + proxy.get_max_result_size(partition_slice), + query::row_limit(limit), + 
query::partition_limit(query::max_partitions), now, tracing::make_trace_info(state.get_trace_state()), - query::max_partitions, utils::UUID(), query::is_first_page::no, options.get_timestamp(state)); diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index 800457cab1..e82ed58c99 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -237,7 +237,8 @@ private: lw_shared_ptr paging_state) const; lw_shared_ptr - prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging) const; + prepare_command_for_base_query(service::storage_proxy& proxy, const query_options& options, service::query_state& state, gc_clock::time_point now, + bool use_paging) const; future>, lw_shared_ptr>> do_execute_base_query( diff --git a/database.cc b/database.cc index be1d5c9e5c..d1399077bc 100644 --- a/database.cc +++ b/database.cc @@ -1178,18 +1178,18 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { future, cache_temperature>> database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, - tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) { + tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { column_family& cf = find_column_family(cmd.cf_id); + auto class_config = query::query_class_config{.semaphore = get_reader_concurrency_semaphore(), .max_memory_for_unlimited_query = *cmd.max_result_size}; query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); return _data_query_stage(&cf, std::move(s), seastar::cref(cmd), - make_query_class_config(), + class_config, opts, seastar::cref(ranges), std::move(trace_state), seastar::ref(get_result_memory_limiter()), - max_result_size, timeout, std::move(cache_ctx)).then_wrapped([this, s = _stats, 
hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) { if (f.failed()) { @@ -1206,8 +1206,12 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti future> database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, - query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); + return get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed).then( + [&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) { column_family& cf = find_column_family(cmd.cf_id); + auto class_config = query::query_class_config{.semaphore = get_reader_concurrency_semaphore(), .max_memory_for_unlimited_query = *cmd.max_result_size}; query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); return _mutation_query_stage(std::move(s), cf.as_mutation_source(), @@ -1217,7 +1221,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh cmd.partition_limit, cmd.timestamp, timeout, - make_query_class_config(), + class_config, std::move(accounter), std::move(trace_state), std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) { @@ -1231,6 +1235,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh return make_ready_future>(std::tuple(std::move(result), hit_rate)); } }); + }); } std::unordered_set database::get_initial_tokens() { @@ -1279,16 +1284,16 @@ void database::register_connection_drop_notifier(netw::messaging_service& ms) { }); } -query_class_config database::make_query_class_config() { 
+reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { // Everything running in the statement group is considered a user query if (current_scheduling_group() == _dbcfg.statement_scheduling_group) { - return query_class_config{_read_concurrency_sem, _cfg.max_memory_for_unlimited_query()}; + return _read_concurrency_sem; // Reads done on behalf of view update generation run in the streaming group } else if (current_scheduling_group() == _dbcfg.streaming_scheduling_group) { - return query_class_config{_streaming_concurrency_sem, std::numeric_limits::max()}; + return _streaming_concurrency_sem; // Everything else is considered a system query } else { - return query_class_config{_system_read_concurrency_sem, std::numeric_limits::max()}; + return _system_read_concurrency_sem; } } @@ -1348,7 +1353,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - return counter_write_query(m_schema, cf.as_mutation_source(), make_query_class_config().semaphore.make_permit(), m.decorated_key(), slice, trace_state, timeout) + return counter_write_query(m_schema, cf.as_mutation_source(), get_reader_concurrency_semaphore().make_permit(), m.decorated_key(), slice, trace_state, timeout) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter // cells we can look for our shard in each of them, increment @@ -1559,7 +1564,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra if (cf.views().empty()) { return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { }); } - future f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), make_query_class_config().semaphore); + future f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), 
get_reader_concurrency_semaphore()); return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable { return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally( // Hold the local lock on the base-table partition or row diff --git a/database.hh b/database.hh index d0fb002b74..c8540ff5cd 100644 --- a/database.hh +++ b/database.hh @@ -748,12 +748,11 @@ public: // Returns at most "cmd.limit" rows future> query(schema_ptr, const query::read_command& cmd, - query_class_config class_config, + query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter, - uint64_t max_result_size, db::timeout_clock::time_point timeout, query::querier_cache_context cache_ctx = { }); @@ -1294,12 +1293,11 @@ private: column_family*, schema_ptr, const query::read_command&, - query_class_config, + query::query_class_config, query::result_options, const dht::partition_range_vector&, tracing::trace_state_ptr, query::result_memory_limiter&, - uint64_t, db::timeout_clock::time_point, query::querier_cache_context> _data_query_stage; @@ -1396,6 +1394,7 @@ public: return _commitlog.get(); } + seastar::scheduling_group get_statement_scheduling_group() const { return _dbcfg.statement_scheduling_group; } seastar::scheduling_group get_streaming_scheduling_group() const { return _dbcfg.streaming_scheduling_group; } size_t get_available_memory() const { return _dbcfg.available_memory; } @@ -1463,10 +1462,9 @@ public: unsigned shard_of(const frozen_mutation& m); future, cache_temperature>> query(schema_ptr, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, - uint64_t max_result_size, db::timeout_clock::time_point timeout); + db::timeout_clock::time_point timeout); future> 
query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range, - query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, - db::timeout_clock::time_point timeout); + tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); // Apply the mutation atomically. // Throws timed_out_error when timeout is reached. future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout); @@ -1594,7 +1592,9 @@ public: return _supports_infinite_bound_range_deletions; } - query_class_config make_query_class_config(); + // Get the reader concurrency semaphore, appropriate for the query class, + // which is deduced from the current scheduling group. + reader_concurrency_semaphore& get_reader_concurrency_semaphore(); }; future<> start_large_data_handler(sharded& db); diff --git a/db/config.cc b/db/config.cc index b3af507875..0c98b9c4ff 100644 --- a/db/config.cc +++ b/db/config.cc @@ -224,7 +224,7 @@ db::config::config(std::shared_ptr exts) "The directory in which Scylla will put all its subdirectories. The location of individual subdirs can be overriden by the respective *_directory options.") , commitlog_directory(this, "commitlog_directory", value_status::Used, "", "The directory where the commit log is stored. 
For optimal write performance, it is recommended the commit log be on a separate disk partition (ideally, a separate physical device) from the data file directories.") - , data_file_directories(this, "data_file_directories", value_status::Used, { }, + , data_file_directories(this, "data_file_directories", "datadir", value_status::Used, { }, "The directory location where table data (SSTables) is stored") , hints_directory(this, "hints_directory", value_status::Used, "", "The directory where hints files are stored if hinted handoff is enabled.") @@ -517,7 +517,7 @@ db::config::config(std::shared_ptr exts) /* Native transport (CQL Binary Protocol) */ , start_native_transport(this, "start_native_transport", value_status::Used, true, "Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.") - , native_transport_port(this, "native_transport_port", value_status::Used, 9042, + , native_transport_port(this, "native_transport_port", "cql_port", value_status::Used, 9042, "Port on which the CQL native transport listens for clients.") , native_transport_port_ssl(this, "native_transport_port_ssl", value_status::Used, 9142, "Port on which the CQL TLS native transport listens for clients." @@ -536,7 +536,7 @@ db::config::config(std::shared_ptr exts) /* Settings for configuring and tuning client connections. */ , broadcast_rpc_address(this, "broadcast_rpc_address", value_status::Used, {/* unset */}, "RPC address to broadcast to drivers and other Scylla nodes. This cannot be set to 0.0.0.0. If blank, it is set to the value of the rpc_address or rpc_interface. 
If rpc_address or rpc_interfaceis set to 0.0.0.0, this property must be set.\n") - , rpc_port(this, "rpc_port", value_status::Used, 9160, + , rpc_port(this, "rpc_port", "thrift_port", value_status::Used, 9160, "Thrift port for client connections.") , start_rpc(this, "start_rpc", value_status::Used, true, "Starts the Thrift RPC server") @@ -722,8 +722,12 @@ db::config::config(std::shared_ptr exts) , max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, "Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, " "especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.") - , max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20, - "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.") + , max_memory_for_unlimited_query_soft_limit(this, "max_memory_for_unlimited_query_soft_limit", liveness::LiveUpdate, value_status::Used, uint64_t(1) << 20, + "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries. " + "This is the soft limit, there will be a warning logged for queries violating this limit.") + , max_memory_for_unlimited_query_hard_limit(this, "max_memory_for_unlimited_query_hard_limit", "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, (uint64_t(100) << 20), + "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries. 
" + "This is the hard limit, queries violating this limit will be aborted.") , initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u, "Maximum amount of sstables to load in parallel during initialization. A higher number can lead to more memory consumption. You should not need to touch this") , enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false, @@ -808,39 +812,27 @@ namespace utils { template<> void config_file::named_value::add_command_line_option( - boost::program_options::options_description_easy_init& init, - const std::string_view& name, const std::string_view& desc) { - init((hyphenate(name) + "-class-name").data(), + boost::program_options::options_description_easy_init& init) { + init((hyphenate(name()) + "-class-name").data(), value_ex()->notifier( [this](sstring new_class_name) { auto old_seed_provider = operator()(); old_seed_provider.class_name = new_class_name; set(std::move(old_seed_provider), config_source::CommandLine); }), - desc.data()); - init((hyphenate(name) + "-parameters").data(), + desc().data()); + init((hyphenate(name()) + "-parameters").data(), value_ex>()->notifier( [this](std::unordered_map new_parameters) { auto old_seed_provider = operator()(); old_seed_provider.parameters = new_parameters; set(std::move(old_seed_provider), config_source::CommandLine); }), - desc.data()); + desc().data()); } } -boost::program_options::options_description_easy_init& -db::config::add_options(boost::program_options::options_description_easy_init& init) { - config_file::add_options(init); - - data_file_directories.add_command_line_option(init, "datadir", "alias for 'data-file-directories'"); - rpc_port.add_command_line_option(init, "thrift-port", "alias for 'rpc-port'"); - native_transport_port.add_command_line_option(init, "cql-port", "alias for 'native-transport-port'"); - - return init; -} - db::fs::path db::config::get_conf_dir() { using namespace 
db::fs; diff --git a/db/config.hh b/db/config.hh index d85d61ed68..feadfa0dda 100644 --- a/db/config.hh +++ b/db/config.hh @@ -303,7 +303,8 @@ public: named_value abort_on_internal_error; named_value max_partition_key_restrictions_per_query; named_value max_clustering_key_restrictions_per_query; - named_value max_memory_for_unlimited_query; + named_value max_memory_for_unlimited_query_soft_limit; + named_value max_memory_for_unlimited_query_hard_limit; named_value initial_sstable_loading_concurrency; named_value enable_3_1_0_compatibility_mode; named_value enable_user_defined_functions; @@ -329,9 +330,6 @@ public: seastar::logging_settings logging_settings(const boost::program_options::variables_map&) const; - boost::program_options::options_description_easy_init& - add_options(boost::program_options::options_description_easy_init&); - const db::extensions& extensions() const; static const sstring default_tls_priority; @@ -346,8 +344,7 @@ private: return this->is_set() ? (*this)() : t; } // do not add to boost::options. 
We only care about yaml config - void add_command_line_option(boost::program_options::options_description_easy_init&, - const std::string_view&, const std::string_view&) override {} + void add_command_line_option(boost::program_options::options_description_easy_init&) override {} }; log_legacy_value default_log_level; diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 7f65f9c7e7..0014cf22e3 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -743,7 +743,7 @@ future query_partition_mutation(service::storage_proxy& proxy, { auto dk = dht::decorate_key(*s, pkey); return do_with(dht::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) { - return proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout) + return proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout, tracing::trace_state_ptr{}) .then([dk = std::move(dk), s](rpc::tuple>, cache_temperature> res_hit_rate) { auto&& [res, hit_rate] = res_hit_rate; auto&& partitions = res->partitions(); @@ -778,7 +778,8 @@ read_schema_partition_for_table(distributed& proxy, sche auto slice = partition_slice_builder(*schema) .with_range(std::move(clustering_range)) .build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), query::max_rows); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), + query::row_limit(query::max_rows)); return query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key)).then([&proxy] (mutation mut) { return redact_columns_for_missing_features(std::move(mut), proxy.local().get_db().local().features().cluster_schema_features()); }); @@ -788,7 +789,8 @@ future read_keyspace_mutation(distributed& proxy, const sstring& keyspace_name) { schema_ptr s = keyspaces(); auto key = partition_key::from_singular(*s, keyspace_name); - auto cmd = make_lw_shared(s->id(), s->version(), 
s->full_slice()); + auto slice = s->full_slice(); + auto cmd = make_lw_shared(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice)); return query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key)); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 50d4699103..07cccae1aa 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1984,8 +1984,7 @@ query_mutations(distributed& proxy, const sstring& ks_na database& db = proxy.local().get_db().local(); schema_ptr schema = db.find_schema(ks_name, cf_name); auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), - std::move(slice), std::numeric_limits::max()); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range, db::no_timeout) .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); } @@ -1995,8 +1994,7 @@ query(distributed& proxy, const sstring& ks_name, const database& db = proxy.local().get_db().local(); schema_ptr schema = db.find_schema(ks_name, cf_name); auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), - std::move(slice), std::numeric_limits::max()); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE, {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result)); @@ -2011,7 +2009,7 @@ query(distributed& proxy, const sstring& ks_name, const auto slice = 
partition_slice_builder(*schema) .with_range(std::move(row_range)) .build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), query::max_rows); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE, {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { diff --git a/db/view/view.cc b/db/view/view.cc index 4b18d59f6d..3d0ee79134 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1285,7 +1285,7 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas void view_builder::initialize_reader_at_current_token(build_step& step) { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); - auto permit = _db.make_query_class_config().semaphore.make_permit(); + auto permit = _db.get_reader_concurrency_semaphore().make_permit(); step.reader = make_local_shard_sstable_reader( step.base->schema(), std::move(permit), diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 624d192e5f..0552b95d7a 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -78,7 +78,7 @@ future<> view_update_generator::start() { auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), s, - _db.make_query_class_config().semaphore.make_permit(), + _db.get_reader_concurrency_semaphore().make_permit(), query::full_partition_range, s->full_slice(), service::get_local_streaming_priority(), diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 746c09e6f8..c054a80c18 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -60,14 +60,15 @@ 
void flat_mutation_reader::impl::clear_buffer_to_next_partition() { _buffer_size = compute_buffer_size(*_schema, _buffer); } -flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) { +flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) { class partition_reversing_mutation_reader final : public flat_mutation_reader::impl { flat_mutation_reader* _source; range_tombstone_list _range_tombstones; std::stack _mutation_fragments; mutation_fragment_opt _partition_end; size_t _stack_size = 0; - const size_t _max_stack_size; + const query::max_result_size _max_size; + bool _below_soft_limit = true; private: stop_iteration emit_partition() { auto emit_range_tombstone = [&] { @@ -119,7 +120,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ } else { _mutation_fragments.emplace(std::move(mf)); _stack_size += _mutation_fragments.top().memory_usage(*_schema); - if (_stack_size >= _max_stack_size) { + if (_stack_size > _max_size.hard_limit || (_stack_size > _max_size.soft_limit && _below_soft_limit)) { const partition_key* key = nullptr; auto it = buffer().end(); --it; @@ -129,21 +130,30 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ --it; key = &it->as_partition_start().key().key(); } - throw std::runtime_error(fmt::format( - "Aborting reverse partition read because partition {} is larger than the maximum safe size of {} for reversible partitions.", - key->with_schema(*_schema), - _max_stack_size)); + + if (_stack_size > _max_size.hard_limit) { + throw std::runtime_error(fmt::format( + "Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}", + _max_size.hard_limit, + key->with_schema(*_schema))); + } else { + fmr_logger.warn( + "Memory usage of reversed read exceeds soft limit of {} (configured via 
max_memory_for_unlimited_query_soft_limit), while reading partition {}", + _max_size.soft_limit, + key->with_schema(*_schema)); + _below_soft_limit = false; + } } } } return make_ready_future(is_buffer_full()); } public: - explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size) + explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size) : flat_mutation_reader::impl(mr.schema()) , _source(&mr) , _range_tombstones(*_schema) - , _max_stack_size(max_stack_size) + , _max_size(max_size) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { @@ -185,7 +195,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ } }; - return make_flat_mutation_reader(original, max_memory_consumption); + return make_flat_mutation_reader(original, max_size); } template diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 31373a44f6..a7b0a4ca23 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -29,6 +29,7 @@ #include "mutation_fragment.hh" #include "tracing/trace_state.hh" #include "mutation.hh" +#include "query_class_config.hh" #include #include @@ -720,15 +721,17 @@ make_generating_reader(schema_ptr s, std::function /// /// \param original the reader to be reversed, has to be kept alive while the /// reversing reader is in use. -/// \param max_memory_consumption the maximum amount of memory the reader is -/// allowed to use for reversing. The reverse reader reads entire partitions -/// into memory, before reversing them. Since partitions can be larger than -/// the available memory, we need to enforce a limit on memory consumption. -/// If the read uses more memory then this limit, the read is aborted. +/// \param max_size the maximum amount of memory the reader is allowed to use +/// for reversing and conversely the maximum size of the results. 
The +/// reverse reader reads entire partitions into memory, before reversing +/// them. Since partitions can be larger than the available memory, we need +/// to enforce a limit on memory consumption. When reaching the soft limit +/// a warning will be logged. When reaching the hard limit the read will be +/// aborted. /// /// FIXME: reversing should be done in the sstable layer, see #1413. flat_mutation_reader -make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption); +make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size); /// Low level fragment stream validator. /// diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 29d8a4bcb5..50db525cd1 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -40,6 +40,11 @@ class partition_slice { uint32_t partition_row_limit() [[version 1.3]] = std::numeric_limits::max(); }; +struct max_result_size { + uint64_t soft_limit; + uint64_t hard_limit; +} + class read_command { utils::UUID cf_id; utils::UUID schema_version; @@ -50,6 +55,7 @@ class read_command { uint32_t partition_limit [[version 1.3]] = std::numeric_limits::max(); utils::UUID query_uuid [[version 2.2]] = utils::UUID(); query::is_first_page is_first_page [[version 2.2]] = query::is_first_page::no; + std::optional max_result_size [[version 4.3]] = std::nullopt; }; } diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index f2177f1c57..f21f17e2d4 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -244,7 +244,7 @@ public: virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); if (!_semaphores[shard]) { - _semaphores[shard] = &_db.local().make_query_class_config().semaphore; + _semaphores[shard] = &_db.local().get_reader_concurrency_semaphore(); } return *_semaphores[shard]; } @@ -618,18 +618,17 @@ static future do_query_mutations( mutation_reader::forwarding fwd_mr) { return 
make_multishard_combining_reader(ctx, std::move(s), pr, ps, pc, std::move(trace_state), fwd_mr); }); - auto class_config = ctx->db().local().make_query_class_config(); - auto reader = make_flat_multi_range_reader(s, class_config.semaphore.make_permit(), std::move(ms), ranges, cmd.slice, - service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); + auto reader = make_flat_multi_range_reader(s, ctx->db().local().get_reader_concurrency_semaphore().make_permit(), std::move(ms), ranges, + cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.row_limit, cmd.partition_limit); - return do_with(std::move(reader), std::move(compaction_state), [&, class_config, accounter = std::move(accounter), timeout] ( + return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.row_limit, cmd.partition_limit, cmd.timestamp, - timeout, class_config.max_memory_for_unlimited_query).then([&] (consume_result&& result) mutable { + timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable { return make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); }); }).then_wrapped([&ctx] (future&& result_fut) { @@ -659,7 +658,6 @@ future>, cache_tempera const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, - uint64_t max_size, db::timeout_clock::time_point timeout) { if (cmd.row_limit == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { return make_ready_future>, cache_temperature>>( @@ -668,8 
+666,9 @@ future>, cache_tempera db.local().find_column_family(s).get_global_cache_hit_rate())); } - return db.local().get_result_memory_limiter().new_mutation_read(max_size).then([&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable { + const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); + return db.local().get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed).then([&, s = std::move(s), + trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable { return do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)).then_wrapped( [&db, s = std::move(s)] (future&& f) { auto& local_db = db.local(); diff --git a/multishard_mutation_query.hh b/multishard_mutation_query.hh index 8d49a4dd5b..f0baed9c78 100644 --- a/multishard_mutation_query.hh +++ b/multishard_mutation_query.hh @@ -67,5 +67,4 @@ future>, cache_tempera const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, - uint64_t max_size, db::timeout_clock::time_point timeout); diff --git a/mutation.cc b/mutation.cc index 0f68421695..3260ec97c8 100644 --- a/mutation.cc +++ b/mutation.cc @@ -122,20 +122,22 @@ mutation::query(query::result::builder& builder, query::result mutation::query(const query::partition_slice& slice, + query::result_memory_accounter&& accounter, query::result_options opts, gc_clock::time_point now, uint32_t row_limit) && { - query::result::builder builder(slice, opts, { }); + query::result::builder builder(slice, opts, std::move(accounter)); std::move(*this).query(builder, slice, now, row_limit); return builder.build(); } query::result mutation::query(const query::partition_slice& slice, + query::result_memory_accounter&& accounter, query::result_options opts, gc_clock::time_point now, uint32_t row_limit) const& { - return 
mutation(*this).query(slice, opts, now, row_limit); + return mutation(*this).query(slice, std::move(accounter), opts, now, row_limit); } size_t diff --git a/mutation.hh b/mutation.hh index 8baee800a8..e85ed56bd6 100644 --- a/mutation.hh +++ b/mutation.hh @@ -108,6 +108,7 @@ public: public: // The supplied partition_slice must be governed by this mutation's schema query::result query(const query::partition_slice&, + query::result_memory_accounter&& accounter, query::result_options opts = query::result_options::only_result(), gc_clock::time_point now = gc_clock::now(), uint32_t row_limit = query::max_rows) &&; @@ -115,6 +116,7 @@ public: // The supplied partition_slice must be governed by this mutation's schema // FIXME: Slower than the r-value version query::result query(const query::partition_slice&, + query::result_memory_accounter&& accounter, query::result_options opts = query::result_options::only_result(), gc_clock::time_point now = gc_clock::now(), uint32_t row_limit = query::max_rows) const&; diff --git a/mutation_partition.cc b/mutation_partition.cc index 59c7611028..a33b77f807 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1976,7 +1976,6 @@ class mutation_querier { bool _live_data_in_static_row{}; uint32_t _live_clustering_rows = 0; std::optional> _rows_wr; - bool _short_reads_allowed; private: void query_static_row(const row& r, tombstone current_tombstone); void prepare_writers(); @@ -1998,7 +1997,6 @@ mutation_querier::mutation_querier(const schema& s, query::result::partition_wri , _memory_accounter(memory_accounter) , _pw(std::move(pw)) , _static_cells_wr(pw.start().start_static_row().start_cells()) - , _short_reads_allowed(pw.slice().options.contains()) { } @@ -2011,7 +2009,7 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston get_compacted_row_slice(_schema, slice, column_kind::static_column, r, slice.static_columns, _static_cells_wr); _memory_accounter.update(_static_cells_wr._out.size() - start); 
- } else if (_short_reads_allowed) { + } else { seastar::measuring_output_stream stream; ser::qr_partition__static_row__cells out(stream, { }); auto start = stream.size(); @@ -2075,7 +2073,7 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr auto start = _rows_wr->_out.size(); write_row(*_rows_wr); stop = _memory_accounter.update_and_check(_rows_wr->_out.size() - start); - } else if (_short_reads_allowed) { + } else { seastar::measuring_output_stream stream; ser::qr_partition__rows out(stream, { }); auto start = stream.size(); @@ -2084,7 +2082,7 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr } _live_clustering_rows++; - return stop && stop_iteration(_short_reads_allowed); + return stop; } uint32_t mutation_querier::consume_end_of_stream() { @@ -2115,11 +2113,9 @@ class query_result_builder { query::result::builder& _rb; std::optional _mutation_consumer; stop_iteration _stop; - stop_iteration _short_read_allowed; public: query_result_builder(const schema& s, query::result::builder& rb) : _schema(s), _rb(rb) - , _short_read_allowed(_rb.slice().options.contains()) { } void consume_new_partition(const dht::decorated_key& dk) { @@ -2130,21 +2126,21 @@ public: _mutation_consumer->consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(sr), t) && _short_read_allowed; + _stop = _mutation_consumer->consume(std::move(sr), t); return _stop; } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(cr), t) && _short_read_allowed; + _stop = _mutation_consumer->consume(std::move(cr), t); return _stop; } stop_iteration consume(range_tombstone&& rt) { - _stop = _mutation_consumer->consume(std::move(rt)) && _short_read_allowed; + _stop = _mutation_consumer->consume(std::move(rt)); return _stop; } stop_iteration consume_end_of_partition() { auto live_rows_in_partition = 
_mutation_consumer->consume_end_of_stream(); - if (_short_read_allowed && live_rows_in_partition > 0 && !_stop) { + if (live_rows_in_partition > 0 && !_stop) { _stop = _rb.memory_accounter().check(); } if (_stop) { @@ -2167,7 +2163,7 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, db::timeout_clock::time_point timeout, - query_class_config class_config, + query::query_class_config class_config, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) { @@ -2191,6 +2187,24 @@ future<> data_query( }); } +stop_iteration query::result_memory_accounter::check_local_limit() const { + if (_total_used_memory > _maximum_result_size.hard_limit) { + if (_short_read_allowed) { + return stop_iteration::yes; + } + throw std::runtime_error(fmt::format( + "Memory usage of unpaged query exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit)", + _maximum_result_size.hard_limit)); + } + if (_below_soft_limit && !_short_read_allowed && _total_used_memory > _maximum_result_size.soft_limit) { + mplog.warn( + "Memory usage of unpaged query exceeds soft limit of {} (configured via max_memory_for_unlimited_query_soft_limit)", + _maximum_result_size.soft_limit); + _below_soft_limit = false; + } + return stop_iteration::no; +} + void reconcilable_result_builder::consume_new_partition(const dht::decorated_key& dk) { _return_static_content_on_partition_with_no_rows = _slice.options.contains(query::partition_slice::option::always_return_static_content) || @@ -2220,7 +2234,7 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom // guarantee progress, not ending the result on a live row would // mean that the next page fetch will read all tombstones after the // last live row again. 
- _stop = stop && stop_iteration(_short_read_allowed); + _stop = stop; } return _mutation_consumer->consume(std::move(cr)) || _stop; } @@ -2239,7 +2253,7 @@ stop_iteration reconcilable_result_builder::consume_end_of_partition() { // well. Next page fetch will ask for the next partition and if we // don't do that we could end up with an unbounded number of // partitions with only a static row. - _stop = _stop || (_memory_accounter.check() && stop_iteration(_short_read_allowed)); + _stop = _stop || _memory_accounter.check(); } _total_live_rows += _live_rows; _result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() }); @@ -2261,7 +2275,7 @@ static do_mutation_query(schema_ptr s, uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - query_class_config class_config, + query::query_class_config class_config, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) @@ -2301,7 +2315,7 @@ mutation_query(schema_ptr s, uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - query_class_config class_config, + query::query_class_config class_config, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) diff --git a/mutation_query.cc b/mutation_query.cc index 8c9dffa71c..530c987978 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -58,7 +58,8 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const { query::result to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions, query::result_options opts) { - query::result::builder builder(slice, opts, { }); + // This result was already built with a limit, don't apply another one. 
+ query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); for (const partition& p : r.partitions()) { if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) { break; diff --git a/mutation_query.hh b/mutation_query.hh index ce2309a11e..66c4da2b87 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -122,14 +122,12 @@ class reconcilable_result_builder { uint32_t _total_live_rows = 0; query::result_memory_accounter _memory_accounter; stop_iteration _stop; - bool _short_read_allowed; std::optional _mutation_consumer; public: reconcilable_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) : _schema(s), _slice(slice) , _memory_accounter(std::move(accounter)) - , _short_read_allowed(slice.options.contains()) { } void consume_new_partition(const dht::decorated_key& dk); @@ -163,8 +161,8 @@ future mutation_query( uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - query_class_config class_config, - query::result_memory_accounter&& accounter = { }, + query::query_class_config class_config, + query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr = nullptr, query::querier_cache_context cache_ctx = { }); @@ -178,7 +176,7 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, db::timeout_clock::time_point timeout, - query_class_config class_config, + query::query_class_config class_config, tracing::trace_state_ptr trace_ptr = nullptr, query::querier_cache_context cache_ctx = { }); @@ -193,7 +191,7 @@ class mutation_query_stage { uint32_t, gc_clock::time_point, db::timeout_clock::time_point, - query_class_config, + query::query_class_config, query::result_memory_accounter&&, tracing::trace_state_ptr, query::querier_cache_context> _execution_stage; diff --git a/partition_slice_builder.hh 
b/partition_slice_builder.hh index 2b3796bc1a..5337775377 100644 --- a/partition_slice_builder.hh +++ b/partition_slice_builder.hh @@ -54,6 +54,11 @@ public: partition_slice_builder& without_partition_key_columns(); partition_slice_builder& without_clustering_key_columns(); partition_slice_builder& reversed(); + template + partition_slice_builder& with_option() { + _options.set