diff --git a/configure.py b/configure.py index 2df2448cfd..0462618c5c 100755 --- a/configure.py +++ b/configure.py @@ -292,6 +292,8 @@ scylla_tests = [ 'tests/enum_set_test', 'tests/extensions_test', 'tests/cql_auth_syntax_test', + 'tests/querier_cache', + 'tests/querier_cache_resource_based_eviction', ] perf_tests = [ @@ -601,6 +603,7 @@ scylla_core = (['database.cc', 'duration.cc', 'vint-serialization.cc', 'utils/arch/powerpc/crc32-vpmsum/crc32_wrapper.cc', + 'querier.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] @@ -724,6 +727,7 @@ tests_not_using_seastar_test_framework = set([ 'tests/memory_footprint', 'tests/gossip', 'tests/perf/perf_sstable', + 'tests/querier_cache_resource_based_eviction', ]) | pure_boost_tests for t in tests_not_using_seastar_test_framework: diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index cbd9626a3c..c4911cf2c7 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -294,7 +294,8 @@ modification_statement::read_required_rows( query::partition_slice::option::collections_as_maps>()); query::read_command cmd(s->id(), s->version(), ps, std::numeric_limits::max()); // FIXME: ignoring "local" - return proxy.local().query(s, make_lw_shared(std::move(cmd)), std::move(keys), cl, std::move(trace_state)).then([this, ps] (auto result) { + return proxy.local().query(s, make_lw_shared(std::move(cmd)), std::move(keys), + cl, std::move(trace_state)).then([this, ps] (auto result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [&] (query::result_view v) { auto prefetched_rows = update_parameters::prefetched_rows_type({update_parameters::prefetch_data(s)}); v.consume(ps, prefetch_data_builder(s, prefetched_rows.value(), ps)); diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 7e34d95f02..32be267662 100644 --- 
a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -248,7 +248,7 @@ select_statement::do_execute(distributed& proxy, ++_stats.reads; auto command = ::make_lw_shared(_schema->id(), _schema->version(), - make_partition_slice(options), limit, now, tracing::make_trace_info(state.get_trace_state()), query::max_partitions, options.get_timestamp(state)); + make_partition_slice(options), limit, now, tracing::make_trace_info(state.get_trace_state()), query::max_partitions, utils::UUID(), options.get_timestamp(state)); int32_t page_size = options.get_page_size(); @@ -326,14 +326,20 @@ select_statement::execute(distributed& proxy, return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, &options, cmd] (auto pr) { dht::partition_range_vector prange { pr }; auto command = ::make_lw_shared(*cmd); - return proxy.local().query(_schema, command, std::move(prange), options.get_consistency(), state.get_trace_state()); + return proxy.local().query(_schema, + command, + std::move(prange), + options.get_consistency(), + state.get_trace_state()).then([] (foreign_ptr>&& result, service::replicas_per_token_range) { + return std::move(result); + }); }, std::move(merger)); }).then([this, &options, now, cmd] (auto result) { return this->process_results(std::move(result), cmd, options, now); }); } else { return proxy.local().query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), state.get_trace_state()) - .then([this, &options, now, cmd] (auto result) { + .then([this, &options, now, cmd] (auto result, service::replicas_per_token_range) { return this->process_results(std::move(result), cmd, options, now); }); } @@ -351,7 +357,7 @@ select_statement::execute_internal(distributed& proxy, int32_t limit = get_limit(options); auto now = gc_clock::now(); auto command = ::make_lw_shared(_schema->id(), _schema->version(), - make_partition_slice(options), limit, now, std::experimental::nullopt, query::max_partitions, 
options.get_timestamp(state)); + make_partition_slice(options), limit, now, std::experimental::nullopt, query::max_partitions, utils::UUID(), options.get_timestamp(state)); auto partition_ranges = _restrictions->get_partition_key_ranges(options); tracing::add_table_name(state.get_trace_state(), keyspace(), column_family()); @@ -366,14 +372,16 @@ select_statement::execute_internal(distributed& proxy, dht::partition_range_vector prange { pr }; auto cmd = ::make_lw_shared(*command); return proxy.local().query(_schema, cmd, std::move(prange), db::consistency_level::ONE, state.get_trace_state(), - db::no_timeout); + db::no_timeout, {}).then([] (foreign_ptr> result, service::replicas_per_token_range) { + return std::move(result); + }); }, std::move(merger)); }).then([command, this, &options, now] (auto result) { return this->process_results(std::move(result), command, options, now); }).finally([command] { }); } else { auto query = proxy.local().query(_schema, command, std::move(partition_ranges), db::consistency_level::ONE, state.get_trace_state(), db::no_timeout); - return query.then([command, this, &options, now] (auto result) { + return query.then([command, this, &options, now] (auto result, service::replicas_per_token_range) { return this->process_results(std::move(result), command, options, now); }).finally([command] {}); } @@ -501,6 +509,7 @@ indexed_table_select_statement::do_execute(distributed& now, tracing::make_trace_info(state.get_trace_state()), query::max_partitions, + utils::UUID(), options.get_timestamp(state)); return this->execute(proxy, command, std::move(partition_ranges), state, options, now); }); @@ -536,12 +545,14 @@ indexed_table_select_statement::find_index_partition_ranges(distributed> result) { + cmd, + std::move(partition_ranges), + options.get_consistency(), + state.get_trace_state()).then([cmd, this, &options, now, &view] (foreign_ptr> result, + service::replicas_per_token_range) { std::vector columns; for (const column_definition& cdef : 
_schema->partition_key_columns()) { columns.emplace_back(view.schema()->get_column_definition(cdef.name())); diff --git a/database.cc b/database.cc index 83a9b980db..d1c8e2c34d 100644 --- a/database.cc +++ b/database.cc @@ -2117,6 +2117,9 @@ database::database(const db::config& cfg, database_config dbcfg) [this] { ++_stats->sstable_read_queue_overloaded; return std::make_exception_ptr(std::runtime_error("sstable inactive read queue overloaded")); + }, + [this] { + return _querier_cache.evict_one(); }) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. @@ -2259,6 +2262,29 @@ database::setup_metrics() { sm::description("Counts the total number of failed read operations. " "Add the total_reads to this value to get the total amount of reads issued on this shard.")), + sm::make_derive("querier_cache_lookups", _querier_cache.get_stats().lookups, + sm::description("Counts querier cache lookups (paging queries)")), + + sm::make_derive("querier_cache_misses", _querier_cache.get_stats().misses, + sm::description("Counts querier cache lookups that failed to find a cached querier")), + + sm::make_derive("querier_cache_drops", _querier_cache.get_stats().drops, + sm::description("Counts querier cache lookups that found a cached querier but had to drop it due to position mismatch")), + + sm::make_derive("querier_cache_time_based_evictions", _querier_cache.get_stats().time_based_evictions, + sm::description("Counts querier cache entries that timed out and were evicted.")), + + sm::make_derive("querier_cache_resource_based_evictions", _querier_cache.get_stats().resource_based_evictions, + sm::description("Counts querier cache entries that were evicted to free up resources " + "(limited by reader concurency limits) necessary to create new readers.")), + + sm::make_derive("querier_cache_memory_based_evictions", _querier_cache.get_stats().memory_based_evictions, + sm::description("Counts querier cache entries 
that were evicted because the memory usage " + "of the cached queriers were above the limit.")), + + sm::make_gauge("querier_cache_population", _querier_cache.get_stats().population, + sm::description("The number of entries currently in the querier cache.")), + sm::make_derive("sstable_read_queue_overloads", _stats->sstable_read_queue_overloaded, sm::description("Counts the number of times the sstable read queue was overloaded. " "A non-zero value indicates that we have to drop read requests because they arrive faster than we can serve them.")), @@ -2999,22 +3025,27 @@ struct query_state { }; future> -column_family::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, - const dht::partition_range_vector& partition_ranges, - tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter, - uint64_t max_size, db::timeout_clock::time_point timeout) { +column_family::query(schema_ptr s, + const query::read_command& cmd, + query::result_options opts, + const dht::partition_range_vector& partition_ranges, + tracing::trace_state_ptr trace_state, + query::result_memory_limiter& memory_limiter, + uint64_t max_size, + db::timeout_clock::time_point timeout, + querier_cache_context cache_ctx) { utils::latency_counter lc; _stats.reads.set_latency(lc); auto f = opts.request == query::result_request::only_digest ? 
memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size); - return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable { + return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, + trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable { auto qs_ptr = std::make_unique(std::move(s), cmd, opts, partition_ranges, std::move(accounter)); auto& qs = *qs_ptr; - return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout] { + return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] { auto&& range = *qs.current_partition_range++; return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(), - qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, - timeout); + qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, timeout, cache_ctx); }).then([qs_ptr = std::move(qs_ptr), &qs] { return make_ready_future>( make_lw_shared(qs.builder.build())); @@ -3058,10 +3089,17 @@ future, cache_temperature> database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) { column_family& cf = find_column_family(cmd.cf_id); - return _data_query_stage(&cf, std::move(s), seastar::cref(cmd), opts, seastar::cref(ranges), - std::move(trace_state), seastar::ref(get_result_memory_limiter()), - max_result_size, - timeout).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) { + querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); + return 
_data_query_stage(&cf, + std::move(s), + seastar::cref(cmd), + opts, + seastar::cref(ranges), + std::move(trace_state), + seastar::ref(get_result_memory_limiter()), + max_result_size, + timeout, + std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) { if (f.failed()) { ++s->total_reads_failed; return make_exception_future, cache_temperature>(f.get_exception()); @@ -3078,9 +3116,18 @@ future database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { column_family& cf = find_column_family(cmd.cf_id); - return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.partition_limit, - cmd.timestamp, std::move(accounter), std::move(trace_state), - timeout).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) { + querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); + return mutation_query(std::move(s), + cf.as_mutation_source(), + range, + cmd.slice, + cmd.row_limit, + cmd.partition_limit, + cmd.timestamp, + std::move(accounter), + std::move(trace_state), + timeout, + std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) { if (f.failed()) { ++s->total_reads_failed; return make_exception_future(f.get_exception()); diff --git a/database.hh b/database.hh index 20b4d09a6e..0710e5d578 100644 --- a/database.hh +++ b/database.hh @@ -84,6 +84,7 @@ #include "dirty_memory_manager.hh" #include "reader_concurrency_semaphore.hh" #include "db/timeout_clock.hh" +#include "querier.hh" class cell_locker; class cell_locker_stats; @@ -660,7 +661,8 @@ public: tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter, uint64_t max_result_size, - db::timeout_clock::time_point timeout = db::no_timeout); 
+ db::timeout_clock::time_point timeout = db::no_timeout, + querier_cache_context cache_ctx = { }); void start(); future<> stop(); @@ -1117,8 +1119,17 @@ private: semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads()}; - concrete_execution_stage>, column_family*, schema_ptr, const query::read_command&, query::result_options, - const dht::partition_range_vector&, tracing::trace_state_ptr, query::result_memory_limiter&, uint64_t, db::timeout_clock::time_point> _data_query_stage; + concrete_execution_stage>, + column_family*, + schema_ptr, + const query::read_command&, + query::result_options, + const dht::partition_range_vector&, + tracing::trace_state_ptr, + query::result_memory_limiter&, + uint64_t, + db::timeout_clock::time_point, + querier_cache_context> _data_query_stage; std::unordered_map _keyspaces; std::unordered_map> _column_families; @@ -1131,6 +1142,9 @@ private: bool _enable_incremental_backups = false; compaction_controller _compaction_controller; + + querier_cache _querier_cache; + future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout); @@ -1302,6 +1316,14 @@ public: return *_stats; } + void set_querier_cache_entry_ttl(std::chrono::seconds entry_ttl) { + _querier_cache.set_entry_ttl(entry_ttl); + } + + const querier_cache::stats& get_querier_cache_stats() const { + return _querier_cache.get_stats(); + } + friend class distributed_loader; }; diff --git a/db/consistency_level.cc b/db/consistency_level.cc index fd46d8ea23..b95ac3dc0f 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -157,7 +157,10 @@ std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector live_endpoints, - read_repair_decision read_repair, gms::inet_address* extra, column_family* cf) { + const std::vector& 
preferred_endpoints, + read_repair_decision read_repair, + gms::inet_address* extra, + column_family* cf) { size_t local_count; if (read_repair == read_repair_decision::GLOBAL) { // take RRD.GLOBAL out of the way @@ -182,6 +185,30 @@ filter_for_query(consistency_level cl, return std::move(live_endpoints); } + std::vector selected_endpoints; + + // Pre-select endpoints based on client preference. If the endpoints + // selected this way aren't enough to satisfy CL requirements select the + // remaining ones according to the load-balancing strategy as before. + if (!preferred_endpoints.empty()) { + const auto it = boost::stable_partition(live_endpoints, [&preferred_endpoints] (const gms::inet_address& a) { + return std::find(preferred_endpoints.cbegin(), preferred_endpoints.cend(), a) == preferred_endpoints.end(); + }); + const size_t selected = std::distance(it, live_endpoints.end()); + if (selected >= bf) { + if (extra) { + *extra = selected == bf ? live_endpoints.front() : *(it + bf); + } + return std::vector(it, it + bf); + } else if (selected) { + selected_endpoints.reserve(bf); + std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints)); + live_endpoints.erase(it, live_endpoints.end()); + } + } + + const auto remaining_bf = bf - selected_endpoints.size(); + if (cf) { auto get_hit_rate = [cf] (gms::inet_address ep) -> float { constexpr float max_hit_rate = 0.999; @@ -213,21 +240,21 @@ filter_for_query(consistency_level cl, if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations // local node is always first if present (see storage_proxy::get_live_sorted_endpoints) unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 
0 : epi.size() + 1; - live_endpoints = miss_equalizing_combination(epi, local_idx, bf, bool(extra)); + live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)); } } if (extra) { - *extra = live_endpoints[bf]; // extra replica for speculation + *extra = live_endpoints[remaining_bf]; // extra replica for speculation } - live_endpoints.erase(live_endpoints.begin() + bf, live_endpoints.end()); + std::move(live_endpoints.begin(), live_endpoints.begin() + remaining_bf, std::back_inserter(selected_endpoints)); - return std::move(live_endpoints); + return selected_endpoints; } std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints, column_family* cf) { - return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE, nullptr, cf); + return filter_for_query(cl, ks, live_endpoints, {}, read_repair_decision::NONE, nullptr, cf); } bool diff --git a/db/consistency_level.hh b/db/consistency_level.hh index fedf437ef0..b9ba56b7db 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -79,7 +79,10 @@ std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector live_endpoints, - read_repair_decision read_repair, gms::inet_address* extra, column_family* cf); + const std::vector& preferred_endpoints, + read_repair_decision read_repair, + gms::inet_address* extra, + column_family* cf); std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints, column_family* cf); diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh index 95bdb3fa14..d8b93ee3aa 100644 --- a/db/size_estimates_virtual_reader.hh +++ b/db/size_estimates_virtual_reader.hh @@ -124,6 +124,12 @@ public: } return make_ready_future<>(); } + virtual size_t buffer_size() const override { + if (_partition_reader) { + return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size(); + } + return flat_mutation_reader::impl::buffer_size(); 
+ } /** * Returns the primary ranges for the local node. * Used for testing as well. diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index c644b8d5d7..26ceab096e 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1663,7 +1663,8 @@ query(distributed& proxy, const sstring& ks_name, const auto slice = partition_slice_builder(*schema).build(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), std::numeric_limits::max()); - return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE, nullptr, db::no_timeout).then([schema, cmd] (auto&& result) { + return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE, + nullptr, db::no_timeout).then([schema, cmd] (auto&& result, service::replicas_per_token_range) { return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result)); }); } @@ -1678,7 +1679,8 @@ query(distributed& proxy, const sstring& ks_name, const .build(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), query::max_rows); - return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE, nullptr, db::no_timeout).then([schema, cmd] (auto&& result) { + return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE, + nullptr, db::no_timeout).then([schema, cmd] (auto&& result, service::replicas_per_token_range) { return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result)); }); } diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index f8f1f78b8c..2d30b4d411 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -145,6 +145,9 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override { throw std::bad_function_call(); } + virtual size_t 
buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _source->buffer_size(); + } }; return make_flat_mutation_reader(original); @@ -250,6 +253,9 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) { }; return _underlying.fast_forward_to(pr, timeout); } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _underlying.buffer_size(); + } }; return make_flat_mutation_reader(std::move(m)); } @@ -309,6 +315,9 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par clear_buffer(); return _underlying.fast_forward_to(pr, timeout); } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _underlying.buffer_size(); + } }; return make_flat_mutation_reader(std::move(r), single_partition); } @@ -598,6 +607,9 @@ public: _reader.next_partition(); } } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size(); + } }; flat_mutation_reader diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 6121a6f2f2..d311c4656d 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -278,6 +278,14 @@ public: */ virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) = 0; virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) = 0; + + // Although for most cases this is a mere getter some readers might have + // one or more subreaders and will need to account for their buffer-size + // as well so we need to allow these readers to override the default + // implementation. + virtual size_t buffer_size() const { + return _buffer_size; + } }; private: std::unique_ptr _impl; @@ -394,6 +402,13 @@ public: return peek(); }); } + // The actual buffer size of the reader. 
+ // Although we consistently refer to this as buffer size throughout the code + // we really use "buffer size" as the size of the collective memory + // used by all the mutation fragments stored in the buffer of the reader. + size_t buffer_size() const { + return _impl->buffer_size(); + } }; using flat_mutation_reader_opt = optimized_optional; @@ -474,11 +489,15 @@ flat_mutation_reader transform(flat_mutation_reader r, T t) { virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { throw std::bad_function_call(); } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size(); + } }; return make_flat_mutation_reader(std::move(r), std::move(t)); } inline flat_mutation_reader& to_reference(flat_mutation_reader& r) { return r; } +inline const flat_mutation_reader& to_reference(const flat_mutation_reader& r) { return r; } template class delegating_reader : public flat_mutation_reader::impl { @@ -507,6 +526,9 @@ public: clear_buffer(); return to_reference(_underlying).fast_forward_to(pr, timeout); } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + to_reference(_underlying).buffer_size(); + } }; flat_mutation_reader make_delegating_reader(flat_mutation_reader&); diff --git a/idl/paging_state.idl.hh b/idl/paging_state.idl.hh index 9c6e6dfb77..81442d3002 100644 --- a/idl/paging_state.idl.hh +++ b/idl/paging_state.idl.hh @@ -4,6 +4,8 @@ class paging_state { partition_key get_partition_key(); std::experimental::optional get_clustering_key(); uint32_t get_remaining(); + utils::UUID get_query_uuid() [[version 2.2]] = utils::UUID(); + std::unordered_map, std::vector> get_last_replicas() [[version 2.2]] = std::unordered_map, std::vector>(); }; } } diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 192a8063ed..cf4e806fe3 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -48,6 +48,8 @@ 
class read_command { std::chrono::time_point timestamp; std::experimental::optional trace_info [[version 1.3]]; uint32_t partition_limit [[version 1.3]] = std::numeric_limits::max(); + utils::UUID query_uuid [[version 2.2]] = utils::UUID(); + bool is_first_page [[version 2.2]] = false; }; } diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index b4d7af46cf..6d02780bbd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -222,6 +222,14 @@ utils::UUID token_metadata::get_host_id(inet_address endpoint) const { return _endpoint_to_host_id_map.at(endpoint); } +std::optional token_metadata::get_host_id_if_known(inet_address endpoint) const { + auto it = _endpoint_to_host_id_map.find(endpoint); + if (it == _endpoint_to_host_id_map.end()) { + return { }; + } + return it->second; +} + std::experimental::optional token_metadata::get_endpoint_for_host_id(UUID host_id) const { auto beg = _endpoint_to_host_id_map.cbegin(); auto end = _endpoint_to_host_id_map.cend(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 2ad0b38ac5..aaf2f66c6c 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -469,6 +469,9 @@ public: /** Return the unique host ID for an end-point. */ UUID get_host_id(inet_address endpoint) const; + /// Return the unique host ID for an end-point or nullopt if not found. 
+ std::optional get_host_id_if_known(inet_address endpoint) const; + /** Return the end-point for a unique host ID */ std::experimental::optional get_endpoint_for_host_id(UUID host_id) const; diff --git a/memtable.cc b/memtable.cc index f58aef208d..71d30d84e4 100644 --- a/memtable.cc +++ b/memtable.cc @@ -371,6 +371,12 @@ public: virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override { throw std::runtime_error("This reader can't be fast forwarded to another partition."); }; + virtual size_t buffer_size() const override { + if (_delegate) { + return flat_mutation_reader::impl::buffer_size() + _delegate->buffer_size(); + } + return flat_mutation_reader::impl::buffer_size(); + } }; void memtable::add_flushed_memory(uint64_t delta) { @@ -524,6 +530,12 @@ public: virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { throw std::bad_function_call(); } + virtual size_t buffer_size() const override { + if (_partition_reader) { + return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size(); + } + return flat_mutation_reader::impl::buffer_size(); + } }; lw_shared_ptr memtable_entry::snapshot(memtable& mtbl) { diff --git a/mutation_compactor.hh b/mutation_compactor.hh index b40b8a63b0..712f5db039 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -40,27 +40,25 @@ enum class compact_for_sstables { yes, }; -/* -template -concept bool CompactedMutationsConsumer() { - return requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, - clustering_row cr, range_tombstone rt, tombstone current_tombstone, bool is_alive) - { +GCC6_CONCEPT( + template + concept bool CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, + clustering_row cr, range_tombstone rt, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { obj.consume_new_partition(dk); obj.consume(t); - { 
obj.consume(std::move(sr), current_tombstone, is_alive) } ->stop_iteration; - { obj.consume(std::move(cr), current_tombstone, is_alive) } ->stop_iteration; - { obj.consume(std::move(rt)) } ->stop_iteration; - { obj.consume_end_of_partition() } ->stop_iteration; + { obj.consume(std::move(sr), current_tombstone, is_alive) } -> stop_iteration; + { obj.consume(std::move(cr), current_row_tombstone, is_alive) } -> stop_iteration; + { obj.consume(std::move(rt)) } -> stop_iteration; + { obj.consume_end_of_partition() } -> stop_iteration; obj.consume_end_of_stream(); }; -} -*/ +) + // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. -template -class compact_mutation { +template +class compact_mutation_state { const schema& _schema; gc_clock::time_point _query_time; gc_clock::time_point _gc_before; @@ -72,15 +70,17 @@ class compact_mutation { uint32_t _partition_limit{}; uint32_t _partition_row_limit{}; - CompactedMutationsConsumer _consumer; range_tombstone_accumulator _range_tombstones; bool _static_row_live{}; uint32_t _rows_in_current_partition; uint32_t _current_partition_limit; bool _empty_partition{}; - const dht::decorated_key* _dk; + const dht::decorated_key* _dk{}; + dht::decorated_key _last_dk; bool _has_ck_selector{}; + + std::optional _last_static_row; private: static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; @@ -89,13 +89,14 @@ private: return SSTableCompaction == compact_for_sstables::yes; } - void partition_is_not_empty() { + template + void partition_is_not_empty(Consumer& consumer) { if (_empty_partition) { _empty_partition = false; - _consumer.consume_new_partition(*_dk); + consumer.consume_new_partition(*_dk); auto pt = _range_tombstones.get_partition_tombstone(); if (pt && !can_purge_tombstone(pt)) { - _consumer.consume(pt); + consumer.consume(pt); } } } @@ -120,11 +121,17 @@ private: } return t.timestamp < 
_max_purgeable; }; -public: - compact_mutation(compact_mutation&&) = delete; // Because 'this' is captured - compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, - uint32_t partition_limit, CompactedMutationsConsumer consumer) +public: + struct parameters { + static constexpr emit_only_live_rows only_live = OnlyLive; + static constexpr compact_for_sstables sstable_compaction = SSTableCompaction; + }; + + compact_mutation_state(compact_mutation_state&&) = delete; // Because 'this' is captured + + compact_mutation_state(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, + uint32_t partition_limit) : _schema(s) , _query_time(query_time) , _gc_before(saturating_subtract(query_time, s.gc_grace_seconds())) @@ -133,22 +140,22 @@ public: , _row_limit(limit) , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 
1 : slice.partition_row_limit()) - , _consumer(std::move(consumer)) , _range_tombstones(s, _slice.options.contains(query::partition_slice::option::reversed)) + , _last_dk({dht::token(), partition_key::make_empty()}) { static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction."); } - compact_mutation(const schema& s, gc_clock::time_point compaction_time, CompactedMutationsConsumer consumer, - std::function get_max_purgeable) + compact_mutation_state(const schema& s, gc_clock::time_point compaction_time, + std::function get_max_purgeable) : _schema(s) , _query_time(compaction_time) , _gc_before(saturating_subtract(_query_time, s.gc_grace_seconds())) , _get_max_purgeable(std::move(get_max_purgeable)) , _can_gc([this] (tombstone t) { return can_gc(t); }) , _slice(s.full_slice()) - , _consumer(std::move(consumer)) , _range_tombstones(s, false) + , _last_dk({dht::token(), partition_key::make_empty()}) { static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction."); static_assert(!only_live(), "SSTable compaction cannot be run with emit_only_live_rows::yes."); @@ -164,29 +171,43 @@ public: _range_tombstones.clear(); _current_partition_limit = std::min(_row_limit, _partition_row_limit); _max_purgeable = api::missing_timestamp; + _last_static_row.reset(); } - void consume(tombstone t) { + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + void consume(tombstone t, Consumer& consumer) { _range_tombstones.set_partition_tombstone(t); if (!only_live() && !can_purge_tombstone(t)) { - partition_is_not_empty(); + partition_is_not_empty(consumer); } } - stop_iteration consume(static_row&& sr) { + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + stop_iteration consume(static_row&& sr, Consumer& consumer) { + _last_static_row = sr; auto current_tombstone = _range_tombstones.get_partition_tombstone(); bool is_live = sr.cells().compact_and_expire(_schema, 
column_kind::static_column, row_tombstone(current_tombstone), _query_time, _can_gc, _gc_before); _static_row_live = is_live; if (is_live || (!only_live() && !sr.empty())) { - partition_is_not_empty(); - return _consumer.consume(std::move(sr), current_tombstone, is_live); + partition_is_not_empty(consumer); + return consumer.consume(std::move(sr), current_tombstone, is_live); } return stop_iteration::no; } - stop_iteration consume(clustering_row&& cr) { + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + stop_iteration consume(clustering_row&& cr, Consumer& consumer) { auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key()); auto t = cr.tomb(); if (t.tomb() <= current_tombstone || can_purge_tombstone(t)) { @@ -196,8 +217,8 @@ public: bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before); is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before); if (only_live() && is_live) { - partition_is_not_empty(); - auto stop = _consumer.consume(std::move(cr), t, true); + partition_is_not_empty(consumer); + auto stop = consumer.consume(std::move(cr), t, true); if (++_rows_in_current_partition == _current_partition_limit) { return stop_iteration::yes; } @@ -205,8 +226,8 @@ public: } else if (!only_live()) { auto stop = stop_iteration::no; if (!cr.empty()) { - partition_is_not_empty(); - stop = _consumer.consume(std::move(cr), t, is_live); + partition_is_not_empty(consumer); + stop = consumer.consume(std::move(cr), t, is_live); } if (!sstable_compaction() && is_live && ++_rows_in_current_partition == _current_partition_limit) { return stop_iteration::yes; @@ -216,17 +237,25 @@ public: return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + stop_iteration consume(range_tombstone&& rt, Consumer& consumer) { _range_tombstones.apply(rt); // FIXME: drop tombstone if 
it is fully covered by other range tombstones if (!can_purge_tombstone(rt.tomb) && rt.tomb > _range_tombstones.get_partition_tombstone()) { - partition_is_not_empty(); - return _consumer.consume(std::move(rt)); + partition_is_not_empty(consumer); + return consumer.consume(std::move(rt)); } return stop_iteration::no; } - stop_iteration consume_end_of_partition() { + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + stop_iteration consume_end_of_partition(Consumer& consumer) { if (!_empty_partition) { // #589 - Do not add extra row for statics unless we did a CK range-less query. // See comment in query @@ -236,7 +265,7 @@ public: _row_limit -= _rows_in_current_partition; _partition_limit -= _rows_in_current_partition > 0; - auto stop = _consumer.consume_end_of_partition(); + auto stop = consumer.consume_end_of_partition(); if (!sstable_compaction()) { return _row_limit && _partition_limit && stop != stop_iteration::yes ? stop_iteration::no : stop_iteration::yes; @@ -245,17 +274,129 @@ public: return stop_iteration::no; } - auto consume_end_of_stream() { - return _consumer.consume_end_of_stream(); + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + auto consume_end_of_stream(Consumer& consumer) { + if (_dk) { + _last_dk = *_dk; + _dk = &_last_dk; + } + return consumer.consume_end_of_stream(); + } + + /// The decorated key of the partition the compaction is positioned in. + /// Can be null if the compaction wasn't started yet. + const dht::decorated_key* current_partition() const { + return _dk; + } + + /// Reset limits and query-time to the new page's ones and re-emit the + /// partition-header and static row if there are clustering rows or range + /// tombstones left in the partition. 
+ template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + void start_new_page(uint32_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time, + mutation_fragment::kind next_fragment_kind, + Consumer& consumer) { + _empty_partition = true; + _static_row_live = false; + _row_limit = row_limit; + _partition_limit = partition_limit; + _rows_in_current_partition = 0; + _current_partition_limit = std::min(_row_limit, _partition_row_limit); + _query_time = query_time; + _gc_before = saturating_subtract(query_time, _schema.gc_grace_seconds()); + + if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone) + && _last_static_row) { + // Stopping here would cause an infinite loop so ignore return value. + consume(*std::exchange(_last_static_row, {}), consumer); + } + } + + bool are_limits_reached() const { + return _row_limit == 0 || _partition_limit == 0; } }; -template -struct compact_for_query : compact_mutation { - using compact_mutation::compact_mutation; +template +GCC6_CONCEPT( + requires CompactedFragmentsConsumer +) +class compact_mutation { + lw_shared_ptr> _state; + Consumer _consumer; + +public: + compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, + uint32_t partition_limit, Consumer consumer) + : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) + , _consumer(std::move(consumer)) { + } + + compact_mutation(const schema& s, gc_clock::time_point compaction_time, Consumer consumer, + std::function get_max_purgeable) + : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) + , _consumer(std::move(consumer)) { + } + + compact_mutation(lw_shared_ptr> state, Consumer consumer) + : _state(std::move(state)) + , _consumer(std::move(consumer)) { + } + + void consume_new_partition(const dht::decorated_key& dk) { + _state->consume_new_partition(dk); + } + + void 
consume(tombstone t) { + _state->consume(std::move(t), _consumer); + } + + stop_iteration consume(static_row&& sr) { + return _state->consume(std::move(sr), _consumer); + } + + stop_iteration consume(clustering_row&& cr) { + return _state->consume(std::move(cr), _consumer); + } + + stop_iteration consume(range_tombstone&& rt) { + return _state->consume(std::move(rt), _consumer); + } + + stop_iteration consume_end_of_partition() { + return _state->consume_end_of_partition(_consumer); + } + + auto consume_end_of_stream() { + return _state->consume_end_of_stream(_consumer); + } }; -template -struct compact_for_compaction : compact_mutation { - using compact_mutation::compact_mutation; +template +GCC6_CONCEPT( + requires CompactedFragmentsConsumer +) +struct compact_for_query : compact_mutation { + using compact_mutation::compact_mutation; +}; + +template +using compact_for_query_state = compact_mutation_state; +using compact_for_mutation_query_state = compact_for_query_state; +using compact_for_data_query_state = compact_for_query_state; + +template +GCC6_CONCEPT( + requires CompactedFragmentsConsumer +) +struct compact_for_compaction : compact_mutation { + using compact_mutation::compact_mutation; }; diff --git a/mutation_partition.cc b/mutation_partition.cc index 86d6cc31dd..10e8f2102d 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1904,22 +1904,26 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, tracing::trace_state_ptr trace_ptr, - db::timeout_clock::time_point timeout) + db::timeout_clock::time_point timeout, + querier_cache_context cache_ctx) { if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) { return make_ready_future<>(); } - auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); + auto q = cache_ctx.lookup(emit_only_live_rows::yes, *s, range, slice, trace_ptr, [&, trace_ptr] { + return querier(source, s, range, slice, 
service::get_local_sstable_query_read_priority(), + std::move(trace_ptr), emit_only_live_rows::yes); + }); - auto qrb = query_result_builder(*s, builder); - auto cfq = make_stable_flattened_mutations_consumer>( - *s, query_time, slice, row_limit, partition_limit, std::move(qrb)); - - return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr), - streamed_mutation::forwarding::no, mutation_reader::forwarding::no), - [cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable { - return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout); + return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (querier& q) mutable { + auto qrb = query_result_builder(*s, builder); + return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout).then( + [=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable { + if (q.are_limits_reached() || builder.is_short_read()) { + cache_ctx.insert(std::move(q), std::move(trace_ptr)); + } + }); }); } @@ -1928,7 +1932,7 @@ class reconcilable_result_builder { const query::partition_slice& _slice; std::vector _result; - uint32_t _live_rows; + uint32_t _live_rows{}; bool _has_ck_selector{}; bool _static_row_is_alive{}; @@ -2012,22 +2016,28 @@ static do_mutation_query(schema_ptr s, gc_clock::time_point query_time, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, - db::timeout_clock::time_point timeout) + db::timeout_clock::time_point timeout, + querier_cache_context cache_ctx) { if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) { return make_ready_future(reconcilable_result()); } - auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); + auto q = cache_ctx.lookup(emit_only_live_rows::no, *s, range, slice, 
trace_ptr, [&, trace_ptr] { + return querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), + std::move(trace_ptr), emit_only_live_rows::no); + }); - auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter)); - auto cfq = make_stable_flattened_mutations_consumer>( - *s, query_time, slice, row_limit, partition_limit, std::move(rrb)); - - return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr), - streamed_mutation::forwarding::no, mutation_reader::forwarding::no), - [cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable { - return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout); + return do_with(std::move(q), + [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (querier& q) mutable { + auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter)); + return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout).then( + [=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable { + if (q.are_limits_reached() || r.is_short_read()) { + cache_ctx.insert(std::move(q), std::move(trace_ptr)); + } + return r; + }); }); } @@ -2043,10 +2053,11 @@ mutation_query(schema_ptr s, gc_clock::time_point query_time, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, - db::timeout_clock::time_point timeout) + db::timeout_clock::time_point timeout, + querier_cache_context cache_ctx) { return mutation_query_stage(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice), - row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout); + row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx)); } 
deletable_row::deletable_row(clustering_row&& cr) diff --git a/mutation_query.hh b/mutation_query.hh index baf9cbcfd2..934a799c11 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -26,6 +26,7 @@ #include "mutation_reader.hh" #include "frozen_mutation.hh" #include "db/timeout_clock.hh" +#include "querier.hh" class reconcilable_result; class frozen_reconcilable_result; @@ -129,7 +130,8 @@ future mutation_query( gc_clock::time_point query_time, query::result_memory_accounter&& accounter = { }, tracing::trace_state_ptr trace_ptr = nullptr, - db::timeout_clock::time_point timeout = db::no_timeout); + db::timeout_clock::time_point timeout = db::no_timeout, + querier_cache_context cache_ctx = { }); future<> data_query( schema_ptr s, @@ -141,7 +143,8 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, tracing::trace_state_ptr trace_ptr = nullptr, - db::timeout_clock::time_point timeout = db::no_timeout); + db::timeout_clock::time_point timeout = db::no_timeout, + querier_cache_context cache_ctx = { }); // Performs a query for counter updates. 
future counter_write_query(schema_ptr, const mutation_source&, diff --git a/mutation_reader.cc b/mutation_reader.cc index 8089364f5a..d1a4b63321 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -43,6 +43,7 @@ GCC6_CONCEPT( { p.next_partition() }; { p.fast_forward_to(part_range, timeout) } -> future<>; { p.fast_forward_to(pos_range, timeout) } -> future<>; + { p.buffer_size() } -> size_t; }; ) @@ -125,6 +126,10 @@ public: future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) { return _producer.fast_forward_to(std::move(pr), timeout); } + + size_t buffer_size() const { + return _producer.buffer_size(); + } }; // Merges the output of the sub-readers into a single non-decreasing @@ -202,6 +207,7 @@ public: void next_partition(); future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout); future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout); + size_t buffer_size() const; }; // Combines multiple mutation_readers into one. 
@@ -220,6 +226,7 @@ public: virtual void next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override; + virtual size_t buffer_size() const override; }; // Dumb selector implementation for combined_mutation_reader that simply @@ -464,6 +471,10 @@ future<> mutation_reader_merger::fast_forward_to(position_range pr, db::timeout_ }); } +size_t mutation_reader_merger::buffer_size() const { + return boost::accumulate(_all_readers | boost::adaptors::transformed(std::mem_fn(&flat_mutation_reader::buffer_size)), size_t(0)); +} + combined_mutation_reader::combined_mutation_reader(schema_ptr schema, std::unique_ptr selector, streamed_mutation::forwarding fwd_sm, @@ -520,6 +531,10 @@ future<> combined_mutation_reader::fast_forward_to(position_range pr, db::timeou return _producer.fast_forward_to(std::move(pr), timeout); } +size_t combined_mutation_reader::buffer_size() const { + return flat_mutation_reader::impl::buffer_size() + _producer.buffer_size(); +} + flat_mutation_reader make_combined_reader(schema_ptr schema, std::unique_ptr selectors, streamed_mutation::forwarding fwd_sm, @@ -568,6 +583,9 @@ future> reader_concur return make_exception_future>(_make_queue_overloaded_exception()); } auto r = resources(1, static_cast(memory)); + if (!may_proceed(r) && _evict_an_inactive_reader) { + while (_evict_an_inactive_reader() && !may_proceed(r)); + } if (may_proceed(r)) { _resources -= r; return make_ready_future>(make_lw_shared(*this, r)); @@ -697,7 +715,7 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { }; std::variant _state; - static const std::size_t new_reader_base_cost{16 * 1024}; + static const ssize_t new_reader_base_cost{16 * 1024}; template GCC6_CONCEPT( @@ -767,6 +785,12 @@ public: return reader.fast_forward_to(std::move(pr), timeout); }, timeout); } + virtual 
size_t buffer_size() const override { + if (auto* state = std::get_if(&_state)) { + return state->reader.buffer_size(); + } + return 0; + } }; flat_mutation_reader diff --git a/mutation_reader.hh b/mutation_reader.hh index 3a341fd631..e6531c5ff8 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -134,6 +134,9 @@ public: _end_of_stream = false; return _rd.fast_forward_to(std::move(pr), timeout); } + virtual size_t buffer_size() const override { + return flat_mutation_reader::impl::buffer_size() + _rd.buffer_size(); + } }; // Creates a mutation_reader wrapper which creates a new stream of mutations diff --git a/querier.cc b/querier.cc new file mode 100644 index 0000000000..75ae274603 --- /dev/null +++ b/querier.cc @@ -0,0 +1,304 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#include "querier.hh" + +#include "schema.hh" + +#include + +static sstring cannot_use_reason(querier::can_use cu) +{ + switch (cu) + { + case querier::can_use::yes: + return "can be used"; + case querier::can_use::no_emit_only_live_rows_mismatch: + return "emit only live rows mismatch"; + case querier::can_use::no_schema_version_mismatch: + return "schema version mismatch"; + case querier::can_use::no_ring_pos_mismatch: + return "ring pos mismatch"; + case querier::can_use::no_clustering_pos_mismatch: + return "clustering pos mismatch"; + } + return "unknown reason"; +} + +querier::position querier::current_position() const { + const dht::decorated_key* dk = std::visit([] (const auto& cs) { return cs->current_partition(); }, _compaction_state); + const clustering_key_prefix* clustering_key = *_last_ckey ? &**_last_ckey : nullptr; + return {dk, clustering_key}; +} + +bool querier::ring_position_matches(const dht::partition_range& range, const querier::position& pos) const { + const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(_slice->options.contains(query::partition_slice::option::reversed)); + + const auto expected_start = dht::ring_position_view(*pos.partition_key); + // If there are no clustering columns or the select is distinct we don't + // have clustering rows at all. In this case we can be sure we won't have + // anything more in the last page's partition and thus the start bound is + // exclusive. Otherwise there migh be clustering rows still and it is + // inclusive. 
+ const auto expected_inclusiveness = _schema->clustering_key_size() > 0 && + !_slice->options.contains() && + pos.clustering_key; + const auto comparator = dht::ring_position_comparator(*_schema); + + if (is_reversed && !range.is_singular()) { + const auto& end = range.end(); + return end && comparator(end->value(), expected_start) == 0 && end->is_inclusive() == expected_inclusiveness; + } + + const auto& start = range.start(); + return start && comparator(start->value(), expected_start) == 0 && start->is_inclusive() == expected_inclusiveness; +} + +bool querier::clustering_position_matches(const query::partition_slice& slice, const querier::position& pos) const { + const auto& row_ranges = slice.row_ranges(*_schema, pos.partition_key->key()); + + if (row_ranges.empty()) { + // This is a valid slice on the last page of a query with + // clustering restrictions. It simply means the query is + // effectively over, no further results are expected. We + // can assume the clustering position matches. + return true; + } + + if (!pos.clustering_key) { + // We stopped at a non-clustering position so the partition's clustering + // row ranges should be the default row ranges. + return &row_ranges == &slice.default_row_ranges(); + } + + clustering_key_prefix::equality eq(*_schema); + + const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(_slice->options.contains(query::partition_slice::option::reversed)); + + // If the page ended mid-partition the first partition range should start + // with the last clustering key (exclusive). + const auto& first_row_range = row_ranges.front(); + const auto& start = is_reversed ? 
first_row_range.end() : first_row_range.start(); + if (!start) { + return false; + } + return !start->is_inclusive() && eq(start->value(), *pos.clustering_key); +} + +bool querier::matches(const dht::partition_range& range) const { + const auto& qr = *_range; + if (qr.is_singular() != range.is_singular()) { + return false; + } + + const auto cmp = dht::ring_position_comparator(*_schema); + const auto bound_eq = [&] (const stdx::optional& a, const stdx::optional& b) { + return bool(a) == bool(b) && (!a || a->equal(*b, cmp)); + }; + + return qr.is_singular() ? + bound_eq(qr.start(), range.start()) : + bound_eq(qr.start(), range.start()) || bound_eq(qr.end(), range.end()); +} + +querier::can_use querier::can_be_used_for_page(emit_only_live_rows only_live, const schema& s, + const dht::partition_range& range, const query::partition_slice& slice) const { + if (only_live != emit_only_live_rows(std::holds_alternative>(_compaction_state))) { + return can_use::no_emit_only_live_rows_mismatch; + } + if (s.version() != _schema->version()) { + return can_use::no_schema_version_mismatch; + } + + const auto pos = current_position(); + + if (!pos.partition_key) { + // There was nothing read so far so we assume we are ok. + return can_use::yes; + } + + if (!ring_position_matches(range, pos)) { + return can_use::no_ring_pos_mismatch; + } + if (!clustering_position_matches(slice, pos)) { + return can_use::no_clustering_pos_mismatch; + } + return can_use::yes; +} + +// The time-to-live of a cache-entry. 
+const std::chrono::seconds querier_cache::default_entry_ttl{10}; + +const size_t querier_cache::max_queriers_memory_usage = memory::stats().total_memory() * 0.04; + +void querier_cache::scan_cache_entries() { + const auto now = lowres_clock::now(); + + auto it = _meta_entries.begin(); + const auto end = _meta_entries.end(); + while (it != end && it->is_expired(now)) { + if (*it) { + ++_stats.time_based_evictions; + } + it = _meta_entries.erase(it); + _stats.population = _entries.size(); + } +} + +querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) { + const auto queriers = _entries.equal_range(key); + + if (queriers.first == _entries.end()) { + tracing::trace(trace_state, "Found no cached querier for key {}", key); + return _entries.end(); + } + + const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair& elem) { + return elem.second.get().matches(range); + }); + + if (it == queriers.second) { + tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range {}", key, range); + } + tracing::trace(trace_state, "Found cached querier for key {} and range {}", key, range); + return it; +} + +querier_cache::querier_cache(std::chrono::seconds entry_ttl) + : _expiry_timer([this] { scan_cache_entries(); }) + , _entry_ttl(entry_ttl) { + _expiry_timer.arm_periodic(entry_ttl / 2); +} + +void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) { + // FIXME: see #3159 + // In reverse mode flat_mutation_reader drops any remaining rows of the + // current partition when the page ends so it cannot be reused across + // pages. 
+ if (q.is_reversed()) { + return; + } + + tracing::trace(trace_state, "Caching querier with key {}", key); + + auto memory_usage = boost::accumulate( + _entries | boost::adaptors::map_values | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0)); + + // We add the memory-usage of the to-be added querier to the memory-usage + // of all the cached queriers. We now need to makes sure this number is + // smaller then the maximum allowed memory usage. If it isn't we evict + // cached queriers and substract their memory usage from this number until + // it goes below the limit. + memory_usage += q.memory_usage(); + + if (memory_usage >= max_queriers_memory_usage) { + auto it = _meta_entries.begin(); + const auto end = _meta_entries.end(); + while (it != end && memory_usage >= max_queriers_memory_usage) { + if (*it) { + ++_stats.memory_based_evictions; + memory_usage -= it->get_entry().memory_usage(); + } + it = _meta_entries.erase(it); + } + } + + const auto it = _entries.emplace(key, entry::param{std::move(q), _entry_ttl}).first; + _meta_entries.emplace_back(_entries, it); + _stats.population = _entries.size(); +} + +querier querier_cache::lookup(utils::UUID key, + emit_only_live_rows only_live, + const schema& s, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + const noncopyable_function& create_fun) { + auto it = find_querier(key, range, trace_state); + ++_stats.lookups; + if (it == _entries.end()) { + ++_stats.misses; + return create_fun(); + } + + auto q = std::move(it->second).get(); + _entries.erase(it); + _stats.population = _entries.size(); + + const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice); + if (can_be_used == querier::can_use::yes) { + tracing::trace(trace_state, "Reusing querier"); + return q; + } + + tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(can_be_used)); + ++_stats.drops; + return 
create_fun(); +} + +void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) { + _entry_ttl = entry_ttl; + _expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2); +} + +bool querier_cache::evict_one() { + if (_entries.empty()) { + return false; + } + + auto it = _meta_entries.begin(); + const auto end = _meta_entries.end(); + while (it != end) { + const auto is_live = bool(*it); + it = _meta_entries.erase(it); + _stats.population = _entries.size(); + if (is_live) { + ++_stats.resource_based_evictions; + return true; + } + } + return false; +} + +querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page) + : _cache(&cache) + , _key(key) + , _is_first_page(is_first_page) { +} + +void querier_cache_context::insert(querier&& q, tracing::trace_state_ptr trace_state) { + if (_cache && _key != utils::UUID{}) { + _cache->insert(_key, std::move(q), std::move(trace_state)); + } +} + +querier querier_cache_context::lookup(emit_only_live_rows only_live, + const schema& s, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + const noncopyable_function& create_fun) { + if (_cache && _key != utils::UUID{} && !_is_first_page) { + return _cache->lookup(_key, only_live, s, range, slice, std::move(trace_state), create_fun); + } + return create_fun(); +} diff --git a/querier.hh b/querier.hh new file mode 100644 index 0000000000..e2ddfecd78 --- /dev/null +++ b/querier.hh @@ -0,0 +1,405 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "mutation_compactor.hh" +#include "mutation_reader.hh" + +#include +#include + +/// One-stop object for serving queries. +/// +/// Encapsulates all state and logic for serving all pages for a given range +/// of a query on a given shard. Can serve mutation or data queries. Can be +/// used with any CompactedMutationsConsumer certified result-builder. +/// Intended to be created on the first page of a query then saved and reused on +/// subsequent pages. +/// (1) Create with the parameters of your query. +/// (2) Call consume_page() with your consumer to consume the contents of the +/// next page. +/// (3) At the end of the page save the querier if you expect more pages. +/// The are_limits_reached() method can be used to determine whether the +/// page was filled or not. Also check your result builder for short reads. +/// Most result builders have memory-accounters that will stop the read +/// once some memory limit was reached. This is called a short read as the +/// read stops before the row and/or partition limits are reached. +/// (4) At the beginning of the next page use can_be_used_for_page() to +/// determine whether it can be used with the page's schema and start +/// position. If a schema or position mismatch is detected the querier +/// cannot be used to produce the next page and a new one has to be created +/// instead. 
+class querier { +public: + enum class can_use { + yes, + no_emit_only_live_rows_mismatch, + no_schema_version_mismatch, + no_ring_pos_mismatch, + no_clustering_pos_mismatch + }; +private: + template + class clustering_position_tracker { + std::unique_ptr _consumer; + lw_shared_ptr> _last_ckey; + + public: + clustering_position_tracker(std::unique_ptr&& consumer, lw_shared_ptr> last_ckey) + : _consumer(std::move(consumer)) + , _last_ckey(std::move(last_ckey)) { + } + + void consume_new_partition(const dht::decorated_key& dk) { + _last_ckey->reset(); + _consumer->consume_new_partition(dk); + } + void consume(tombstone t) { + _consumer->consume(t); + } + stop_iteration consume(static_row&& sr, tombstone t, bool is_live) { + return _consumer->consume(std::move(sr), std::move(t), is_live); + } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_live) { + *_last_ckey = cr.key(); + return _consumer->consume(std::move(cr), std::move(t), is_live); + } + stop_iteration consume(range_tombstone&& rt) { + return _consumer->consume(std::move(rt)); + } + stop_iteration consume_end_of_partition() { + return _consumer->consume_end_of_partition(); + } + auto consume_end_of_stream() { + return _consumer->consume_end_of_stream(); + } + }; + + struct position { + const dht::decorated_key* partition_key; + const clustering_key_prefix* clustering_key; + }; + + schema_ptr _schema; + std::unique_ptr _range; + std::unique_ptr _slice; + flat_mutation_reader _reader; + std::variant, lw_shared_ptr> _compaction_state; + lw_shared_ptr> _last_ckey; + + std::variant, lw_shared_ptr> make_compaction_state( + const schema& s, + uint32_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time, + emit_only_live_rows only_live) const { + if (only_live == emit_only_live_rows::yes) { + return make_lw_shared>(s, query_time, *_slice, row_limit, partition_limit); + } else { + return make_lw_shared>(s, query_time, *_slice, row_limit, partition_limit); + } + } + + position 
current_position() const; + + bool ring_position_matches(const dht::partition_range& range, const position& pos) const; + + bool clustering_position_matches(const query::partition_slice& slice, const position& pos) const; + +public: + querier(const mutation_source& ms, + schema_ptr schema, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_ptr, + emit_only_live_rows only_live) + : _schema(schema) + , _range(std::make_unique(range)) + , _slice(std::make_unique(slice)) + , _reader(ms.make_reader(schema, *_range, *_slice, pc, std::move(trace_ptr), + streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _compaction_state(make_compaction_state(*schema, 0, 0, gc_clock::time_point{}, only_live)) + , _last_ckey(make_lw_shared>()) { + } + + bool is_reversed() const { + return _slice->options.contains(query::partition_slice::option::reversed); + } + + bool are_limits_reached() const { + return std::visit([] (const auto& cs) { return cs->are_limits_reached(); }, _compaction_state); + } + + /// Does the querier's range matches `range`? + /// + /// A query can have more then one querier executing parallelly for + /// different sub-ranges on the same shard. This method helps identifying + /// the appropriate one for the `range'. + /// Since ranges can be narrowed from page-to-page (as the query moves + /// through it) we cannot just check the two ranges for equality. + /// Instead we exploit the fact the a query-range will always be split into + /// non-overlapping sub ranges and thus each bound of a range is unique. + /// Thus if any of the range's bounds are equal we have a match. + /// For singular ranges we just check the one bound. + bool matches(const dht::partition_range& range) const; + + /// Can the querier be used for the next page? 
+ /// + /// The querier can only be used for the next page if the only_live, the + /// schema versions, the ring and the clustering positions match. + can_use can_be_used_for_page(emit_only_live_rows only_live, const schema& s, + const dht::partition_range& range, const query::partition_slice& slice) const; + + template + GCC6_CONCEPT( + requires CompactedFragmentsConsumer + ) + auto consume_page(Consumer&& consumer, + uint32_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time, + db::timeout_clock::time_point timeout) { + return std::visit([=, consumer = std::move(consumer)] (auto& compaction_state) mutable { + // FIXME: #3158 + // consumer cannot be moved after consume_new_partition() is called + // on it because it stores references to some of it's own members. + // Move it to the heap before any consumption begins to avoid + // accidents. + return _reader.peek().then([=, consumer = std::make_unique(std::move(consumer))] (mutation_fragment* next_fragment) mutable { + const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer); + + const auto is_reversed = flat_mutation_reader::consume_reversed_partitions( + _slice->options.contains(query::partition_slice::option::reversed)); + + using compaction_state_type = typename std::remove_reference::type; + constexpr auto only_live = compaction_state_type::parameters::only_live; + auto reader_consumer = make_stable_flattened_mutations_consumer>>( + compaction_state, + clustering_position_tracker(std::move(consumer), _last_ckey)); + + return _reader.consume(std::move(reader_consumer), is_reversed, timeout); + }); + }, _compaction_state); + } + + size_t memory_usage() const { + return _reader.buffer_size(); + } + +}; + +/// Special-purpose cache for saving queriers between pages. 
+/// +/// Queriers are saved at the end of the page and looked up at the beginning of +/// the next page. The lookup() always removes the querier from the cache; it +/// has to be inserted again at the end of the page. +/// Lookup provides the following extra logic, special to queriers: +/// * It accepts a factory function which is used to create a new querier if +/// the lookup fails (see below). This allows for simple call sites. +/// * It does range matching. A query sometimes will result in multiple querier +/// objects executing on the same node and shard in parallel. To identify the +/// appropriate querier lookup() will consider - in addition to the lookup +/// key - the read range. +/// * It does schema version and position checking. In some cases a subsequent +/// page will have a different schema version or will start from a position +/// that is before the end position of the previous page. lookup() will +/// recognize these cases and drop the previous querier and create a new one. +/// +/// Inserted queriers will have a TTL. When this expires the querier is +/// evicted. This is to avoid excess and unnecessary resource usage due to +/// abandoned queriers. +/// Provides a way to evict readers one-by-one via `evict_one()`. This can be +/// used by the concurrency-limiting code to evict cached readers to free up +/// resources for admitting new ones. +/// Keeps the total memory consumption of cached queriers +/// below max_queriers_memory_usage by evicting older entries upon inserting +/// new ones if the memory consumption would go above the limit. +class querier_cache { +public: + static const std::chrono::seconds default_entry_ttl; + static const size_t max_queriers_memory_usage; + + struct stats { + // The number of cache lookups. + uint64_t lookups = 0; + // The subset of lookups that missed. + uint64_t misses = 0; + // The subset of lookups that hit but the looked up querier had to be + // dropped due to position mismatch. 
+ uint64_t drops = 0; + // The number of queriers evicted due to their TTL expiring. + uint64_t time_based_evictions = 0; + // The number of queriers evicted to free up resources to be able to + // create new readers. + uint64_t resource_based_evictions = 0; + // The number of queriers evicted because the maximum memory usage + // was reached. + uint64_t memory_based_evictions = 0; + // The number of queriers currently in the cache. + uint64_t population = 0; + }; + +private: + class entry : public weakly_referencable { + querier _querier; + lowres_clock::time_point _expires; + public: + // Since entry cannot be moved and unordered_map::emplace can pass only + // a single param to its mapped-type we need to force a single-param + // constructor for entry. Oh C++... + struct param { + querier q; + std::chrono::seconds ttl; + }; + + explicit entry(param p) + : _querier(std::move(p.q)) + , _expires(lowres_clock::now() + p.ttl) { + } + + bool is_expired(const lowres_clock::time_point& now) const { + return _expires <= now; + } + + const querier& get() const & { + return _querier; + } + + querier&& get() && { + return std::move(_querier); + } + + size_t memory_usage() const { + return _querier.memory_usage(); + } + }; + + using entries = std::unordered_map; + + class meta_entry { + entries& _entries; + weak_ptr _entry_ptr; + entries::iterator _entry_it; + + public: + meta_entry(entries& e, entries::iterator it) + : _entries(e) + , _entry_ptr(it->second.weak_from_this()) + , _entry_it(it) { + } + + ~meta_entry() { + if (_entry_ptr) { + _entries.erase(_entry_it); + } + } + + bool is_expired(const lowres_clock::time_point& now) const { + return !_entry_ptr || _entry_ptr->is_expired(now); + } + + explicit operator bool() const { + return bool(_entry_ptr); + } + + const entry& get_entry() const { + return *_entry_ptr; + } + }; + + entries _entries; + std::list _meta_entries; + timer _expiry_timer; + std::chrono::seconds _entry_ttl; + stats _stats; + + entries::iterator 
find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state); + + void scan_cache_entries(); + +public: + querier_cache(std::chrono::seconds entry_ttl = default_entry_ttl); + + querier_cache(const querier_cache&) = delete; + querier_cache& operator=(const querier_cache&) = delete; + + // this is captured + querier_cache(querier_cache&&) = delete; + querier_cache& operator=(querier_cache&&) = delete; + + void insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state); + + /// Lookup a querier in the cache. + /// + /// If the querier doesn't exist, use `create_fun' to create it. + /// + /// Queriers are found based on `key` and `range`. There may be multiple + /// queriers for the same `key` differentiated by their read range. Since + /// each subsequent page may have a narrower read range than the one before + /// it, ranges cannot be simply matched based on equality. For matching we + /// use the fact that the coordinator splits the query range into + /// non-overlapping ranges. Thus both bounds of any range, or in case of + /// singular ranges only the start bound are guaranteed to be unique. + /// + /// The found querier is checked for a matching read-kind and schema + /// version. The start position is also checked against the current + /// position of the querier using the `range' and `slice'. If there is a + /// mismatch drop the querier and create a new one with `create_fun'. + querier lookup(utils::UUID key, + emit_only_live_rows only_live, + const schema& s, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + const noncopyable_function& create_fun); + + void set_entry_ttl(std::chrono::seconds entry_ttl); + + /// Evict a querier. + /// + /// Return true if a querier was evicted and false otherwise (if the cache + /// is empty). 
+ bool evict_one(); + + const stats& get_stats() const { + return _stats; + } +}; + +class querier_cache_context { + querier_cache* _cache{}; + utils::UUID _key; + bool _is_first_page; + +public: + querier_cache_context() = default; + querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page); + void insert(querier&& q, tracing::trace_state_ptr trace_state); + querier lookup(emit_only_live_rows only_live, + const schema& s, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + const noncopyable_function& create_fun); +}; diff --git a/query-request.hh b/query-request.hh index 245f4a0bde..688df77b05 100644 --- a/query-request.hh +++ b/query-request.hh @@ -173,6 +173,19 @@ public: gc_clock::time_point timestamp; std::experimental::optional trace_info; uint32_t partition_limit; // The maximum number of live partitions to return. + // The "query_uuid" field is useful in paged queries: It tells the replica + // that when it finishes the read request prematurely, i.e., reached the + // desired number of rows per page, it should not destroy the reader object, + // rather it should keep it alive - at its current position - and save it + // under the unique key "query_uuid". Later, when we want to resume + // the read at exactly the same position (i.e., to request the next page) + // we can pass this same unique id in that query's "query_uuid" field. + utils::UUID query_uuid; + // Signal to the replica that this is the first page of a (maybe) paged + // read request as far as the replica is concerned. Can be used by the replica + // to avoid doing work normally done on paged requests, e.g. attempting to + // reuse suspended readers. 
+ bool is_first_page; api::timestamp_type read_timestamp; // not serialized public: read_command(utils::UUID cf_id, @@ -182,6 +195,8 @@ public: gc_clock::time_point now = gc_clock::now(), std::experimental::optional ti = std::experimental::nullopt, uint32_t partition_limit = max_partitions, + utils::UUID query_uuid = utils::UUID(), + bool is_first_page = false, api::timestamp_type rt = api::missing_timestamp) : cf_id(std::move(cf_id)) , schema_version(std::move(schema_version)) @@ -190,6 +205,8 @@ public: , timestamp(now) , trace_info(std::move(ti)) , partition_limit(partition_limit) + , query_uuid(query_uuid) + , is_first_page(is_first_page) , read_timestamp(rt) { } diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 976bdd4e9f..158f57f6d9 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -130,6 +130,7 @@ private: size_t _max_queue_length = std::numeric_limits::max(); std::function _make_queue_overloaded_exception = default_make_queue_overloaded_exception; + std::function _evict_an_inactive_reader; bool has_available_units(const resources& r) const { return bool(_resources) && _resources >= r; @@ -152,10 +153,12 @@ public: reader_concurrency_semaphore(unsigned count, size_t memory, size_t max_queue_length = std::numeric_limits::max(), - std::function raise_queue_overloaded_exception = default_make_queue_overloaded_exception) + std::function raise_queue_overloaded_exception = default_make_queue_overloaded_exception, + std::function evict_an_inactive_reader = {}) : _resources(count, memory) , _max_queue_length(max_queue_length) - , _make_queue_overloaded_exception(raise_queue_overloaded_exception) { + , _make_queue_overloaded_exception(raise_queue_overloaded_exception) + , _evict_an_inactive_reader(std::move(evict_an_inactive_reader)) { } reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete; diff --git a/row_cache.cc b/row_cache.cc index 4f6ffae640..f603aa2b1f 100644 --- 
a/row_cache.cc +++ b/row_cache.cc @@ -458,6 +458,12 @@ public: forward_buffer_to(pr.start()); return _reader->fast_forward_to(std::move(pr), timeout); } + virtual size_t buffer_size() const override { + if (_reader) { + return flat_mutation_reader::impl::buffer_size() + _reader->buffer_size(); + } + return flat_mutation_reader::impl::buffer_size(); + } }; void cache_tracker::clear_continuity(cache_entry& ce) { @@ -747,6 +753,12 @@ public: return make_ready_future<>(); } } + virtual size_t buffer_size() const override { + if (_reader) { + return flat_mutation_reader::impl::buffer_size() + _reader->buffer_size(); + } + return flat_mutation_reader::impl::buffer_size(); + } }; flat_mutation_reader diff --git a/serializer_impl.hh b/serializer_impl.hh index 7530953c6a..bd0c457232 100644 --- a/serializer_impl.hh +++ b/serializer_impl.hh @@ -255,6 +255,38 @@ struct serializer> { } }; +template +struct serializer> { + template + static std::unordered_map read(Input& in) { + auto sz = deserialize(in, boost::type()); + std::unordered_map m; + m.reserve(sz); + while (sz--) { + auto k = deserialize(in, boost::type()); + auto v = deserialize(in, boost::type()); + m.emplace(std::move(k), std::move(v)); + } + return m; + } + template + static void write(Output& out, const std::unordered_map& v) { + safe_serialize_as_uint32(out, v.size()); + for (auto&& e : v) { + serialize(out, e.first); + serialize(out, e.second); + } + } + template + static void skip(Input& in) { + auto sz = deserialize(in, boost::type()); + while (sz--) { + serializer::skip(in); + serializer::skip(in); + } + } +}; + template struct serializer> { template diff --git a/service/pager/paging_state.cc b/service/pager/paging_state.cc index 09fcfbc3ea..be776c2c1f 100644 --- a/service/pager/paging_state.cc +++ b/service/pager/paging_state.cc @@ -44,15 +44,25 @@ #include "paging_state.hh" #include "core/simple-stream.hh" #include "idl/keys.dist.hh" +#include "idl/uuid.dist.hh" #include "idl/paging_state.dist.hh" 
+#include "idl/token.dist.hh" +#include "idl/range.dist.hh" #include "serializer_impl.hh" #include "idl/keys.dist.impl.hh" +#include "idl/uuid.dist.impl.hh" #include "idl/paging_state.dist.impl.hh" +#include "idl/token.dist.impl.hh" +#include "idl/range.dist.impl.hh" #include "message/messaging_service.hh" service::pager::paging_state::paging_state(partition_key pk, std::experimental::optional ck, - uint32_t rem) - : _partition_key(std::move(pk)), _clustering_key(std::move(ck)), _remaining(rem) { + uint32_t rem, utils::UUID query_uuid, replicas_per_token_range last_replicas) + : _partition_key(std::move(pk)) + , _clustering_key(std::move(ck)) + , _remaining(rem) + , _query_uuid(query_uuid) + , _last_replicas(std::move(last_replicas)) { } ::shared_ptr service::pager::paging_state::deserialize( diff --git a/service/pager/paging_state.hh b/service/pager/paging_state.hh index 8c308990f5..c8c253886d 100644 --- a/service/pager/paging_state.hh +++ b/service/pager/paging_state.hh @@ -45,18 +45,30 @@ #include "bytes.hh" #include "keys.hh" +#include "utils/UUID.hh" +#include "dht/i_partitioner.hh" namespace service { namespace pager { class paging_state final { +public: + using replicas_per_token_range = std::unordered_map, std::vector>; + +private: partition_key _partition_key; std::experimental::optional _clustering_key; uint32_t _remaining; + utils::UUID _query_uuid; + replicas_per_token_range _last_replicas; public: - paging_state(partition_key pk, std::experimental::optional ck, uint32_t rem); + paging_state(partition_key pk, + std::experimental::optional ck, + uint32_t rem, + utils::UUID reader_recall_uuid, + replicas_per_token_range last_replicas); /** * Last processed key, i.e. where to start from in next paging round @@ -78,6 +90,37 @@ public: return _remaining; } + /** + * query_uuid is a unique key under which the replicas saved the + * readers used to serve the last page. 
These saved readers may be + * resumed (if not already purged from the cache) instead of opening new + * ones - as a performance optimization. + * If the uuid is the invalid default-constructed UUID(), it means that + * the client got this paging_state from a coordinator running an older + * version of Scylla. + */ + utils::UUID get_query_uuid() const { + return _query_uuid; + } + + /** + * The replicas used to serve the last page. + * + * Helps paged queries consistently hit the same replicas for each + * subsequent page. Replicas that already served a page will keep + * the readers used for filling it around in a cache. Subsequent + * page request hitting the same replicas can reuse these readers + * to fill the pages avoiding the work of creating these readers + * from scratch on every page. + * In a mixed cluster older coordinators will ignore this value. + * Replicas are stored per token-range where the token-range + * is some subrange of the query range that doesn't cross node + * boundaries. + */ + replicas_per_token_range get_last_replicas() const { + return _last_replicas; + } + static ::shared_ptr deserialize(bytes_opt bytes); bytes_opt serialize() const; }; diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 773fdfd10f..b29a3bc66c 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -78,7 +78,17 @@ private: _max = state->get_remaining(); _last_pkey = state->get_partition_key(); _last_ckey = state->get_clustering_key(); + _cmd->query_uuid = state->get_query_uuid(); + _cmd->is_first_page = false; + _last_replicas = state->get_last_replicas(); + } else { + // Reusing readers is currently only supported for singular queries. 
+ if (_ranges.front().is_singular()) { + _cmd->query_uuid = utils::make_random_uuid(); + } + _cmd->is_first_page = true; } + qlogger.trace("fetch_page query id {}", _cmd->query_uuid); if (_last_pkey) { auto dpk = dht::global_partitioner().decorate_key(*_schema, *_last_pkey); @@ -204,9 +214,11 @@ private: auto ranges = _ranges; auto command = ::make_lw_shared(*_cmd); - return get_local_storage_proxy().query(_schema, std::move(command), std::move(ranges), - _options.get_consistency(), _state.get_trace_state()).then( - [this, &builder, page_size, now](foreign_ptr> results) { + auto& sp = get_local_storage_proxy(); + return sp.query(_schema, std::move(command), std::move(ranges), + _options.get_consistency(), _state.get_trace_state(), sp.default_query_timeout(), std::move(_last_replicas)).then( + [this, &builder, page_size, now](foreign_ptr> results, paging_state::replicas_per_token_range last_replicas) { + _last_replicas = std::move(last_replicas); handle_result(builder, std::move(results), page_size, now); }); } @@ -301,10 +313,8 @@ private: } ::shared_ptr state() const override { - return _exhausted ? - nullptr : - ::make_shared(*_last_pkey, - _last_ckey, _max); + return _exhausted ? 
nullptr : ::make_shared(*_last_pkey, + _last_ckey, _max, _cmd->query_uuid, _last_replicas); } private: @@ -322,6 +332,7 @@ private: const cql3::query_options& _options; lw_shared_ptr _cmd; dht::partition_range_vector _ranges; + paging_state::replicas_per_token_range _last_replicas; }; bool service::pager::query_pagers::may_need_paging(uint32_t page_size, diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 7e2f22374b..2800c003e9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -402,6 +402,38 @@ public: } }; +static std::vector +replica_ids_to_endpoints(const std::vector& replica_ids) { + const auto& tm = get_local_storage_service().get_token_metadata(); + + std::vector endpoints; + endpoints.reserve(replica_ids.size()); + + for (const auto& replica_id : replica_ids) { + if (auto endpoint_opt = tm.get_endpoint_for_host_id(replica_id)) { + endpoints.push_back(*endpoint_opt); + } + } + + return endpoints; +} + +static std::vector +endpoints_to_replica_ids(const std::vector& endpoints) { + const auto& tm = get_local_storage_service().get_token_metadata(); + + std::vector replica_ids; + replica_ids.reserve(endpoints.size()); + + for (const auto& endpoint : endpoints) { + if (auto replica_id_opt = tm.get_host_id_if_known(endpoint)) { + replica_ids.push_back(*replica_id_opt); + } + } + + return replica_ids; +} + bool storage_proxy::need_throttle_writes() const { return _stats.background_write_bytes > memory::stats().total_memory() / 10 || _stats.queued_write_bytes > 6*1024*1024; } @@ -2478,6 +2510,8 @@ protected: db::consistency_level _cl; size_t _block_for; std::vector _targets; + // Targets that were succesfully used for a data or digest request + std::vector _used_targets; promise>> _result_promise; tracing::trace_state_ptr _trace_state; lw_shared_ptr _cf; @@ -2491,7 +2525,15 @@ public: } virtual ~abstract_read_executor() { _proxy->_stats.reads--; - }; + } + + /// Targets that were successfully ised for data and/or digest 
requests. + /// + /// Only filled after the request is finished, call only after + /// execute()'s future is ready. + std::vector used_targets() const { + return _used_targets; + } protected: future>, cache_temperature> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { @@ -2563,6 +2605,7 @@ protected: _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_data(ep, std::get<0>(std::move(v))); ++_proxy->_stats.data_read_completed.get_ep_stat(ep); + _used_targets.push_back(ep); } catch(...) { ++_proxy->_stats.data_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -2578,6 +2621,7 @@ protected: _cf->set_hit_rate(ep, std::get<2>(v)); resolver->add_digest(ep, std::get<0>(v), std::get<1>(v)); ++_proxy->_stats.digest_read_completed.get_ep_stat(ep); + _used_targets.push_back(ep); } catch(...) { ++_proxy->_stats.digest_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -2845,7 +2889,11 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s return db::read_repair_decision::NONE; } -::shared_ptr storage_proxy::get_read_executor(lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state) { +::shared_ptr storage_proxy::get_read_executor(lw_shared_ptr cmd, + dht::partition_range pr, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + const std::vector& preferred_endpoints) { const dht::token& token = pr.start()->value().token(); schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); @@ -2855,7 +2903,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s std::vector all_replicas = get_live_sorted_endpoints(ks, token); db::read_repair_decision repair_decision = new_read_repair_decision(*schema); auto cf = _db.local().find_column_family(schema).shared_from_this(); - std::vector 
target_replicas = db::filter_for_query(cl, ks, all_replicas, repair_decision, + std::vector target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr); @@ -2955,37 +3003,60 @@ void storage_proxy::handle_read_error(std::exception_ptr eptr, bool range) { } } -future>> -storage_proxy::query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, - tracing::trace_state_ptr trace_state, - clock_type::time_point timeout) { - std::vector<::shared_ptr> exec; +future>, replicas_per_token_range> +storage_proxy::query_singular(lw_shared_ptr cmd, + dht::partition_range_vector&& partition_ranges, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas) { + std::vector, nonwrapping_range>> exec; exec.reserve(partition_ranges.size()); for (auto&& pr: partition_ranges) { if (!pr.is_singular()) { throw std::runtime_error("mixed singular and non singular range are not supported"); } - exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state)); + + auto token_range = nonwrapping_range::make_singular(pr.start()->value().token()); + auto it = preferred_replicas.find(token_range); + const auto replicas = it == preferred_replicas.end() ? 
std::vector{} : replica_ids_to_endpoints(it->second); + + exec.emplace_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas), std::move(token_range)); } query::result_merger merger(cmd->row_limit, cmd->partition_limit); merger.reserve(exec.size()); - auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr& rex) { + auto used_replicas = make_lw_shared(); + + auto f = ::map_reduce(exec.begin(), exec.end(), [timeout, used_replicas] ( + std::pair<::shared_ptr, nonwrapping_range>& executor_and_token_range) { + auto& [rex, token_range] = executor_and_token_range; utils::latency_counter lc; lc.start(); - return rex->execute(timeout).finally([lc, rex] () mutable { + return rex->execute(timeout).then_wrapped([lc, rex, used_replicas, token_range = std::move(token_range)] ( + future>> f) mutable { + if (!f.failed()) { + used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(rex->used_targets())); + } if (lc.is_start()) { rex->get_cf()->add_coordinator_read_latency(lc.stop().latency()); } + return std::move(f); }); }, std::move(merger)); - return f.handle_exception([exec = std::move(exec), p = shared_from_this()] (std::exception_ptr eptr) { - // hold onto exec until read is complete - p->handle_read_error(eptr, false); - return make_exception_future>>(eptr); + return f.then_wrapped([exec = std::move(exec), + p = shared_from_this(), + used_replicas] (future>> f) { + if (f.failed()) { + auto eptr = f.get_exception(); + // hold onto exec until read is complete + p->handle_read_error(eptr, false); + return make_exception_future>, replicas_per_token_range>(eptr); + } + return make_ready_future>, replicas_per_token_range>(std::move(f.get0()), std::move(*used_replicas)); }); } @@ -3108,10 +3179,13 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t }); } -future>> -storage_proxy::query_partition_key_range(lw_shared_ptr cmd, dht::partition_range_vector partition_ranges, db::consistency_level cl, - 
tracing::trace_state_ptr trace_state, - clock_type::time_point timeout) { +future>, replicas_per_token_range> +storage_proxy::query_partition_key_range(lw_shared_ptr cmd, + dht::partition_range_vector partition_ranges, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas) { schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); dht::partition_range_vector ranges; @@ -3156,23 +3230,26 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, merger(std::move(r)); } - return merger.get(); + return make_ready_future>, replicas_per_token_range>(merger.get(), replicas_per_token_range{}); }); } -future>> +future>, replicas_per_token_range> storage_proxy::query(schema_ptr s, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, - db::consistency_level cl, tracing::trace_state_ptr trace_state, - clock_type::time_point timeout) + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas) { if (slogger.is_enabled(logging::log_level::trace) || qlogger.is_enabled(logging::log_level::trace)) { static thread_local int next_id = 0; auto query_id = next_id++; slogger.trace("query {}.{} cmd={}, ranges={}, id={}", s->ks_name(), s->cf_name(), *cmd, partition_ranges, query_id); - return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout)).then([query_id, cmd, s] (foreign_ptr>&& res) { + return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout), std::move(preferred_replicas)).then( + [query_id, cmd, s] (foreign_ptr>&& res, replicas_per_token_range&& preferred_replicas) { if (res->buf().is_linearized()) { res->ensure_counts(); slogger.trace("query_result id={}, size={}, rows={}, partitions={}", query_id, res->buf().size(), 
*res->row_count(), *res->partition_count()); @@ -3180,23 +3257,26 @@ storage_proxy::query(schema_ptr s, slogger.trace("query_result id={}, size={}", query_id, res->buf().size()); } qlogger.trace("id={}, {}", query_id, res->pretty_printer(s, cmd->slice)); - return std::move(res); + return make_ready_future>, replicas_per_token_range>( + std::move(res), std::move(preferred_replicas)); }); } - return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout)); + return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout), std::move(preferred_replicas)); } -future>> +future>, replicas_per_token_range> storage_proxy::do_query(schema_ptr s, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state, - clock_type::time_point timeout) + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas) { static auto make_empty = [] { - return make_ready_future>>(make_foreign(make_lw_shared())); + return make_ready_future>, replicas_per_token_range>( + make_foreign(make_lw_shared()), replicas_per_token_range{}); }; auto& slice = cmd->slice; @@ -3210,11 +3290,16 @@ storage_proxy::do_query(schema_ptr s, if (query::is_single_partition(partition_ranges[0])) { // do not support mixed partitions (yet?) 
try { - return query_singular(cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout)).finally([lc, p] () mutable { - p->_stats.read.mark(lc.stop().latency()); - if (lc.is_start()) { - p->_stats.estimated_read.add(lc.latency(), p->_stats.read.hist.count); - } + return query_singular(cmd, + std::move(partition_ranges), + cl, + std::move(trace_state), + std::move(timeout), + std::move(preferred_replicas)).finally([lc, p] () mutable { + p->_stats.read.mark(lc.stop().latency()); + if (lc.is_start()) { + p->_stats.estimated_read.add(lc.latency(), p->_stats.read.hist.count); + } }); } catch (const no_such_column_family&) { _stats.read.mark(lc.stop().latency()); @@ -3222,7 +3307,12 @@ storage_proxy::do_query(schema_ptr s, } } - return query_partition_key_range(cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout)).finally([lc, p] () mutable { + return query_partition_key_range(cmd, + std::move(partition_ranges), + cl, + std::move(trace_state), + std::move(timeout), + std::move(preferred_replicas)).finally([lc, p] () mutable { p->_stats.range.mark(lc.stop().latency()); if (lc.is_start()) { p->_stats.estimated_range.add(lc.latency(), p->_stats.range.hist.count); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9dee99ebd6..9b6f74eb94 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -68,6 +68,8 @@ class abstract_write_response_handler; class abstract_read_executor; class mutation_holder; +using replicas_per_token_range = std::unordered_map, std::vector>; + class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { public: using clock_type = lowres_clock; @@ -227,11 +229,12 @@ private: seastar::metrics::metric_groups _metrics; private: void uninit_messaging_service(); - future>> query_singular(lw_shared_ptr cmd, - dht::partition_range_vector&& partition_ranges, - db::consistency_level cl, - tracing::trace_state_ptr trace_state, - 
clock_type::time_point timeout); + future>, replicas_per_token_range> query_singular(lw_shared_ptr cmd, + dht::partition_range_vector&& partition_ranges, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas); response_id_type register_response_handler(shared_ptr&& h); void remove_response_handler(response_id_type id); void got_response(response_id_type id, gms::inet_address from); @@ -251,7 +254,11 @@ private: std::vector get_live_endpoints(keyspace& ks, const dht::token& token); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); - ::shared_ptr get_read_executor(lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state); + ::shared_ptr get_read_executor(lw_shared_ptr cmd, + dht::partition_range pr, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + const std::vector& preferred_endpoints); future>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state, @@ -262,12 +269,12 @@ private: clock_type::time_point timeout, query::digest_algorithm da, uint64_t max_size = query::result_memory_limiter::maximum_result_size); - future>> - query_partition_key_range(lw_shared_ptr cmd, - dht::partition_range_vector partition_ranges, - db::consistency_level cl, - tracing::trace_state_ptr trace_state, - clock_type::time_point timeout); + future>, replicas_per_token_range> query_partition_key_range(lw_shared_ptr cmd, + dht::partition_range_vector partition_ranges, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas); dht::partition_range_vector get_restricted_ranges(const schema& s, dht::partition_range range); float 
estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); static std::vector intersection(const std::vector& l1, const std::vector& l2); @@ -276,11 +283,13 @@ private: dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count); - future>> do_query(schema_ptr, + future>, replicas_per_token_range> do_query(schema_ptr, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, - db::consistency_level cl, tracing::trace_state_ptr trace_state, - clock_type::time_point timeout); + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas); template future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); template @@ -313,6 +322,9 @@ private: public: storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled = {}); ~storage_proxy(); + const distributed& get_db() const { + return _db; + } distributed& get_db() { return _db; } @@ -378,30 +390,48 @@ public: */ future<> truncate_blocking(sstring keyspace, sstring cfname); + /** + * Default query timeout as defined by the configuration. + */ + clock_type::time_point default_query_timeout() const { + return clock_type::now() + std::chrono::milliseconds(get_db().local().get_config().read_request_timeout_in_ms()); + } + /* * Executes data query on the whole cluster. * * Partitions for each range will be ordered according to decorated_key ordering. Results for * each range from "partition_ranges" may appear in any order. * + * Will consider the preferred_replicas provided by the caller when selecting the replicas to + * send read requests to. However this is merely a hint and it is not guaranteed that the read + * requests will be sent to all or any of the listed replicas. 
After the query is done the list + * of replicas that served it is also returned. + * * IMPORTANT: Not all fibers started by this method have to be done by the time it returns so no * parameter can be changed after being passed to this method. */ - future>> query(schema_ptr, + future>, replicas_per_token_range> query(schema_ptr, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state, - clock_type::time_point timeout); + clock_type::time_point timeout, + replicas_per_token_range preferred_replicas = {}); - // helper method that uses the default timeout for this query (as specified in the config file) - future>> query(schema_ptr s, + /** + * query() overload without the preferred_replicas and timeout. + * + * Convenience overload for code that doesn't care about them. + * As timeout the default one is used as defined by the configuration. + */ + future>, replicas_per_token_range> query(schema_ptr s, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) { - auto timeout = clock_type::now() + std::chrono::milliseconds(get_db().local().get_config().read_request_timeout_in_ms()); - return query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout)); + return query(std::move(s), std::move(cmd), std::move(partition_ranges), std::move(cl), + std::move(trace_state), default_query_timeout(), {}); } future>, cache_temperature> query_mutations_locally( diff --git a/test.py b/test.py index c90e65d736..b0027b2887 100755 --- a/test.py +++ b/test.py @@ -100,10 +100,12 @@ boost_tests = [ 'enum_set_test', 'extensions_test', 'cql_auth_syntax_test', + 'querier_cache', ] other_tests = [ 'memory_footprint', + 'querier_cache_resource_based_eviction', ] last_len = 0 diff --git a/tests/querier_cache.cc b/tests/querier_cache.cc new file mode 100644 index 0000000000..0f916ff60c --- /dev/null +++ 
b/tests/querier_cache.cc @@ -0,0 +1,538 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "querier.hh" +#include "service/priority_manager.hh" +#include "tests/simple_schema.hh" + +#include +#include +#include + +using namespace std::chrono_literals; + +class dummy_result_builder { + std::optional _dk; + std::optional _ck; + +public: + dummy_result_builder() + : _dk({dht::token(), partition_key::make_empty()}) + , _ck(clustering_key_prefix::make_empty()) { + } + + void consume_new_partition(const dht::decorated_key& dk) { + _dk = dk; + _ck = {}; + } + void consume(tombstone t) { + } + stop_iteration consume(static_row&& sr, tombstone t, bool is_live) { + return stop_iteration::no; + } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_live) { + _ck = cr.key(); + return stop_iteration::no; + } + stop_iteration consume(range_tombstone&& rt) { + return stop_iteration::no; + } + stop_iteration consume_end_of_partition() { + return stop_iteration::no; + } + std::pair, std::optional> consume_end_of_stream() { + return {std::move(_dk), std::move(_ck)}; + } +}; + +class test_querier_cache { +public: + using bound = range_bound; + + static const size_t max_reader_buffer_size = 8 * 1024; + +private: + // Actual counters updated by the cache. 
+ unsigned _factory_invoked{}; + + // Expected value of the above counters, updated by this. + unsigned _expected_factory_invoked{}; + querier_cache::stats _expected_stats; + + simple_schema _s; + querier_cache _cache; + const std::vector _mutations; + const mutation_source _mutation_source; + + static sstring make_value(size_t i) { + return sprint("value%010d", i); + } + + static std::vector make_mutations(simple_schema& s, const noncopyable_function& make_value) { + std::vector mutations; + mutations.reserve(10); + + for (uint32_t i = 0; i != 10; ++i) { + auto mut = mutation(s.schema(), s.make_pkey(i)); + + s.add_static_row(mut, "-"); + s.add_row(mut, s.make_ckey(0), make_value(0)); + s.add_row(mut, s.make_ckey(1), make_value(1)); + s.add_row(mut, s.make_ckey(2), make_value(2)); + s.add_row(mut, s.make_ckey(3), make_value(3)); + + mutations.emplace_back(std::move(mut)); + } + + boost::sort(mutations, [] (const mutation& a, const mutation& b) { + return a.decorated_key().tri_compare(*a.schema(), b.decorated_key()) < 0; + }); + + return mutations; + } + + querier make_querier(const dht::partition_range& range) { + return querier(_mutation_source, + _s.schema(), + range, + _s.schema()->full_slice(), + service::get_local_sstable_query_read_priority(), + nullptr, + emit_only_live_rows::no); + } + + static utils::UUID make_cache_key(unsigned key) { + return utils::UUID{key, 1}; + } + + const dht::decorated_key* find_key(const dht::partition_range& range, unsigned partition_offset) const { + const auto& s = *_s.schema(); + const auto less_cmp = dht::ring_position_less_comparator(s); + + const auto begin = _mutations.begin(); + const auto end = _mutations.end(); + const auto start_position = range.start() ? 
+ dht::ring_position_view::for_range_start(range) : + dht::ring_position_view(_mutations.begin()->decorated_key()); + + const auto it = std::lower_bound(begin, end, start_position, [&] (const mutation& m, const dht::ring_position_view& k) { + return less_cmp(m.ring_position(), k); + }); + + if (it == end) { + return nullptr; + } + + const auto dist = std::distance(it, end); + auto& mut = *(partition_offset >= dist ? it + dist : it + partition_offset); + return &mut.decorated_key(); + } + +public: + struct entry_info { + unsigned key; + dht::partition_range original_range; + query::partition_slice original_slice; + uint32_t row_limit; + size_t memory_usage; + + dht::partition_range expected_range; + query::partition_slice expected_slice; + }; + + test_querier_cache(const noncopyable_function& external_make_value, std::chrono::seconds entry_ttl = 24h) + : _cache(entry_ttl) + , _mutations(make_mutations(_s, external_make_value)) + , _mutation_source([this] (schema_ptr, const dht::partition_range& range) { + auto rd = flat_mutation_reader_from_mutations(_mutations, range); + rd.set_max_buffer_size(max_reader_buffer_size); + return std::move(rd); + }) { + } + + explicit test_querier_cache(std::chrono::seconds entry_ttl = 24h) + : test_querier_cache(test_querier_cache::make_value, entry_ttl) { + } + + const simple_schema& get_simple_schema() const { + return _s; + } + + simple_schema& get_simple_schema() { + return _s; + } + + const schema_ptr get_schema() const { + return _s.schema(); + } + + dht::partition_range make_partition_range(bound begin, bound end) const { + return dht::partition_range::make({_mutations.at(begin.value()).decorated_key(), begin.is_inclusive()}, + {_mutations.at(end.value()).decorated_key(), end.is_inclusive()}); + } + + dht::partition_range make_singular_partition_range(std::size_t i) const { + return dht::partition_range::make_singular(_mutations.at(i).decorated_key()); + } + + dht::partition_range make_default_partition_range() const { + 
return make_partition_range({0, true}, {_mutations.size() - 1, true}); + } + + const query::partition_slice& make_default_slice() const { + return _s.schema()->full_slice(); + } + + entry_info produce_first_page_and_save_querier(unsigned key, const dht::partition_range& range, + const query::partition_slice& slice, uint32_t row_limit = 5) { + const auto cache_key = make_cache_key(key); + + auto querier = make_querier(range); + auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits::max(), + gc_clock::now(), db::no_timeout).get0(); + const auto memory_usage = querier.memory_usage(); + _cache.insert(cache_key, std::move(querier), nullptr); + + // Either no keys at all (nothing read) or at least partition key. + BOOST_REQUIRE((dk && ck) || !ck); + + // Check that the read stopped at the correct position. + // There are 5 rows in each mutation (1 static + 4 clustering). + const auto* expected_key = find_key(range, row_limit / 5); + if (!expected_key) { + BOOST_REQUIRE(!dk); + BOOST_REQUIRE(!ck); + } else { + BOOST_REQUIRE(dk->equal(*_s.schema(), *expected_key)); + } + + auto expected_range = [&] { + if (range.is_singular() || !dk) { + return range; + } + + return dht::partition_range(dht::partition_range::bound(*dk, true), range.end()); + }(); + + auto expected_slice = [&] { + if (!ck) { + return slice; + } + + auto expected_slice = slice; + auto cr = query::clustering_range::make_starting_with({*ck, false}); + expected_slice.set_range(*_s.schema(), dk->key(), {std::move(cr)}); + + return expected_slice; + }(); + + return {key, std::move(range), std::move(slice), row_limit, memory_usage, std::move(expected_range), std::move(expected_slice)}; + } + + entry_info produce_first_page_and_save_querier(unsigned key, const dht::partition_range& range, uint32_t row_limit = 5) { + return produce_first_page_and_save_querier(key, range, make_default_slice(), row_limit); + } + + // Singular overload + entry_info 
produce_first_page_and_save_querier(unsigned key, std::size_t i, uint32_t row_limit = 5) { + return produce_first_page_and_save_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit); + } + + // Use the whole range + entry_info produce_first_page_and_save_querier(unsigned key) { + return produce_first_page_and_save_querier(key, make_default_partition_range(), _s.schema()->full_slice()); + } + + // For tests testing just one insert-lookup. + entry_info produce_first_page_and_save_querier() { + return produce_first_page_and_save_querier(1); + } + + test_querier_cache& assert_cache_lookup(unsigned lookup_key, + const schema& lookup_schema, + const dht::partition_range& lookup_range, + const query::partition_slice& lookup_slice) { + + _cache.lookup(make_cache_key(lookup_key), emit_only_live_rows::no, lookup_schema, lookup_range, lookup_slice, nullptr, [this, &lookup_range] { + ++_factory_invoked; + return make_querier(lookup_range); + }); + BOOST_REQUIRE_EQUAL(_cache.get_stats().lookups, ++_expected_stats.lookups); + return *this; + } + + test_querier_cache& no_factory_invoked() { + BOOST_REQUIRE_EQUAL(_factory_invoked, _expected_factory_invoked); + return *this; + } + + test_querier_cache& factory_invoked() { + BOOST_REQUIRE_EQUAL(_factory_invoked, ++_expected_factory_invoked); + return *this; + } + + test_querier_cache& no_misses() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses); + return *this; + } + + test_querier_cache& misses() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, ++_expected_stats.misses); + return *this; + } + + test_querier_cache& no_drops() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().drops, _expected_stats.drops); + return *this; + } + + test_querier_cache& drops() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().drops, ++_expected_stats.drops); + return *this; + } + + test_querier_cache& no_evictions() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, 
_expected_stats.time_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions); + return *this; + } + + test_querier_cache& time_based_evictions() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, ++_expected_stats.time_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions); + return *this; + } + + test_querier_cache& resource_based_evictions() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, ++_expected_stats.resource_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions); + return *this; + } + + test_querier_cache& memory_based_evictions() { + BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions); + BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, ++_expected_stats.memory_based_evictions); + return *this; + } +}; + +SEASTAR_THREAD_TEST_CASE(lookup_with_wrong_key_misses) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(); + t.assert_cache_lookup(90, *t.get_schema(), entry.expected_range, entry.expected_slice) + .factory_invoked() + .misses() + .no_drops() + .no_evictions(); +} + +/* + * Range matching tests + */ + +SEASTAR_THREAD_TEST_CASE(singular_range_lookup_with_stop_at_clustering_row) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(1, t.make_singular_partition_range(1), 
2); + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice) + .no_factory_invoked() + .no_misses() + .no_drops() + .no_evictions(); +} + +SEASTAR_THREAD_TEST_CASE(singular_range_lookup_with_stop_at_static_row) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(1, t.make_singular_partition_range(1), 1); + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice) + .no_factory_invoked() + .no_misses() + .no_drops() + .no_evictions(); +} + +SEASTAR_THREAD_TEST_CASE(lookup_with_stop_at_clustering_row) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(1, t.make_partition_range({1, true}, {3, false}), 3); + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice) + .no_factory_invoked() + .no_misses() + .no_drops() + .no_evictions(); +} + +SEASTAR_THREAD_TEST_CASE(lookup_with_stop_at_static_row) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(1, t.make_partition_range({1, true}, {3, false}), 1); + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice) + .no_factory_invoked() + .no_misses() + .no_drops() + .no_evictions(); +} + +/* + * Drop tests + */ + +SEASTAR_THREAD_TEST_CASE(lookup_with_original_range_drops) { + test_querier_cache t; + + const auto entry = t.produce_first_page_and_save_querier(1); + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.original_range, entry.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); + +} + +SEASTAR_THREAD_TEST_CASE(lookup_with_wrong_slice_drops) { + test_querier_cache t; + + // Swap slices for different clustering keys. 
+ const auto entry1 = t.produce_first_page_and_save_querier(1, t.make_partition_range({1, false}, {3, true}), 3); + const auto entry2 = t.produce_first_page_and_save_querier(2, t.make_partition_range({1, false}, {3, true}), 4); + t.assert_cache_lookup(entry1.key, *t.get_schema(), entry1.expected_range, entry2.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); + t.assert_cache_lookup(entry2.key, *t.get_schema(), entry2.expected_range, entry1.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); + + // Wrong slice. + const auto entry3 = t.produce_first_page_and_save_querier(3); + t.assert_cache_lookup(entry3.key, *t.get_schema(), entry3.expected_range, t.get_schema()->full_slice()) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); + + // Swap slices for stopped at clustering/static row. + const auto entry4 = t.produce_first_page_and_save_querier(4, t.make_partition_range({1, false}, {3, true}), 1); + const auto entry5 = t.produce_first_page_and_save_querier(5, t.make_partition_range({1, false}, {3, true}), 2); + t.assert_cache_lookup(entry4.key, *t.get_schema(), entry4.expected_range, entry5.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); + t.assert_cache_lookup(entry5.key, *t.get_schema(), entry5.expected_range, entry4.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); +} + +SEASTAR_THREAD_TEST_CASE(lookup_with_different_schema_version_drops) { + test_querier_cache t; + + auto new_schema = schema_builder(t.get_schema()).with_column("v1", utf8_type).build(); + + const auto entry = t.produce_first_page_and_save_querier(); + t.assert_cache_lookup(entry.key, *new_schema, entry.expected_range, entry.expected_slice) + .factory_invoked() + .no_misses() + .drops() + .no_evictions(); +} + +/* + * Eviction tests + */ + +SEASTAR_THREAD_TEST_CASE(test_time_based_cache_eviction) { + test_querier_cache t(1s); + + const auto entry1 = 
t.produce_first_page_and_save_querier(1); + + seastar::sleep(500ms).get(); + + const auto entry2 = t.produce_first_page_and_save_querier(2); + + seastar::sleep(700ms).get(); + + t.assert_cache_lookup(entry1.key, *t.get_schema(), entry1.expected_range, entry1.expected_slice) + .factory_invoked() + .misses() + .no_drops() + .time_based_evictions(); + + seastar::sleep(700ms).get(); + + t.assert_cache_lookup(entry2.key, *t.get_schema(), entry2.expected_range, entry2.expected_slice) + .factory_invoked() + .misses() + .no_drops() + .time_based_evictions(); +} + +sstring make_string_blob(size_t size) { + const char* const letters = "abcdefghijklmnoqprsuvwxyz"; + std::random_device rd; + std::uniform_int_distribution dist(0, 25); + + sstring s; + s.resize(size); + + for (size_t i = 0; i < size; ++i) { + s[i] = letters[dist(rd)]; + } + + return s; +} + +SEASTAR_THREAD_TEST_CASE(test_memory_based_cache_eviction) { + test_querier_cache t([] (size_t) { + const size_t blob_size = 1 << 1; // 1K + return make_string_blob(blob_size); + }, 24h); + + size_t i = 0; + const auto entry = t.produce_first_page_and_save_querier(i); + + const size_t queriers_needed_to_fill_cache = floor(querier_cache::max_queriers_memory_usage / entry.memory_usage); + + // Fill the cache but don't overflow. + for (; i < queriers_needed_to_fill_cache; ++i) { + t.produce_first_page_and_save_querier(i); + } + + // Should overflow the limit and trigger the eviction of the oldest entry. 
+ t.produce_first_page_and_save_querier(queriers_needed_to_fill_cache); + + t.assert_cache_lookup(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice) + .factory_invoked() + .misses() + .no_drops() + .memory_based_evictions(); +} diff --git a/tests/querier_cache_resource_based_eviction.cc b/tests/querier_cache_resource_based_eviction.cc new file mode 100644 index 0000000000..6783cd339e --- /dev/null +++ b/tests/querier_cache_resource_based_eviction.cc @@ -0,0 +1,163 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#include "querier.hh" +#include "mutation_query.hh" +#include "tests/cql_test_env.hh" +#include "tests/tmpdir.hh" + +#include + +using namespace std::chrono_literals; + +int main(int argc, char** argv) { + app_template app; + tmpdir tmp; + + app.add_options() + ("verbose", "Enables more logging") + ("trace", "Enables trace-level logging") + ; + + return app.run(argc, argv, [&tmp, &app] { + db::config db_cfg; + + db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + db_cfg.data_file_directories({tmp.path}, db::config::config_source::CommandLine); + + if (!app.configuration().count("verbose")) { + logging::logger_registry().set_all_loggers_level(seastar::log_level::warn); + } + if (app.configuration().count("trace")) { + logging::logger_registry().set_logger_level("sstable", seastar::log_level::trace); + } + + return do_with_cql_env([] (cql_test_env& env) { + return seastar::async([&env] { + using namespace std::chrono_literals; + + auto& db = env.local_db(); + + db.set_querier_cache_entry_ttl(24h); + + try { + db.find_keyspace("querier_cache"); + env.execute_cql("drop keyspace querier_cache;").get(); + } catch (const no_such_keyspace&) { + // expected + } + + env.execute_cql("CREATE KEYSPACE querier_cache WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); + env.execute_cql("CREATE TABLE querier_cache.test (pk int, ck int, value int, primary key (pk, ck));").get(); + + env.require_table_exists("querier_cache", "test").get(); + + auto insert_id = env.prepare("INSERT INTO querier_cache.test (pk, ck, value) VALUES (?, ?, ?);").get0(); + auto pk = cql3::raw_value::make_value(data_value(0).serialize()); + for (int i = 0; i < 100; ++i) { + auto ck = cql3::raw_value::make_value(data_value(i).serialize()); + env.execute_prepared(insert_id, {{pk, ck, ck}}).get(); + } + + env.require_table_exists("querier_cache", "test").get(); + + auto& cf = db.find_column_family("querier_cache", "test"); + auto s = cf.schema(); + + 
cf.flush().get(); + + auto cmd1 = query::read_command(s->id(), + s->version(), + s->full_slice(), + 1, + gc_clock::now(), + stdx::nullopt, + 1, + utils::make_random_uuid()); + + // Should save the querier in cache. + db.query_mutations(s, + cmd1, + query::full_partition_range, + db.get_result_memory_limiter().new_mutation_read(1024 * 1024).get0(), + nullptr, + db::no_timeout).get(); + + // Make a fake keyspace just to obtain the configuration and + // thus the concurrency semaphore. + const auto dummy_ks_metadata = keyspace_metadata("dummy_ks", "SimpleStrategy", {{"replication_factor", "1"}}, false); + auto cfg = db.make_keyspace_config(dummy_ks_metadata); + + assert(db.get_querier_cache_stats().resource_based_evictions == 0); + + // Drain all resources of the semaphore + std::vector> permits; + const auto resources = cfg.read_concurrency_semaphore->available_resources(); + permits.reserve(resources.count); + const auto per_permit_memory = resources.memory / resources.count; + + for (int i = 0; i < resources.count; ++i) { + permits.emplace_back(cfg.read_concurrency_semaphore->wait_admission(per_permit_memory).get0()); + } + + assert(cfg.read_concurrency_semaphore->available_resources().count == 0); + assert(cfg.read_concurrency_semaphore->available_resources().memory < per_permit_memory); + + auto cmd2 = query::read_command(s->id(), + s->version(), + s->full_slice(), + 1, + gc_clock::now(), + stdx::nullopt, + 1, + utils::make_random_uuid()); + + // Should evict the already cached querier. + db.query_mutations(s, + cmd2, + query::full_partition_range, + db.get_result_memory_limiter().new_mutation_read(1024 * 1024).get0(), + nullptr, + db::no_timeout).get(); + + assert(db.get_querier_cache_stats().resource_based_evictions == 1); + + // We want to read the entire partition so that the querier + // is not saved at the end and thus ensure it is destroyed. 
+ // We cannot leave scope with the querier still in the cache + // as that sadly leads to use-after-free as the database's + // resource_concurrency_semaphore will be destroyed before some + // of the tracked buffers. + cmd2.row_limit = query::max_rows; + cmd2.partition_limit = query::max_partitions; + db.query_mutations(s, + cmd2, + query::full_partition_range, + db.get_result_memory_limiter().new_mutation_read(1024 * 1024 * 1024 * 1024).get0(), + nullptr, + db::no_timeout).get(); + + }); + }, db_cfg); + }); +} + diff --git a/tests/simple_schema.hh b/tests/simple_schema.hh index 70d6d7688c..5d10617a8c 100644 --- a/tests/simple_schema.hh +++ b/tests/simple_schema.hh @@ -144,6 +144,10 @@ public: return _s; } + const schema_ptr schema() const { + return _s; + } + // Creates a sequence of keys in ring order std::vector make_pkeys(int n) { auto local_keys = make_local_keys(n, _s); diff --git a/thrift/handler.cc b/thrift/handler.cc index 876d5d6f8b..adb4c0155e 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -270,7 +270,7 @@ public: cmd, std::move(pranges), cl_from_thrift(consistency_level), - nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto result) { + nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { if (schema->is_counter()) { counter_column_aggregator aggregator(*schema, cmd->slice, cell_limit, std::move(keys)); @@ -301,7 +301,7 @@ public: cmd, std::move(pranges), cl_from_thrift(consistency_level), - nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto&& result) { + nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto&& result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { column_counter counter(*schema, 
cmd->slice, cell_limit, std::move(keys)); v.consume(cmd->slice, counter); @@ -341,7 +341,7 @@ public: cmd, std::move(prange), cl_from_thrift(consistency_level), - nullptr).then([schema, cmd](auto result) { + nullptr).then([schema, cmd](auto result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [schema, cmd](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, std::numeric_limits::max()); }); @@ -402,7 +402,8 @@ public: range = {dht::partition_range::make_singular(std::move(range[0].start()->value()))}; } auto range1 = range; // query() below accepts an rvalue, so need a copy to reuse later - return service::get_local_storage_proxy().query(schema, cmd, std::move(range), consistency_level, nullptr).then([schema, cmd, column_limit](auto result) { + return service::get_local_storage_proxy().query(schema, cmd, std::move(range), + consistency_level, nullptr).then([schema, cmd, column_limit](auto result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, column_limit); }); @@ -650,7 +651,7 @@ public: cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), - nullptr).then([schema, cmd, column_limit](auto result) { + nullptr).then([schema, cmd, column_limit](auto result, service::replicas_per_token_range) { return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) { column_aggregator aggregator(*schema, cmd->slice, column_limit, { }); v.consume(cmd->slice, aggregator);