mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
mutation_source: rename make_flat_mutation_reader to make_reader
Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
@@ -640,7 +640,7 @@ column_family::make_reader(schema_ptr s,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
if (_virtual_reader) {
|
||||
return (*_virtual_reader).make_flat_mutation_reader(s, range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
return (*_virtual_reader).make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
std::vector<flat_mutation_reader> readers;
|
||||
@@ -4247,7 +4247,7 @@ future<> column_family::push_view_replica_updates(const schema_ptr& s, const fro
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), this] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = this->as_mutation_source().make_flat_mutation_reader(
|
||||
auto reader = this->as_mutation_source().make_reader(
|
||||
base,
|
||||
pk,
|
||||
slice,
|
||||
|
||||
@@ -630,7 +630,7 @@ public:
|
||||
: impl(s)
|
||||
, _ranges(ranges)
|
||||
, _current_range(_ranges.begin())
|
||||
, _reader(source.make_flat_mutation_reader(s, *_current_range, slice, pc, trace_state, streamed_mutation::forwarding::no,
|
||||
, _reader(source.make_reader(s, *_current_range, slice, pc, trace_state, streamed_mutation::forwarding::no,
|
||||
_ranges.size() > 1 ? mutation_reader::forwarding::yes : fwd_mr))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -248,7 +248,7 @@ protected:
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto ret = _memtable->_underlying->make_flat_mutation_reader(_schema, delegate, slice, pc, nullptr, fwd, fwd_mr);
|
||||
auto ret = _memtable->_underlying->make_reader(_schema, delegate, slice, pc, nullptr, fwd, fwd_mr);
|
||||
_memtable = {};
|
||||
_last = {};
|
||||
return ret;
|
||||
|
||||
@@ -1824,7 +1824,7 @@ future<> data_query(
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, query_result_builder>>(
|
||||
*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
|
||||
|
||||
return do_with(source.make_flat_mutation_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
|
||||
return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no),
|
||||
[cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable {
|
||||
return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout);
|
||||
@@ -1932,7 +1932,7 @@ static do_mutation_query(schema_ptr s,
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>>(
|
||||
*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
|
||||
|
||||
return do_with(source.make_flat_mutation_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
|
||||
return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no),
|
||||
[cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable {
|
||||
return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout);
|
||||
@@ -2118,7 +2118,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
: range(dht::partition_range::make_singular(dk))
|
||||
, reader(source.make_flat_mutation_reader(s, range, slice, service::get_local_sstable_query_read_priority(),
|
||||
, reader(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(),
|
||||
std::move(trace_ptr), streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding::no))
|
||||
{ }
|
||||
|
||||
@@ -748,7 +748,7 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
|
||||
flat_mutation_reader operator()() {
|
||||
return _ms.make_flat_mutation_reader(std::move(_s), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr);
|
||||
return _ms.make_reader(std::move(_s), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -893,7 +893,7 @@ mutation_source make_combined_mutation_source(std::vector<mutation_source> adden
|
||||
std::vector<flat_mutation_reader> rd;
|
||||
rd.reserve(addends.size());
|
||||
for (auto&& ms : addends) {
|
||||
rd.emplace_back(ms.make_flat_mutation_reader(s, pr, slice, pc, tr, fwd));
|
||||
rd.emplace_back(ms.make_reader(s, pr, slice, pc, tr, fwd));
|
||||
}
|
||||
return make_combined_reader(s, std::move(rd), fwd);
|
||||
});
|
||||
|
||||
@@ -312,7 +312,7 @@ public:
|
||||
// All parameters captured by reference must remain live as long as returned
|
||||
// mutation_reader or streamed_mutation obtained through it are alive.
|
||||
flat_mutation_reader
|
||||
make_flat_mutation_reader(
|
||||
make_reader(
|
||||
schema_ptr s,
|
||||
partition_range range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -325,12 +325,12 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
make_flat_mutation_reader(
|
||||
make_reader(
|
||||
schema_ptr s,
|
||||
partition_range range = query::full_partition_range) const
|
||||
{
|
||||
auto& full_slice = s->full_slice();
|
||||
return this->make_flat_mutation_reader(std::move(s), range, full_slice);
|
||||
return this->make_reader(std::move(s), range, full_slice);
|
||||
}
|
||||
|
||||
partition_presence_checker make_partition_presence_checker() {
|
||||
|
||||
@@ -50,7 +50,7 @@ thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduli
|
||||
flat_mutation_reader
|
||||
row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) {
|
||||
ctx.on_underlying_created();
|
||||
return src.make_flat_mutation_reader(_schema, pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
|
||||
return src.make_reader(_schema, pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
|
||||
}
|
||||
|
||||
cache_tracker& global_cache_tracker() {
|
||||
|
||||
@@ -1153,7 +1153,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
|
||||
}
|
||||
}
|
||||
mutation_source ds = create_sstable(s, muts)->as_mutation_source();
|
||||
readers.push_back(ds.make_flat_mutation_reader(s,
|
||||
readers.push_back(ds.make_reader(s,
|
||||
dht::partition_range::make({keys[0]}, {keys[0]}),
|
||||
s->full_slice(), default_priority_class(), nullptr,
|
||||
streamed_mutation::forwarding::yes,
|
||||
@@ -1228,8 +1228,8 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(range).build();
|
||||
readers.push_back(ds1.make_flat_mutation_reader(s, query::full_partition_range, slice));
|
||||
readers.push_back(ds2.make_flat_mutation_reader(s, query::full_partition_range, slice));
|
||||
readers.push_back(ds1.make_reader(s, query::full_partition_range, slice));
|
||||
readers.push_back(ds2.make_reader(s, query::full_partition_range, slice));
|
||||
|
||||
auto rd = make_combined_reader(s, std::move(readers),
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
@@ -1251,9 +1251,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
|
||||
// Check fast_forward_to()
|
||||
{
|
||||
|
||||
readers.push_back(ds1.make_flat_mutation_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
readers.push_back(ds1.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
nullptr, streamed_mutation::forwarding::yes));
|
||||
readers.push_back(ds2.make_flat_mutation_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
readers.push_back(ds2.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
nullptr, streamed_mutation::forwarding::yes));
|
||||
|
||||
auto rd = make_combined_reader(s, std::move(readers),
|
||||
|
||||
@@ -73,10 +73,10 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(populat
|
||||
mutation_source ms = populate(m.schema(), {m});
|
||||
|
||||
flat_mutation_reader sliced_reader =
|
||||
ms.make_flat_mutation_reader(m.schema(), prange, slice_with_ranges);
|
||||
ms.make_reader(m.schema(), prange, slice_with_ranges);
|
||||
|
||||
flat_mutation_reader fwd_reader =
|
||||
ms.make_flat_mutation_reader(m.schema(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes);
|
||||
ms.make_reader(m.schema(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes);
|
||||
|
||||
stdx::optional<mutation_rebuilder> builder{};
|
||||
struct consumer {
|
||||
@@ -162,7 +162,7 @@ static void test_streamed_mutation_forwarding_guarantees(populate_fn populate) {
|
||||
|
||||
auto new_stream = [&ms, s, &m] () -> flat_reader_assertions {
|
||||
BOOST_TEST_MESSAGE("Creating new streamed_mutation");
|
||||
auto res = assert_that(ms.make_flat_mutation_reader(s,
|
||||
auto res = assert_that(ms.make_reader(s,
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -297,7 +297,7 @@ static void test_fast_forwarding_across_partitions_to_empty_range(populate_fn po
|
||||
mutation_source ms = populate(s, partitions);
|
||||
|
||||
auto pr = dht::partition_range::make({keys[0]}, {keys[1]});
|
||||
auto rd = assert_that(ms.make_flat_mutation_reader(s,
|
||||
auto rd = assert_that(ms.make_reader(s,
|
||||
pr,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -399,7 +399,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu
|
||||
))
|
||||
.build();
|
||||
|
||||
auto rd = assert_that(ms.make_flat_mutation_reader(s, pr, slice));
|
||||
auto rd = assert_that(ms.make_reader(s, pr, slice));
|
||||
|
||||
rd.produces_partition_start(m.decorated_key());
|
||||
rd.produces_row_with_key(keys[2]);
|
||||
@@ -418,7 +418,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu
|
||||
))
|
||||
.build();
|
||||
|
||||
auto rd = assert_that(ms.make_flat_mutation_reader(s, pr, slice));
|
||||
auto rd = assert_that(ms.make_reader(s, pr, slice));
|
||||
|
||||
rd.produces_partition_start(m.decorated_key())
|
||||
.produces_range_tombstone(rt3, slice.row_ranges(*s, m.key()))
|
||||
@@ -472,7 +472,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(populate_f
|
||||
));
|
||||
|
||||
mutation_source ms = populate(s, std::vector<mutation>({m}));
|
||||
auto rd = assert_that(ms.make_flat_mutation_reader(s,
|
||||
auto rd = assert_that(ms.make_reader(s,
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -556,7 +556,7 @@ static void test_range_queries(populate_fn populate) {
|
||||
|
||||
auto test_slice = [&] (dht::partition_range r) {
|
||||
BOOST_TEST_MESSAGE(sprint("Testing range %s", r));
|
||||
assert_that(ds.make_flat_mutation_reader(s, r))
|
||||
assert_that(ds.make_reader(s, r))
|
||||
.produces(slice(partitions, r))
|
||||
.produces_end_of_stream();
|
||||
};
|
||||
@@ -741,14 +741,14 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(make_ck(0)))
|
||||
.build();
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, slice))
|
||||
assert_that(ds.make_reader(s, pr, slice))
|
||||
.produces_eos_or_empty_mutation();
|
||||
}
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.build();
|
||||
auto rd = assert_that(ds.make_flat_mutation_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
|
||||
auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
|
||||
rd.produces_partition_start(pk)
|
||||
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
|
||||
.produces_row_with_key(ck1)
|
||||
@@ -759,7 +759,7 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.build();
|
||||
auto rd = assert_that(ds.make_flat_mutation_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
|
||||
auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
|
||||
rd.produces_partition_start(pk)
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
|
||||
@@ -772,7 +772,7 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(make_ck(1)))
|
||||
.build();
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, slice))
|
||||
assert_that(ds.make_reader(s, pr, slice))
|
||||
.produces(row1 + row2 + row3 + row4 + row5 + del_1)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -781,7 +781,7 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(make_ck(2)))
|
||||
.build();
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, slice))
|
||||
assert_that(ds.make_reader(s, pr, slice))
|
||||
.produces(row6 + row7 + del_1 + del_2, slice.row_ranges(*s, pk.key()))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -790,7 +790,7 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(make_ck(1, 2)))
|
||||
.build();
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, slice))
|
||||
assert_that(ds.make_reader(s, pr, slice))
|
||||
.produces(row3 + row4 + del_1)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -799,7 +799,7 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(make_ck(3)))
|
||||
.build();
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, slice))
|
||||
assert_that(ds.make_reader(s, pr, slice))
|
||||
.produces(row8 + del_3)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -807,12 +807,12 @@ static void test_clustering_slices(populate_fn populate) {
|
||||
// Test out-of-range partition keys
|
||||
{
|
||||
auto pr = dht::partition_range::make_singular(keys[0]);
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, s->full_slice()))
|
||||
assert_that(ds.make_reader(s, pr, s->full_slice()))
|
||||
.produces_eos_or_empty_mutation();
|
||||
}
|
||||
{
|
||||
auto pr = dht::partition_range::make_singular(keys[2]);
|
||||
assert_that(ds.make_flat_mutation_reader(s, pr, s->full_slice()))
|
||||
assert_that(ds.make_reader(s, pr, s->full_slice()))
|
||||
.produces_eos_or_empty_mutation();
|
||||
}
|
||||
}
|
||||
@@ -832,7 +832,7 @@ static void test_query_only_static_row(populate_fn populate) {
|
||||
// fully populate cache
|
||||
{
|
||||
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
|
||||
assert_that(ms.make_flat_mutation_reader(s.schema(), prange, s.schema()->full_slice()))
|
||||
assert_that(ms.make_reader(s.schema(), prange, s.schema()->full_slice()))
|
||||
.produces(m1)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -843,7 +843,7 @@ static void test_query_only_static_row(populate_fn populate) {
|
||||
.with_ranges({})
|
||||
.build();
|
||||
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
|
||||
assert_that(ms.make_flat_mutation_reader(s.schema(), prange, slice))
|
||||
assert_that(ms.make_reader(s.schema(), prange, slice))
|
||||
.produces(m1, slice.row_ranges(*s.schema(), m1.key()))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -858,7 +858,7 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(populate_fn populat
|
||||
s.add_row(m, cks[0], "data");
|
||||
|
||||
auto source = populate(s.schema(), {m});
|
||||
assert_that(source.make_flat_mutation_reader(s.schema(),
|
||||
assert_that(source.make_reader(s.schema(),
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -907,7 +907,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(range).build();
|
||||
auto rd = ds.make_flat_mutation_reader(s, query::full_partition_range, slice);
|
||||
auto rd = ds.make_reader(s, query::full_partition_range, slice);
|
||||
|
||||
auto prange = position_range(range);
|
||||
mutation result(m1.decorated_key(), m1.schema());
|
||||
@@ -925,7 +925,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
|
||||
|
||||
// Check fast_forward_to()
|
||||
{
|
||||
auto rd = ds.make_flat_mutation_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
auto rd = ds.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
|
||||
nullptr, streamed_mutation::forwarding::yes);
|
||||
|
||||
auto prange = position_range(range);
|
||||
@@ -981,7 +981,7 @@ void run_conversion_to_mutation_reader_tests(populate_fn populate) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
return source.make_flat_mutation_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return source.make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
};
|
||||
run_mutation_reader_tests(populate_with_flat_mutation_reader_conversion);
|
||||
@@ -1000,7 +1000,7 @@ void test_next_partition(populate_fn populate) {
|
||||
mutations.push_back(std::move(m));
|
||||
}
|
||||
auto source = populate(s.schema(), mutations);
|
||||
assert_that(source.make_flat_mutation_reader(s.schema()))
|
||||
assert_that(source.make_reader(s.schema()))
|
||||
.next_partition() // Does nothing before first partition
|
||||
.produces_partition_start(pkeys[0])
|
||||
.produces_static_row()
|
||||
|
||||
@@ -359,7 +359,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
|
||||
|
||||
auto do_test = [&s, &partitions] (const mutation_source& ds, const dht::partition_range& range,
|
||||
int& secondary_calls_count, int expected_calls) {
|
||||
assert_that(ds.make_flat_mutation_reader(s, range))
|
||||
assert_that(ds.make_reader(s, range))
|
||||
.produces(slice(partitions, range))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(expected_calls, secondary_calls_count);
|
||||
@@ -446,25 +446,25 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
|
||||
auto range = dht::partition_range::make(
|
||||
{partitions[0].decorated_key(), true},
|
||||
{partitions[1].decorated_key(), true});
|
||||
assert_that(ds.make_flat_mutation_reader(s, range))
|
||||
assert_that(ds.make_reader(s, range))
|
||||
.produces(slice(partitions, range))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(3, secondary_calls_count);
|
||||
assert_that(ds.make_flat_mutation_reader(s, range))
|
||||
assert_that(ds.make_reader(s, range))
|
||||
.produces(slice(partitions, range))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(3, secondary_calls_count);
|
||||
auto range2 = dht::partition_range::make(
|
||||
{partitions[0].decorated_key(), true},
|
||||
{partitions[1].decorated_key(), false});
|
||||
assert_that(ds.make_flat_mutation_reader(s, range2))
|
||||
assert_that(ds.make_reader(s, range2))
|
||||
.produces(slice(partitions, range2))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(3, secondary_calls_count);
|
||||
auto range3 = dht::partition_range::make(
|
||||
{dht::ring_position::starting_at(key_before_all.token())},
|
||||
{partitions[2].decorated_key(), false});
|
||||
assert_that(ds.make_flat_mutation_reader(s, range3))
|
||||
assert_that(ds.make_reader(s, range3))
|
||||
.produces(slice(partitions, range3))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(5, secondary_calls_count);
|
||||
@@ -486,7 +486,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
|
||||
|
||||
cache->invalidate([] {}, key_after_all);
|
||||
|
||||
assert_that(ds.make_flat_mutation_reader(s, query::full_partition_range))
|
||||
assert_that(ds.make_reader(s, query::full_partition_range))
|
||||
.produces(slice(partitions, query::full_partition_range))
|
||||
.produces_end_of_stream();
|
||||
BOOST_CHECK_EQUAL(partitions.size() + 2, secondary_calls_count);
|
||||
@@ -1047,7 +1047,7 @@ private:
|
||||
|
||||
flat_mutation_reader make_reader(schema_ptr s, const dht::partition_range& pr,
|
||||
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
|
||||
return make_flat_mutation_reader<reader>(_throttle, _underlying.make_flat_mutation_reader(s, pr, slice, pc, std::move(trace), std::move(fwd)));
|
||||
return make_flat_mutation_reader<reader>(_throttle, _underlying.make_reader(s, pr, slice, pc, std::move(trace), std::move(fwd)));
|
||||
}
|
||||
};
|
||||
lw_shared_ptr<impl> _impl;
|
||||
|
||||
@@ -998,12 +998,12 @@ static future<std::vector<sstables::shared_sstable>> open_sstables(schema_ptr s,
|
||||
|
||||
// mutation_reader for sstable keeping all the required objects alive.
|
||||
static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s) {
|
||||
return sst->as_mutation_source().make_flat_mutation_reader(s, query::full_partition_range, s->full_slice());
|
||||
return sst->as_mutation_source().make_reader(s, query::full_partition_range, s->full_slice());
|
||||
|
||||
}
|
||||
|
||||
static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, const dht::partition_range& pr) {
|
||||
return sst->as_mutation_source().make_flat_mutation_reader(s, pr, s->full_slice());
|
||||
return sst->as_mutation_source().make_reader(s, pr, s->full_slice());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
@@ -2601,7 +2601,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
|
||||
void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query::partition_slice& ps,
|
||||
std::vector<std::pair<partition_key, std::vector<clustering_key>>> expected)
|
||||
{
|
||||
auto reader = sst->as_mutation_source().make_flat_mutation_reader(s, query::full_partition_range, ps);
|
||||
auto reader = sst->as_mutation_source().make_reader(s, query::full_partition_range, ps);
|
||||
|
||||
partition_key::equality pk_eq(*s);
|
||||
clustering_key::equality ck_eq(*s);
|
||||
@@ -3684,7 +3684,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
|
||||
.with_range(query::clustering_range::make_singular(ck2))
|
||||
.with_range(query::clustering_range::make_singular(ck3))
|
||||
.build();
|
||||
flat_mutation_reader rd = ms.make_flat_mutation_reader(table.schema(), query::full_partition_range, slice);
|
||||
flat_mutation_reader rd = ms.make_reader(table.schema(), query::full_partition_range, slice);
|
||||
assert_that(std::move(rd)).has_monotonic_positions();
|
||||
}
|
||||
});
|
||||
@@ -3735,7 +3735,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
|
||||
auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations(partitions), cfg);
|
||||
|
||||
auto ms = as_mutation_source(sst);
|
||||
auto rd = ms.make_flat_mutation_reader(table.schema(),
|
||||
auto rd = ms.make_reader(table.schema(),
|
||||
query::full_partition_range,
|
||||
table.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -4236,7 +4236,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
|
||||
|
||||
std::set<mutation, mutation_decorated_key_less_comparator> merged;
|
||||
merged.insert(mutations.begin(), mutations.end());
|
||||
auto rd = assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, query::full_partition_range));
|
||||
auto rd = assert_that(sst->as_mutation_source().make_reader(s, query::full_partition_range));
|
||||
auto keys_read = 0;
|
||||
for (auto&& m : merged) {
|
||||
keys_read++;
|
||||
@@ -4246,7 +4246,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
|
||||
BOOST_REQUIRE(keys_read == keys_written);
|
||||
|
||||
auto r = dht::partition_range::make({mutations.back().decorated_key(), true}, {mutations.back().decorated_key(), true});
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, r))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, r))
|
||||
.produces(slice(mutations, r))
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
@@ -4499,7 +4499,7 @@ SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) {
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_singular({ck})).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
@@ -503,7 +503,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) {
|
||||
SEASTAR_TEST_CASE(broken_ranges_collection) {
|
||||
return reusable_sst(peers_schema(), "tests/sstables/broken_ranges", 2).then([] (auto sstp) {
|
||||
auto s = peers_schema();
|
||||
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_flat_mutation_reader(s, query::full_partition_range));
|
||||
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_reader(s, query::full_partition_range));
|
||||
return repeat([s, reader] {
|
||||
return read_mutation_from_flat_mutation_reader(*reader).then([s, reader] (mutation_opt mut) {
|
||||
auto key_equal = [s, &mut] (sstring ip) {
|
||||
@@ -901,7 +901,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -957,7 +957,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -1005,7 +1005,7 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck})).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -1047,7 +1047,7 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -1121,7 +1121,7 @@ SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compou
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
@@ -1170,7 +1170,7 @@ SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_posit
|
||||
mt2->make_flat_reader(s)), 1, s, cfg).get();
|
||||
sst->load().get();
|
||||
|
||||
assert_that(sst->as_mutation_source().make_flat_mutation_reader(s))
|
||||
assert_that(sst->as_mutation_source().make_reader(s))
|
||||
.produces(m1 + m2)
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
|
||||
@@ -118,7 +118,7 @@ void run_sstable_resharding_test() {
|
||||
auto shard = shards.front();
|
||||
BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard);
|
||||
|
||||
assert_that(new_sst->as_mutation_source().make_flat_mutation_reader(s))
|
||||
assert_that(new_sst->as_mutation_source().make_reader(s))
|
||||
.produces(muts.at(shard))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_
|
||||
}
|
||||
|
||||
// validate the sstable
|
||||
auto rd = assert_that(sst->as_mutation_source().make_flat_mutation_reader(s));
|
||||
auto rd = assert_that(sst->as_mutation_source().make_reader(s));
|
||||
for (auto&& m : merged) {
|
||||
rd.produces(m);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user