Merge 'root,replica: mv querier to replica/' from Botond Dénes

The querier object is a confusing one. Based on its name it should be in the query/ module and it is already in the query namespace. The query namespace is used for symbols which span the coordinator and replica, or that are mostly coordinator side. The querier is mainly in this namespace due to its similar name and because at the time it was introduced, namespace replica didn't exist yet. But this is a mistake which confuses people.
The querier is actually a completely replica-side logic, implementing the caching of the readers on the replica. Move it to the replica module and namespace to make this more clear.

Code cleanup, no backport.

Closes scylladb/scylladb#26280

* github.com:scylladb/scylladb:
  replica: move querier code to replica namespace
  root,replica: mv querier to replica/
This commit is contained in:
Pavel Emelyanov
2025-10-06 08:26:05 +03:00
14 changed files with 75 additions and 75 deletions

View File

@@ -178,7 +178,6 @@ target_sources(scylla-main
mutation_query.cc
node_ops/task_manager_module.cc
partition_slice_builder.cc
querier.cc
query/query.cc
query_ranges_to_vnodes.cc
query/query-result-set.cc

View File

@@ -803,6 +803,7 @@ scylla_core = (['message/messaging_service.cc',
'replica/dirty_memory_manager.cc',
'replica/multishard_query.cc',
'replica/mutation_dump.cc',
'replica/querier.cc',
'mutation/atomic_cell.cc',
'mutation/canonical_mutation.cc',
'mutation/frozen_mutation.cc',
@@ -1201,7 +1202,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/aws_sigv4.cc',
'types/duration.cc',
'vint-serialization.cc',
'querier.cc',
'mutation_writer/multishard_writer.cc',
'ent/encryption/encryption_config.cc',
'ent/encryption/encryption.cc',

View File

@@ -11,7 +11,8 @@ target_sources(replica
dirty_memory_manager.cc
multishard_query.cc
mutation_dump.cc
schema_describe_helper.cc)
schema_describe_helper.cc
querier.cc)
target_include_directories(replica
PUBLIC
${CMAKE_SOURCE_DIR})

View File

@@ -1692,7 +1692,7 @@ database::query(schema_ptr query_schema, const query::read_command& cmd, query::
auto& semaphore = get_reader_concurrency_semaphore();
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_query_max_result_size();
std::optional<query::querier> querier_opt;
std::optional<querier> querier_opt;
lw_shared_ptr<query::result> result;
std::exception_ptr ex;
@@ -1756,7 +1756,7 @@ database::query_mutations(schema_ptr query_schema, const query::read_command& cm
auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);
std::optional<query::querier> querier_opt;
std::optional<querier> querier_opt;
reconcilable_result result;
std::exception_ptr ex;

View File

@@ -51,7 +51,7 @@
#include "dirty_memory_manager.hh"
#include "reader_concurrency_semaphore_group.hh"
#include "db/timeout_clock.hh"
#include "querier.hh"
#include "replica/querier.hh"
#include "cache_temperature.hh"
#include <unordered_set>
#include "utils/error_injection.hh"
@@ -982,7 +982,7 @@ public:
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
db::timeout_clock::time_point timeout,
std::optional<query::querier>* saved_querier = { });
std::optional<querier>* saved_querier = { });
// Performs a query on given data source returning data in reconcilable form.
//
@@ -1008,7 +1008,7 @@ public:
tracing::trace_state_ptr trace_state,
query::result_memory_accounter accounter,
db::timeout_clock::time_point timeout,
std::optional<query::querier>* saved_querier = { });
std::optional<querier>* saved_querier = { });
void start();
future<> stop();
@@ -1643,7 +1643,7 @@ private:
uint32_t _critical_disk_utilization_mode_count = 0;
bool _shutdown = false;
bool _enable_autocompaction_toggle = false;
query::querier_cache _querier_cache;
querier_cache _querier_cache;
std::unique_ptr<db::large_data_handler> _large_data_handler;
std::unique_ptr<db::large_data_handler> _nop_large_data_handler;
@@ -2034,11 +2034,11 @@ public:
_querier_cache.set_entry_ttl(entry_ttl);
}
const query::querier_cache::stats& get_querier_cache_stats() const {
const querier_cache::stats& get_querier_cache_stats() const {
return _querier_cache.get_stats();
}
query::querier_cache& get_querier_cache() {
querier_cache& get_querier_cache() {
return _querier_cache;
}

View File

@@ -544,7 +544,7 @@ future<> read_context::save_reader(shard_id shard, full_position_view last_pos)
const auto size_after = reader->buffer_size();
auto querier = query::shard_mutation_querier(
auto querier = shard_mutation_querier(
std::move(query_ranges),
std::move(rparts->range),
std::move(rparts->slice),
@@ -731,7 +731,7 @@ future<page_consume_result<ResultBuilder>> read_page(
}
// Use coroutine::as_future to prevent exception on timesout.
auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, result_builder_factory(), cmd.get_row_limit(),
auto f = co_await coroutine::as_future(consume_page(reader, compaction_state, cmd.slice, result_builder_factory(), cmd.get_row_limit(),
cmd.partition_limit, cmd.timestamp));
if (!f.failed()) {
// no exceptions are thrown in this block

View File

@@ -8,14 +8,14 @@
#include <seastar/core/coroutine.hh>
#include "querier.hh"
#include "replica/querier.hh"
#include "dht/i_partitioner.hh"
#include "reader_concurrency_semaphore.hh"
#include "schema/schema.hh"
#include "utils/log.hh"
#include "utils/error_injection.hh"
namespace query {
namespace replica {
logging::logger qlogger("querier_cache");
logging::logger qrlogger("querier");
@@ -494,4 +494,4 @@ future<> querier_cache::stop() noexcept {
}
}
} // namespace query
} // namespace replica

View File

@@ -19,7 +19,7 @@
#include <variant>
namespace query {
namespace replica {
extern logging::logger qrlogger;
@@ -176,7 +176,7 @@ public:
uint32_t partition_limit,
gc_clock::time_point query_time,
tracing::trace_state_ptr trace_ptr = {}) {
return ::query::consume_page(std::get<mutation_reader>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
return ::replica::consume_page(std::get<mutation_reader>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
partition_limit, query_time).then_wrapped([this, trace_ptr = std::move(trace_ptr)] (auto&& fut) {
const auto& cstats = _compaction_state->stats();
tracing::trace(trace_ptr, "Page stats: {} partition(s), {} static row(s) ({} live, {} dead), {} clustering row(s) ({} live, {} dead), {} range tombstone(s) and {} cell(s) ({} live, {} dead)",
@@ -430,4 +430,4 @@ public:
}
};
} // namespace query
} // namespace replica

View File

@@ -3890,7 +3890,7 @@ table::query(schema_ptr query_schema,
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
db::timeout_clock::time_point timeout,
std::optional<query::querier>* saved_querier) {
std::optional<querier>* saved_querier) {
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
co_return make_lw_shared<query::result>();
}
@@ -3910,7 +3910,7 @@ table::query(schema_ptr query_schema,
query_state qs(query_schema, cmd, opts, partition_ranges, std::move(accounter));
std::optional<query::querier> querier_opt;
std::optional<querier> querier_opt;
if (saved_querier) {
querier_opt = std::move(*saved_querier);
}
@@ -3919,8 +3919,8 @@ table::query(schema_ptr query_schema,
auto&& range = *qs.current_partition_range++;
if (!querier_opt) {
query::querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = query::querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, conf);
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, conf);
}
auto& q = *querier_opt;
@@ -3963,20 +3963,20 @@ table::mutation_query(schema_ptr query_schema,
tracing::trace_state_ptr trace_state,
query::result_memory_accounter accounter,
db::timeout_clock::time_point timeout,
std::optional<query::querier>* saved_querier) {
std::optional<querier>* saved_querier) {
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
co_return reconcilable_result();
}
const auto table_async_gate_holder = _async_gate.hold();
std::optional<query::querier> querier_opt;
std::optional<querier> querier_opt;
if (saved_querier) {
querier_opt = std::move(*saved_querier);
}
if (!querier_opt) {
query::querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = query::querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, conf);
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, conf);
}
auto& q = *querier_opt;

View File

@@ -1506,7 +1506,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
auto op = std::optional(tbl.read_in_progress());
auto s = tbl.schema();
auto q = query::querier(
auto q = replica::querier(
tbl.as_mutation_source(),
tbl.schema(),
database_test_wrapper(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}),

View File

@@ -165,7 +165,7 @@ static std::pair<schema_ptr, std::vector<dht::decorated_key>> create_test_table(
return {std::move(res.schema), std::move(res.keys)};
}
static uint64_t aggregate_querier_cache_stat(sharded<replica::database>& db, uint64_t query::querier_cache::stats::*stat) {
static uint64_t aggregate_querier_cache_stat(sharded<replica::database>& db, uint64_t replica::querier_cache::stats::*stat) {
return map_reduce(std::views::iota(0u, smp::count), [stat, &db] (unsigned shard) {
return db.invoke_on(shard, [stat] (replica::database& local_db) {
auto& stats = local_db.get_querier_cache_stats();
@@ -191,7 +191,7 @@ static void require_eventually_empty_caches(sharded<replica::database>& db,
testlog.info("{}() called from {}() {}:{:d}", __FUNCTION__, sl.function_name(), sl.file_name(), sl.line());
auto aggregated_population_is_zero = [&] () mutable {
return aggregate_querier_cache_stat(db, &query::querier_cache::stats::population) == 0;
return aggregate_querier_cache_stat(db, &replica::querier_cache::stats::population) == 0;
};
tests::require(eventually_true(aggregated_population_is_zero));
}
@@ -343,7 +343,7 @@ read_partitions_with_generic_paged_scan(sharded<replica::database>& db, schema_p
auto res = ResultBuilder::query(db, s, cmd, *ranges, nullptr, db::no_timeout);
if (is_stateful) {
tests::require(aggregate_querier_cache_stat(db, &query::querier_cache::stats::lookups) >= npages);
tests::require(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::lookups) >= npages);
}
has_more = res_builder.add(*res);
@@ -560,31 +560,31 @@ SEASTAR_THREAD_TEST_CASE(test_read_all) {
uint64_t lookups = 0;
uint64_t misses = 0;
auto saved_readers = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::population);
auto saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
// Then do a paged range-query, with reader caching
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
const auto new_misses = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses);
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
const auto new_misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
if (page) {
tests::require(new_lookups > lookups);
}
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_less_equal(new_misses - misses, smp::count - saved_readers);
lookups = new_lookups;
misses = new_misses;
saved_readers = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::population);
saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
tests::require_greater_equal(saved_readers, 1u);
}).first;
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
require_eventually_empty_caches(env.db());
@@ -644,14 +644,14 @@ SEASTAR_THREAD_TEST_CASE(test_read_all_multi_range) {
auto expected_results = read_all_partitions_one_by_one(env.db(), s, pkeys);
auto results = read_partitions_with_generic_paged_scan<mutation_result_builder>(env.db(), s, page_size, limit, stateful, ranges, slice, [&] (size_t) {
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
}).first;
check_results_are_equal(expected_results, results);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
}}
}
@@ -690,9 +690,9 @@ SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
// First read all partition-by-partition (not paged).
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
auto misses = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses);
auto lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
auto saved_readers = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::population);
auto misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
auto lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
auto saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
// Then do a paged range-query
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, page_size, stateful, [&] (size_t page) {
@@ -700,27 +700,27 @@ SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
return;
}
const auto new_misses = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses);
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
const auto new_misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
if (page) {
tests::require(new_lookups > lookups);
}
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_less_equal(new_misses - misses, smp::count - saved_readers);
lookups = new_lookups;
misses = new_misses;
saved_readers = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::population);
saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
tests::require_greater_equal(saved_readers, 1u);
}).first;
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
} } }
return make_ready_future<>();
@@ -748,7 +748,7 @@ SEASTAR_THREAD_TEST_CASE(test_evict_a_shard_reader_on_each_page) {
// Then do a paged range-query
auto [results2, npages] = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
if (page) {
tests::require(std::cmp_greater(new_lookups, lookups), seastar::compat::source_location::current());
}
@@ -764,15 +764,15 @@ SEASTAR_THREAD_TEST_CASE(test_evict_a_shard_reader_on_each_page) {
}
}
tests::require(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses) >= page, seastar::compat::source_location::current());
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u, seastar::compat::source_location::current());
tests::require(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses) >= page, seastar::compat::source_location::current());
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u, seastar::compat::source_location::current());
});
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), evictions);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), evictions);
require_eventually_empty_caches(env.db());
@@ -825,9 +825,9 @@ SEASTAR_THREAD_TEST_CASE(test_read_reversed) {
BOOST_REQUIRE_EQUAL(data_results, expected_data_results);
tests::require_equal(aggregate_querier_cache_stat(db, &query::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &query::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &query::querier_cache::stats::resource_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::resource_based_evictions), 0u);
} } }

