Merge "Migrate all clients of make_combined_reader to flat reader" from Piotr

"Remove old overloads that use mutation_reader."

* 'haaawk/combined_reader_clients_migration_v1_after_comments_2' of github.com:scylladb/seastar-dev:
  Remove unused make_combined_reader overload.
  Migrate test_fast_forwarding_combining_reader to flat reader
  flat_mutation_reader_from_mutations: support partition_range
  Don't pass fwd to flat_mutation_reader_from_mutations if it's no
  Remove unused make_combined_reader overload.
  Migrate test_combining_two_partially_overlapping_readers to flat reader
  Migrate test_combining_two_non_overlapping_readers to flat reader
  Migrate combined_mutation_reader_test to flat reader
  Migrate test_sm_fast_forwarding_combining_reader to flat reader
  Migrate test_combining_one_empty_reader to flat reader
  Migrate test_combining_two_empty_readers to flat reader
  Migrate test_combining_two_readers_with_one_reader_empty to flat reader
  Migrate test_combining_one_reader_with_many_partitions to flat reader
  Migrate test_combining_two_readers_with_the_same_row to flat reader
  Migrate make_combined_mutation_source to flat reader
  mutation_source: Add constructors for sources that ignore forwarding
This commit is contained in:
Paweł Dziepak
2017-12-21 16:04:49 +00:00
7 changed files with 120 additions and 90 deletions

View File