View File

@@ -22,7 +22,7 @@
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/test_utils.hh"
#include "querier.hh"
#include "replica/querier.hh"
#include "mutation_query.hh"
#include <seastar/core/do_with.hh>
#include <seastar/core/thread.hh>
@@ -84,7 +84,7 @@ query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, cons
static reconcilable_result mutation_query(schema_ptr s, reader_permit permit, const mutation_source& source, const dht::partition_range& range,
const query::partition_slice& slice, uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time) {
auto querier = query::querier(source, s, std::move(permit), range, slice, {});
auto querier = replica::querier(source, s, std::move(permit), range, slice, {});
auto close_querier = deferred_close(querier);
auto rrb = reconcilable_result_builder(*s, slice, make_accounter());
return querier.consume_page(std::move(rrb), row_limit, partition_limit, query_time).get();
@@ -538,7 +538,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
static void data_query(schema_ptr s, reader_permit permit, const mutation_source& source, const dht::partition_range& range,
const query::partition_slice& slice, query::result::builder& builder) {
auto querier = query::querier(source, s, std::move(permit), range, slice, {});
auto querier = replica::querier(source, s, std::move(permit), range, slice, {});
auto close_querier = deferred_close(querier);
auto qrb = query_result_builder(*s, builder);
querier.consume_page(std::move(qrb), std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();

View File

@@ -8,7 +8,7 @@
#include <algorithm>
#include "querier.hh"
#include "replica/querier.hh"
#include "mutation_query.hh"
#include "reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
@@ -74,11 +74,11 @@ public:
private:
// Expected value of the above counters, updated by this.
query::querier_cache::stats _expected_stats;
replica::querier_cache::stats _expected_stats;
simple_schema _s;
reader_concurrency_semaphore _sem;
query::querier_cache _cache;
replica::querier_cache _cache;
const utils::chunked_vector<mutation> _mutations;
const mutation_source _mutation_source;
@@ -159,7 +159,7 @@ public:
};
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h,
ssize_t max_memory = std::numeric_limits<ssize_t>::max(), query::querier_cache::is_user_semaphore_func is_user_semaphore = {})
ssize_t max_memory = std::numeric_limits<ssize_t>::max(), replica::querier_cache::is_user_semaphore_func is_user_semaphore = {})
: _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits<int>::max(), max_memory)
, _cache(is_user_semaphore ? std::move(is_user_semaphore) : [] (const reader_concurrency_semaphore&) { return true; }, entry_ttl)
, _mutations(make_mutations(_s, external_make_value))
@@ -174,7 +174,7 @@ public:
: test_querier_cache(test_querier_cache::make_value, entry_ttl) {
}
test_querier_cache(query::querier_cache::is_user_semaphore_func is_user_semaphore)
test_querier_cache(replica::querier_cache::is_user_semaphore_func is_user_semaphore)
: test_querier_cache(test_querier_cache::make_value, 24h, std::numeric_limits<ssize_t>::max(), std::move(is_user_semaphore))
{ }
@@ -217,7 +217,7 @@ public:
}
template <typename Querier>
entry_info produce_first_page_and_save_querier(void(query::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
entry_info produce_first_page_and_save_querier(void(replica::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit, db::timeout_clock::time_point timeout = db::no_timeout) {
const auto cache_key = make_cache_key(key);
@@ -267,7 +267,7 @@ public:
entry_info produce_first_page_and_save_data_querier(unsigned key, const dht::partition_range& range,
const query::partition_slice& slice, uint64_t row_limit = 5) {
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_data_querier, key, range, slice, row_limit);
return produce_first_page_and_save_querier<replica::querier>(&replica::querier_cache::insert_data_querier, key, range, slice, row_limit);
}
entry_info produce_first_page_and_save_data_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5) {
@@ -291,7 +291,7 @@ public:
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range,
const query::partition_slice& slice, uint64_t row_limit = 5, db::timeout_clock::time_point timeout = db::no_timeout) {
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_mutation_querier, key, range, slice, row_limit, timeout);
return produce_first_page_and_save_querier<replica::querier>(&replica::querier_cache::insert_mutation_querier, key, range, slice, row_limit, timeout);
}
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5,

View File

@@ -24,7 +24,7 @@
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/simple_schema.hh"
#include "querier.hh"
#include "replica/querier.hh"
#include "types/types.hh"
#include "reader_concurrency_semaphore.hh"
@@ -190,7 +190,7 @@ void test_scan_with_range_delete_over_rows() {
auto d = duration_in_seconds([&] {
auto slice = partition_slice_builder(*s).build();
auto q = query::querier(cache_ms, s, semaphore.make_permit(), pr, slice, nullptr);
auto q = replica::querier(cache_ms, s, semaphore.make_permit(), pr, slice, nullptr);
auto close_q = deferred_close(q);
q.consume_page(noop_compacted_fragments_consumer(),
std::numeric_limits<uint32_t>::max(),