@@ -421,7 +421,7 @@ flat_mutation_reader make_empty_flat_reader(schema_ptr s) {
}
flat_mutation_reader
flat_mutation_reader_from_mutations(std::vector<mutation> mutations, streamed_mutation::forwarding fwd) {
flat_mutation_reader_from_mutations(std::vector<mutation> mutations, const dht::partition_range& pr, streamed_mutation::forwarding fwd) {
class reader final : public flat_mutation_reader::impl {
std::vector<mutation> _mutations;
std::vector<mutation>::iterator _cur;
@@ -511,20 +511,59 @@ flat_mutation_reader_from_mutations(std::vector<mutation> mutations, streamed_mu
rt = rts.unlink_leftmost_without_rebalance();
}
}
struct cmp {
bool operator()(const mutation& m, const dht::ring_position& p) const {
return m.decorated_key().tri_compare(*m.schema(), p) < 0;
}
bool operator()(const dht::ring_position& p, const mutation& m) const {
return m.decorated_key().tri_compare(*m.schema(), p) > 0;
}
};
static std::vector<mutation>::iterator find_first_partition(std::vector<mutation>& ms, const dht::partition_range& pr) {
if (!pr.start()) {
return std::begin(ms);
}
if (pr.is_singular()) {
return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
if (pr.start()->is_inclusive()) {
return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
}
}
}
static std::vector<mutation>::iterator find_last_partition(std::vector<mutation>& ms, const dht::partition_range& pr) {
if (!pr.end()) {
return std::end(ms);
}
if (pr.is_singular()) {
return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
if (pr.end()->is_inclusive()) {
return std::upper_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{});
} else {
return std::lower_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{});
}
}
}
public:
reader(schema_ptr s, std::vector<mutation>&& mutations)
reader(schema_ptr s, std::vector<mutation>&& mutations, const dht::partition_range& pr)
: impl(std::move(s))
, _mutations(std::move(mutations))
, _cur(_mutations.begin())
, _end(_mutations.end())
, _cur(find_first_partition(_mutations, pr))
, _end(find_last_partition(_mutations, pr))
, _cmp(*_cur->schema())
{
auto mutation_destroyer = defer([this] { destroy_mutations(); });
start_new_partition();
_end_of_stream = _cur == _end;
if (!_end_of_stream) {
auto mutation_destroyer = defer([this] { destroy_mutations(); });
start_new_partition();
do_fill_buffer();
do_fill_buffer();
mutation_destroyer.cancel();
mutation_destroyer.cancel();
}
}
void destroy_mutations() noexcept {
// After unlink_leftmost_without_rebalance() was called on a bi::set
@@ -557,7 +596,17 @@ flat_mutation_reader_from_mutations(std::vector<mutation> mutations, streamed_mu
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
throw std::runtime_error("This reader can't be fast forwarded to another partition.");
clear_buffer();
_cur = find_first_partition(_mutations, pr);
_end = find_last_partition(_mutations, pr);
_static_row_done = false;
_cr = {};
_rt = {};
_end_of_stream = _cur == _end;
if (!_end_of_stream) {
start_new_partition();
}
return make_ready_future<>();
};
virtual future<> fast_forward_to(position_range cr) override {
throw std::runtime_error("This reader can't be fast forwarded to another position.");
@@ -565,7 +614,7 @@ flat_mutation_reader_from_mutations(std::vector<mutation> mutations, streamed_mu
};
assert(!mutations.empty());
schema_ptr s = mutations[0].schema();
auto res = make_flat_mutation_reader<reader>(std::move(s), std::move(mutations));
auto res = make_flat_mutation_reader<reader>(std::move(s), std::move(mutations), pr);
if (fwd) {
return make_forwardable(std::move(res));
}

View File

@@ -477,7 +477,10 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader, bool);
flat_mutation_reader make_empty_flat_reader(schema_ptr s);
flat_mutation_reader flat_mutation_reader_from_mutations(std::vector<mutation>, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
flat_mutation_reader flat_mutation_reader_from_mutations(std::vector<mutation>, const dht::partition_range& pr = query::full_partition_range, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
inline flat_mutation_reader flat_mutation_reader_from_mutations(std::vector<mutation> ms, streamed_mutation::forwarding fwd) {
return flat_mutation_reader_from_mutations(std::move(ms), query::full_partition_range, fwd);
}
flat_mutation_reader
make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges,

View File

@@ -299,37 +299,6 @@ future<> combined_mutation_reader::fast_forward_to(position_range pr) {
return _producer.fast_forward_to(std::move(pr));
}
mutation_reader
make_combined_reader(schema_ptr schema,
std::vector<mutation_reader> readers,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader> flat_readers;
flat_readers.reserve(readers.size());
for (auto& reader : readers) {
flat_readers.emplace_back(flat_mutation_reader_from_mutation_reader(schema, std::move(reader), fwd_sm));
}
return mutation_reader_from_flat_mutation_reader(make_flat_mutation_reader<combined_mutation_reader>(
schema,
std::make_unique<list_reader_selector>(std::move(flat_readers)),
fwd_sm,
fwd_mr));
}
mutation_reader
make_combined_reader(schema_ptr schema,
mutation_reader&& a,
mutation_reader&& b,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
std::vector<mutation_reader> v;
v.reserve(2);
v.push_back(std::move(a));
v.push_back(std::move(b));
return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr);
}
flat_mutation_reader make_combined_reader(schema_ptr schema,
std::vector<flat_mutation_reader> readers,
streamed_mutation::forwarding fwd_sm,
@@ -690,10 +659,10 @@ mutation_source make_combined_mutation_source(std::vector<mutation_source> adden
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd) {
std::vector<mutation_reader> rd;
std::vector<flat_mutation_reader> rd;
rd.reserve(addends.size());
for (auto&& ms : addends) {
rd.emplace_back(ms(s, pr, slice, pc, tr, fwd));
rd.emplace_back(ms.make_flat_mutation_reader(s, pr, slice, pc, tr, fwd));
}
return make_combined_reader(s, std::move(rd), fwd);
});

View File

@@ -347,15 +347,6 @@ public:
// Creates a mutation reader which combines data return by supplied readers.
// Returns mutation of the same schema only when all readers return mutations
// of the same schema.
mutation_reader make_combined_reader(schema_ptr schema,
std::vector<mutation_reader> readers,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);
mutation_reader make_combined_reader(schema_ptr schema,
mutation_reader&& a,
mutation_reader&& b,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);
flat_mutation_reader make_combined_reader(schema_ptr schema,
std::vector<flat_mutation_reader>,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
@@ -604,6 +595,25 @@ public:
assert(!fwd);
return fn(s, range);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
return fn(s, range, slice, pc, std::move(tr), fwd);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range, slice, pc);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range, slice);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range range)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range);
}) {}
mutation_source(const mutation_source& other) = default;
mutation_source& operator=(const mutation_source& other) = default;

View File

@@ -92,7 +92,7 @@ static void test_conversion_to_flat_mutation_reader_through_mutation_reader(cons
static void test_conversion(const std::vector<mutation>& mutations) {
BOOST_REQUIRE(!mutations.empty());
auto flat_reader = flat_mutation_reader_from_mutations(std::vector<mutation>(mutations), streamed_mutation::forwarding::no);
auto flat_reader = flat_mutation_reader_from_mutations(std::vector<mutation>(mutations));
for (auto& m : mutations) {
mutation_opt m2 = read_mutation_from_flat_mutation_reader(flat_reader).get0();
BOOST_REQUIRE(m2);
@@ -197,7 +197,7 @@ struct mock_consumer {
};
static size_t count_fragments(mutation m) {
auto r = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no);
auto r = flat_mutation_reader_from_mutations({m});
size_t res = 0;
auto mfopt = r().get0();
while (bool(mfopt)) {
@@ -212,13 +212,13 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
for_each_mutation([&] (const mutation& m) {
size_t fragments_in_m = count_fragments(m);
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no);
auto r = flat_mutation_reader_from_mutations({m});
auto result = r.consume(mock_consumer(depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
auto r2 = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no);
auto r2 = flat_mutation_reader_from_mutations({m});
auto start = r2().get0();
BOOST_REQUIRE(start);
BOOST_REQUIRE(start->is_partition_start());
@@ -238,13 +238,13 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
size_t fragments_in_m1 = count_fragments(m1);
size_t fragments_in_m2 = count_fragments(m2);
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no);
auto r = flat_mutation_reader_from_mutations({m1, m2});
auto result = r.consume(mock_consumer(depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
auto r2 = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no);
auto r2 = flat_mutation_reader_from_mutations({m1, m2});
auto start = r2().get0();
BOOST_REQUIRE(start);
BOOST_REQUIRE(start->is_partition_start());
@@ -255,7 +255,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
}
}
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no);
auto r = flat_mutation_reader_from_mutations({m1, m2});
auto result = r.consume(mock_consumer(depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
@@ -268,7 +268,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
++tombstones_count;
}
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
auto r2 = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no);
auto r2 = flat_mutation_reader_from_mutations({m1, m2});
auto start = r2().get0();
BOOST_REQUIRE(start);
BOOST_REQUIRE(start->is_partition_start());
@@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) {
return seastar::async([] {
for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) {
auto get_hash = [] (mutation m) {
return partition_checksum::compute(flat_mutation_reader_from_mutations({ m }, streamed_mutation::forwarding::no),
return partition_checksum::compute(flat_mutation_reader_from_mutations({ m }),
repair_checksum::streamed).get0();
};
auto h1 = get_hash(m1);
@@ -418,7 +418,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) {
auto muts2 = muts;
std::vector<partition_checksum> checksum;
while (!muts2.empty()) {
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(muts2, streamed_mutation::forwarding::no),
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(muts2),
repair_checksum::streamed).get0();
BOOST_REQUIRE(boost::count(checksum, chk) == 0);
checksum.emplace_back(chk);
@@ -426,7 +426,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) {
}
std::vector<partition_checksum> individually_computed_checksums(muts.size());
for (auto k = 0u; k < muts.size(); k++) {
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations({ muts[k] }, streamed_mutation::forwarding::no),
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations({ muts[k] }),
repair_checksum::streamed).get0();
for (auto j = 0u; j < (muts.size() - k); j++) {
individually_computed_checksums[j].add(chk);
@@ -598,12 +598,12 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
};
BOOST_TEST_MESSAGE(sprint("Consume all%s", reversed_msg));
auto fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no);
auto fmr = flat_mutation_reader_from_mutations(muts);
auto muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed));
BOOST_REQUIRE_EQUAL(muts, muts2);
BOOST_TEST_MESSAGE(sprint("Consume first fragment from partition%s", reversed_msg));
fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no);
fmr = flat_mutation_reader_from_mutations(muts);
muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed, skip_after_first_fragment::yes));
BOOST_REQUIRE_EQUAL(muts.size(), muts2.size());
for (auto j = 0u; j < muts.size(); j++) {
@@ -616,7 +616,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
}
BOOST_TEST_MESSAGE(sprint("Consume first partition%s", reversed_msg));
fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no);
fmr = flat_mutation_reader_from_mutations(muts);
muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed, skip_after_first_fragment::no,
skip_after_first_partition::yes));
BOOST_REQUIRE_EQUAL(muts2.size(), 1);
@@ -632,7 +632,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
return true;
};
BOOST_TEST_MESSAGE("Consume all, filtered");
fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no);
fmr = flat_mutation_reader_from_mutations(muts);
muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter));
BOOST_REQUIRE_EQUAL(muts.size() / 2, muts2.size());
for (auto j = 1; j < muts.size(); j += 2) {

View File

@@ -59,7 +59,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_the_same_row) {
mutation m2(partition_key::from_single_value(*s, "key1"), s);
m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
assert_that(make_combined_reader(s, make_reader_returning(m1), make_reader_returning(m2)))
assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), flat_mutation_reader_from_mutations({m2})))
.produces(m2)
.produces_end_of_stream();
});
@@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) {
mutation m2(partition_key::from_single_value(*s, "keyA"), s);
m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
auto cr = make_combined_reader(s, make_reader_returning(m1), make_reader_returning(m2));
auto cr = make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), flat_mutation_reader_from_mutations({m2}));
assert_that(std::move(cr))
.produces(m2)
.produces(m1)
@@ -86,7 +86,6 @@ SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) {
SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) {
return seastar::async([] {
auto s = make_schema();
auto& slice = s->full_slice();
mutation m1(partition_key::from_single_value(*s, "keyA"), s);
m1.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
@@ -97,7 +96,7 @@ SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) {
mutation m3(partition_key::from_single_value(*s, "keyC"), s);
m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
assert_that(make_combined_reader(s, make_reader_returning_many({m1, m2}, slice), make_reader_returning_many({m2, m3}, slice)))
assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1, m2}), flat_mutation_reader_from_mutations({m2, m3})))
.produces(m1)
.produces(m2)
.produces(m3)
@@ -118,8 +117,8 @@ SEASTAR_TEST_CASE(test_combining_one_reader_with_many_partitions) {
mutation m3(partition_key::from_single_value(*s, "keyC"), s);
m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
std::vector<mutation_reader> v;
v.push_back(make_reader_returning_many({m1, m2, m3}));
std::vector<flat_mutation_reader> v;
v.push_back(flat_mutation_reader_from_mutations({m1, m2, m3}));
assert_that(make_combined_reader(s, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(m1)
.produces(m2)
@@ -211,7 +210,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) {
mutation m1(partition_key::from_single_value(*s, "key1"), s);
m1.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
assert_that(make_combined_reader(s, make_reader_returning(m1), make_empty_reader()))
assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), make_empty_flat_reader(s)))
.produces(m1)
.produces_end_of_stream();
});
@@ -219,16 +218,18 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) {
SEASTAR_TEST_CASE(test_combining_two_empty_readers) {
return seastar::async([] {
assert_that(make_combined_reader(make_schema(), make_empty_reader(), make_empty_reader()))
auto s = make_schema();
assert_that(make_combined_reader(s, make_empty_flat_reader(s), make_empty_flat_reader(s)))
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_combining_one_empty_reader) {
return seastar::async([] {
std::vector<mutation_reader> v;
v.push_back(make_empty_reader());
assert_that(make_combined_reader(make_schema(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
std::vector<flat_mutation_reader> v;
auto s = make_schema();
v.push_back(make_empty_flat_reader(s));
assert_that(make_combined_reader(s, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces_end_of_stream();
});
}
@@ -279,9 +280,9 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combining_reader) {
};
auto make_reader = [&] (const dht::partition_range& pr) {
std::vector<mutation_reader> readers;
std::vector<flat_mutation_reader> readers;
boost::range::transform(mutations, std::back_inserter(readers), [&pr] (auto& ms) {
return make_reader_returning_many(ms, pr);
return flat_mutation_reader_from_mutations({ms}, pr);
});
return make_combined_reader(s, std::move(readers));
};
@@ -346,9 +347,7 @@ SEASTAR_TEST_CASE(test_sm_fast_forwarding_combining_reader) {
std::vector<flat_mutation_reader> readers;
for (auto& mutations : readers_mutations) {
readers.emplace_back(flat_mutation_reader_from_mutation_reader(s.schema(),
make_reader_returning_many(mutations, s.schema()->full_slice(), streamed_mutation::forwarding::yes),
streamed_mutation::forwarding::yes));
readers.emplace_back(flat_mutation_reader_from_mutations(mutations, streamed_mutation::forwarding::yes));
}
assert_that(make_combined_reader(s.schema(), std::move(readers), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no))
@@ -478,20 +477,20 @@ SEASTAR_TEST_CASE(combined_mutation_reader_test) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {});
auto sstables = make_lw_shared<sstables::sstable_set>(cs.make_sstable_set(s.schema()));
std::vector<mutation_reader> sstable_mutation_readers;
std::vector<flat_mutation_reader> sstable_mutation_readers;
for (auto table : tables) {
sstables->insert(table);
sstable_mutation_readers.emplace_back(
mutation_reader_from_flat_mutation_reader(table->read_range_rows_flat(
table->read_range_rows_flat(
s.schema(),
query::full_partition_range,
s.schema()->full_slice(),
seastar::default_priority_class(),
no_resource_tracking(),
streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes)));
mutation_reader::forwarding::yes));
}
auto list_reader = make_combined_reader(s.schema(),
@@ -810,7 +809,7 @@ public:
const restricted_mutation_reader_config& config,
schema_ptr schema,
lw_shared_ptr<sstables::sstable> sst) : _reader(make_empty_flat_reader(schema)) {
auto ms = mutation_source([this, &config, sst=std::move(sst)] (schema_ptr schema, const dht::partition_range&, auto&&...) {
auto ms = mutation_source([this, &config, sst=std::move(sst)] (schema_ptr schema, const dht::partition_range&) {
auto tracker_ptr = std::make_unique<tracking_reader>(config.resources_sem, std::move(schema), std::move(sst));
_tracker = tracker_ptr.get();
return flat_mutation_reader(std::move(tracker_ptr));

View File

@@ -3671,7 +3671,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
for (auto&& mf : fragments) {
mut.apply(mf);
}
auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations({ std::move(mut) }, streamed_mutation::forwarding::no), cfg);
auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations({ std::move(mut) }), cfg);
auto ms = as_mutation_source(sst);
for (uint32_t i = 3; i < seq; i++) {
@@ -3732,7 +3732,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
tmpdir dir;
sstable_writer_config cfg;
cfg.promoted_index_block_size = 1; // So that every fragment is indexed
auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations(partitions, streamed_mutation::forwarding::no), cfg);
auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations(partitions), cfg);
auto ms = as_mutation_source(sst);
auto rd = ms.make_flat_mutation_reader(table.schema(),