From 024e01ad9e49613df0c3e02ab63c85121ce6be9e Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:04:43 +0100 Subject: [PATCH 01/16] mutation_source: Add constructors for sources that ignore forwarding Signed-off-by: Piotr Jastrzebski --- mutation_reader.hh | 19 +++++++++++++++++++ tests/mutation_reader_test.cc | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/mutation_reader.hh b/mutation_reader.hh index e82e6bbc1f..29168f1fc9 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -604,6 +604,25 @@ public: assert(!fwd); return fn(s, range); }) {} + mutation_source(std::function fn) + : mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) { + return fn(s, range, slice, pc, std::move(tr), fwd); + }) {} + mutation_source(std::function fn) + : mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) { + assert(!fwd); + return fn(s, range, slice, pc); + }) {} + mutation_source(std::function fn) + : mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) { + assert(!fwd); + return fn(s, range, slice); + }) {} + mutation_source(std::function fn) + : mutation_source([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) { + assert(!fwd); + return fn(s, range); + }) {} mutation_source(const mutation_source& other) = default; mutation_source& operator=(const mutation_source& other) = default; diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 056405d44c..fc9bea719e 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -810,7 +810,7 @@ public: const restricted_mutation_reader_config& config, schema_ptr schema, lw_shared_ptr sst) : _reader(make_empty_flat_reader(schema)) { - auto ms = mutation_source([this, &config, sst=std::move(sst)] (schema_ptr schema, const dht::partition_range&, auto&&...) { + auto ms = mutation_source([this, &config, sst=std::move(sst)] (schema_ptr schema, const dht::partition_range&) { auto tracker_ptr = std::make_unique(config.resources_sem, std::move(schema), std::move(sst)); _tracker = tracker_ptr.get(); return flat_mutation_reader(std::move(tracker_ptr)); From b1c1709127b3523fd3fcadf82fec8a91f9e43c97 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:06:18 +0100 Subject: [PATCH 02/16] Migrate make_combined_mutation_source to flat reader Signed-off-by: Piotr Jastrzebski --- mutation_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index be5ab6f137..e4178fc63b 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -690,10 +690,10 @@ mutation_source make_combined_mutation_source(std::vector adden const io_priority_class& pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd) { - std::vector rd; + std::vector rd; rd.reserve(addends.size()); for (auto&& ms : addends) { - rd.emplace_back(ms(s, pr, slice, pc, tr, fwd)); + rd.emplace_back(ms.make_flat_mutation_reader(s, pr, slice, pc, tr, fwd)); } return make_combined_reader(s, std::move(rd), fwd); }); From 13551e6f5046bdf563564505133184d33f502e8c Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:08:00 +0100 Subject: [PATCH 03/16] Migrate test_combining_two_readers_with_the_same_row to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index fc9bea719e..6d29e97342 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -59,7 +59,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_the_same_row) { mutation m2(partition_key::from_single_value(*s, "key1"), s); m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2); - assert_that(make_combined_reader(s, make_reader_returning(m1), make_reader_returning(m2))) + assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), flat_mutation_reader_from_mutations({m2}))) .produces(m2) .produces_end_of_stream(); }); From 9a5d6bd8af264cf96fac08eb212e0f6c61f62f01 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:08:53 +0100 Subject: [PATCH 04/16] Migrate test_combining_one_reader_with_many_partitions to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 6d29e97342..0c3c1cc5bf 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -118,8 +118,8 @@ SEASTAR_TEST_CASE(test_combining_one_reader_with_many_partitions) { mutation m3(partition_key::from_single_value(*s, "keyC"), s); m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1); - std::vector v; - v.push_back(make_reader_returning_many({m1, m2, m3})); + std::vector v; + v.push_back(flat_mutation_reader_from_mutations({m1, m2, m3})); assert_that(make_combined_reader(s, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) .produces(m1) .produces(m2) From a702d0ec3ff0e66b248228fb9716aa8747ad81ea Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:09:42 +0100 Subject: [PATCH 05/16] Migrate test_combining_two_readers_with_one_reader_empty to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 0c3c1cc5bf..f6186ac551 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -211,7 +211,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) { mutation m1(partition_key::from_single_value(*s, "key1"), s); m1.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1); - assert_that(make_combined_reader(s, make_reader_returning(m1), make_empty_reader())) + assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), make_empty_flat_reader(s))) .produces(m1) .produces_end_of_stream(); }); From 1f77370d9e88430190e781a62a6b4dc338e4dd0f Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:10:30 +0100 Subject: [PATCH 06/16] Migrate test_combining_two_empty_readers to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index f6186ac551..eca7eb5dea 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -219,7 +219,8 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) { SEASTAR_TEST_CASE(test_combining_two_empty_readers) { return seastar::async([] { - assert_that(make_combined_reader(make_schema(), make_empty_reader(), make_empty_reader())) + auto s = make_schema(); + assert_that(make_combined_reader(s, make_empty_flat_reader(s), make_empty_flat_reader(s))) .produces_end_of_stream(); }); } From 17e6f6b0895867321a3f091aba92a4269cac5288 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:10:50 +0100 Subject: [PATCH 07/16] Migrate test_combining_one_empty_reader to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index eca7eb5dea..6e4f50ca87 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -227,9 +227,10 @@ SEASTAR_TEST_CASE(test_combining_two_empty_readers) { SEASTAR_TEST_CASE(test_combining_one_empty_reader) { return seastar::async([] { - std::vector v; - v.push_back(make_empty_reader()); - assert_that(make_combined_reader(make_schema(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + std::vector v; + auto s = make_schema(); + v.push_back(make_empty_flat_reader(s)); + assert_that(make_combined_reader(s, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) .produces_end_of_stream(); }); } From 19d4bce624f27cc1ff3b1989558a27472c838d34 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:11:25 +0100 Subject: [PATCH 08/16] Migrate test_sm_fast_forwarding_combining_reader to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 6e4f50ca87..0d67c367a3 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -348,9 +348,7 @@ SEASTAR_TEST_CASE(test_sm_fast_forwarding_combining_reader) { std::vector readers; for (auto& mutations : readers_mutations) { - readers.emplace_back(flat_mutation_reader_from_mutation_reader(s.schema(), - make_reader_returning_many(mutations, s.schema()->full_slice(), streamed_mutation::forwarding::yes), - streamed_mutation::forwarding::yes)); + readers.emplace_back(flat_mutation_reader_from_mutations(mutations, streamed_mutation::forwarding::yes)); } assert_that(make_combined_reader(s.schema(), std::move(readers), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no)) From bef2cf8ed92cad8ed3e8036cbf1c850a549706b2 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:12:09 +0100 Subject: [PATCH 09/16] Migrate combined_mutation_reader_test to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 0d67c367a3..bd5edecd98 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -478,20 +478,20 @@ SEASTAR_TEST_CASE(combined_mutation_reader_test) { auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {}); auto sstables = make_lw_shared(cs.make_sstable_set(s.schema())); - std::vector sstable_mutation_readers; + std::vector sstable_mutation_readers; for (auto table : tables) { sstables->insert(table); sstable_mutation_readers.emplace_back( - mutation_reader_from_flat_mutation_reader(table->read_range_rows_flat( + table->read_range_rows_flat( s.schema(), query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), no_resource_tracking(), streamed_mutation::forwarding::no, - mutation_reader::forwarding::yes))); + mutation_reader::forwarding::yes)); } auto list_reader = make_combined_reader(s.schema(), From 6c6245407616e7b3a2061978692a06feda07fca9 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:33:40 +0100 Subject: [PATCH 10/16] Migrate test_combining_two_non_overlapping_readers to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index bd5edecd98..f4c772778c 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) { mutation m2(partition_key::from_single_value(*s, "keyA"), s); m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2); - auto cr = make_combined_reader(s, make_reader_returning(m1), make_reader_returning(m2)); + auto cr = make_combined_reader(s, flat_mutation_reader_from_mutations({m1}), flat_mutation_reader_from_mutations({m2})); assert_that(std::move(cr)) .produces(m2) .produces(m1) From 202c562f689ff93ba3312f59490f1b18621d9c5a Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:34:05 +0100 Subject: [PATCH 11/16] Migrate test_combining_two_partially_overlapping_readers to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index f4c772778c..ac9a31838f 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -86,7 +86,6 @@ SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) { SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) { return seastar::async([] { auto s = make_schema(); - auto& slice = s->full_slice(); mutation m1(partition_key::from_single_value(*s, "keyA"), s); m1.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1); @@ -97,7 +96,7 @@ SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) { mutation m3(partition_key::from_single_value(*s, "keyC"), s); m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1); - assert_that(make_combined_reader(s, make_reader_returning_many({m1, m2}, slice), make_reader_returning_many({m2, m3}, slice))) + assert_that(make_combined_reader(s, flat_mutation_reader_from_mutations({m1, m2}), flat_mutation_reader_from_mutations({m2, m3}))) .produces(m1) .produces(m2) .produces(m3) From b3b6db4f503f5fb8b7cfb4e296ff056758d802f1 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 18:49:35 +0100 Subject: [PATCH 12/16] Remove unused make_combined_reader overload. Signed-off-by: Piotr Jastrzebski --- mutation_reader.cc | 13 ------------- mutation_reader.hh | 5 ----- 2 files changed, 18 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index e4178fc63b..4d2132a5c2 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -317,19 +317,6 @@ make_combined_reader(schema_ptr schema, fwd_mr)); } -mutation_reader -make_combined_reader(schema_ptr schema, - mutation_reader&& a, - mutation_reader&& b, - streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) { - std::vector v; - v.reserve(2); - v.push_back(std::move(a)); - v.push_back(std::move(b)); - return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr); -} - flat_mutation_reader make_combined_reader(schema_ptr schema, std::vector readers, streamed_mutation::forwarding fwd_sm, diff --git a/mutation_reader.hh b/mutation_reader.hh index 29168f1fc9..abbcfb1e83 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -351,11 +351,6 @@ mutation_reader make_combined_reader(schema_ptr schema, std::vector readers, streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes); -mutation_reader make_combined_reader(schema_ptr schema, - mutation_reader&& a, - mutation_reader&& b, - streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no, - mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes); flat_mutation_reader make_combined_reader(schema_ptr schema, std::vector, streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no, From 9e3da50ed1fca2c8a674e3a055289094e65f3695 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 21:29:18 +0100 Subject: [PATCH 13/16] Don't pass fwd to flat_mutation_reader_from_mutations if it's no Default value for fwd is no so there's no need to pass it explicitly. This is important because we will add additional parameter to flat_mutation_reader_from_mutations in next patch. Signed-off-by: Piotr Jastrzebski --- tests/flat_mutation_reader_test.cc | 30 +++++++++++++++--------------- tests/sstable_datafile_test.cc | 4 ++-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/flat_mutation_reader_test.cc b/tests/flat_mutation_reader_test.cc index aa5172939e..d5f214d976 100644 --- a/tests/flat_mutation_reader_test.cc +++ b/tests/flat_mutation_reader_test.cc @@ -92,7 +92,7 @@ static void test_conversion_to_flat_mutation_reader_through_mutation_reader(cons static void test_conversion(const std::vector& mutations) { BOOST_REQUIRE(!mutations.empty()); - auto flat_reader = flat_mutation_reader_from_mutations(std::vector(mutations), streamed_mutation::forwarding::no); + auto flat_reader = flat_mutation_reader_from_mutations(std::vector(mutations)); for (auto& m : mutations) { mutation_opt m2 = read_mutation_from_flat_mutation_reader(flat_reader).get0(); BOOST_REQUIRE(m2); @@ -197,7 +197,7 @@ struct mock_consumer { }; static size_t count_fragments(mutation m) { - auto r = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no); + auto r = flat_mutation_reader_from_mutations({m}); size_t res = 0; auto mfopt = r().get0(); while (bool(mfopt)) { @@ -212,13 +212,13 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) { for_each_mutation([&] (const mutation& m) { size_t fragments_in_m = count_fragments(m); for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) { - auto r = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no); + auto r = flat_mutation_reader_from_mutations({m}); auto result = r.consume(mock_consumer(depth)).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count); - auto r2 = flat_mutation_reader_from_mutations({m}, streamed_mutation::forwarding::no); + auto r2 = flat_mutation_reader_from_mutations({m}); auto start = r2().get0(); BOOST_REQUIRE(start); BOOST_REQUIRE(start->is_partition_start()); @@ -238,13 +238,13 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { size_t fragments_in_m1 = count_fragments(m1); size_t fragments_in_m2 = count_fragments(m2); for (size_t depth = 1; depth < fragments_in_m1; ++depth) { - auto r = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no); + auto r = flat_mutation_reader_from_mutations({m1, m2}); auto result = r.consume(mock_consumer(depth)).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count); - auto r2 = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no); + auto r2 = flat_mutation_reader_from_mutations({m1, m2}); auto start = r2().get0(); BOOST_REQUIRE(start); BOOST_REQUIRE(start->is_partition_start()); @@ -255,7 +255,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { } } for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) { - auto r = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no); + auto r = flat_mutation_reader_from_mutations({m1, m2}); auto result = r.consume(mock_consumer(depth)).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count); @@ -268,7 +268,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { ++tombstones_count; } BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count); - auto r2 = flat_mutation_reader_from_mutations({m1, m2}, streamed_mutation::forwarding::no); + auto r2 = flat_mutation_reader_from_mutations({m1, m2}); auto start = r2().get0(); BOOST_REQUIRE(start); BOOST_REQUIRE(start->is_partition_start()); @@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) { return seastar::async([] { for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) { auto get_hash = [] (mutation m) { - return partition_checksum::compute(flat_mutation_reader_from_mutations({ m }, streamed_mutation::forwarding::no), + return partition_checksum::compute(flat_mutation_reader_from_mutations({ m }), repair_checksum::streamed).get0(); }; auto h1 = get_hash(m1); @@ -418,7 +418,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) { auto muts2 = muts; std::vector checksum; while (!muts2.empty()) { - auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(muts2, streamed_mutation::forwarding::no), + auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(muts2), repair_checksum::streamed).get0(); BOOST_REQUIRE(boost::count(checksum, chk) == 0); checksum.emplace_back(chk); @@ -426,7 +426,7 @@ SEASTAR_TEST_CASE(test_partition_checksum) { } std::vector individually_computed_checksums(muts.size()); for (auto k = 0u; k < muts.size(); k++) { - auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations({ muts[k] }, streamed_mutation::forwarding::no), + auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations({ muts[k] }), repair_checksum::streamed).get0(); for (auto j = 0u; j < (muts.size() - k); j++) { individually_computed_checksums[j].add(chk); @@ -598,12 +598,12 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti }; BOOST_TEST_MESSAGE(sprint("Consume all%s", reversed_msg)); - auto fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no); + auto fmr = flat_mutation_reader_from_mutations(muts); auto muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed)); BOOST_REQUIRE_EQUAL(muts, muts2); BOOST_TEST_MESSAGE(sprint("Consume first fragment from partition%s", reversed_msg)); - fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no); + fmr = flat_mutation_reader_from_mutations(muts); muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed, skip_after_first_fragment::yes)); BOOST_REQUIRE_EQUAL(muts.size(), muts2.size()); for (auto j = 0u; j < muts.size(); j++) { @@ -616,7 +616,7 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti } BOOST_TEST_MESSAGE(sprint("Consume first partition%s", reversed_msg)); - fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no); + fmr = flat_mutation_reader_from_mutations(muts); muts2 = consume_fn(fmr, flat_stream_consumer(s, reversed, skip_after_first_fragment::no, skip_after_first_partition::yes)); BOOST_REQUIRE_EQUAL(muts2.size(), 1); @@ -632,7 +632,7 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti return true; }; BOOST_TEST_MESSAGE("Consume all, filtered"); - fmr = flat_mutation_reader_from_mutations(muts, streamed_mutation::forwarding::no); + fmr = flat_mutation_reader_from_mutations(muts); muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter)); BOOST_REQUIRE_EQUAL(muts.size() / 2, muts2.size()); for (auto j = 1; j < muts.size(); j += 2) { diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 97c1d83eef..edbf8c608b 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -3671,7 +3671,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) { for (auto&& mf : fragments) { mut.apply(mf); } - auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations({ std::move(mut) }, streamed_mutation::forwarding::no), cfg); + auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations({ std::move(mut) }), cfg); auto ms = as_mutation_source(sst); for (uint32_t i = 3; i < seq; i++) { @@ -3732,7 +3732,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) { tmpdir dir; sstable_writer_config cfg; cfg.promoted_index_block_size = 1; // So that every fragment is indexed - auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations(partitions, streamed_mutation::forwarding::no), cfg); + auto sst = make_sstable(dir.path, flat_mutation_reader_from_mutations(partitions), cfg); auto ms = as_mutation_source(sst); auto rd = ms.make_flat_mutation_reader(table.schema(), From 83e55283f71bde537bf3338172a7d44c9d0cbcb9 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 21:30:49 +0100 Subject: [PATCH 14/16] flat_mutation_reader_from_mutations: support partition_range This is needed to make it possible for flat_mutation_reader_from_mutations to replace make_reader_returning_many. Signed-off-by: Piotr Jastrzebski --- flat_mutation_reader.cc | 69 +++++++++++++++++++++++++++++++++++------ flat_mutation_reader.hh | 5 ++- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 10c3cd5801..6c0bd51841 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -421,7 +421,7 @@ flat_mutation_reader make_empty_flat_reader(schema_ptr s) { } flat_mutation_reader -flat_mutation_reader_from_mutations(std::vector mutations, streamed_mutation::forwarding fwd) { +flat_mutation_reader_from_mutations(std::vector mutations, const dht::partition_range& pr, streamed_mutation::forwarding fwd) { class reader final : public flat_mutation_reader::impl { std::vector _mutations; std::vector::iterator _cur; @@ -511,20 +511,59 @@ flat_mutation_reader_from_mutations(std::vector mutations, streamed_mu rt = rts.unlink_leftmost_without_rebalance(); } } + struct cmp { + bool operator()(const mutation& m, const dht::ring_position& p) const { + return m.decorated_key().tri_compare(*m.schema(), p) < 0; + } + bool operator()(const dht::ring_position& p, const mutation& m) const { + return m.decorated_key().tri_compare(*m.schema(), p) > 0; + } + }; + static std::vector::iterator find_first_partition(std::vector& ms, const dht::partition_range& pr) { + if (!pr.start()) { + return std::begin(ms); + } + if (pr.is_singular()) { + return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{}); + } else { + if (pr.start()->is_inclusive()) { + return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{}); + } else { + return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{}); + } + } + } + static std::vector::iterator find_last_partition(std::vector& ms, const dht::partition_range& pr) { + if (!pr.end()) { + return std::end(ms); + } + if (pr.is_singular()) { + return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{}); + } else { + if (pr.end()->is_inclusive()) { + return std::upper_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{}); + } else { + return std::lower_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{}); + } + } + } public: - reader(schema_ptr s, std::vector&& mutations) + reader(schema_ptr s, std::vector&& mutations, const dht::partition_range& pr) : impl(std::move(s)) , _mutations(std::move(mutations)) - , _cur(_mutations.begin()) - , _end(_mutations.end()) + , _cur(find_first_partition(_mutations, pr)) + , _end(find_last_partition(_mutations, pr)) , _cmp(*_cur->schema()) { - auto mutation_destroyer = defer([this] { destroy_mutations(); }); - start_new_partition(); + _end_of_stream = _cur == _end; + if (!_end_of_stream) { + auto mutation_destroyer = defer([this] { destroy_mutations(); }); + start_new_partition(); - do_fill_buffer(); + do_fill_buffer(); - mutation_destroyer.cancel(); + mutation_destroyer.cancel(); + } } void destroy_mutations() noexcept { // After unlink_leftmost_without_rebalance() was called on a bi::set @@ -557,7 +596,17 @@ flat_mutation_reader_from_mutations(std::vector mutations, streamed_mu } } virtual future<> fast_forward_to(const dht::partition_range& pr) override { - throw std::runtime_error("This reader can't be fast forwarded to another partition."); + clear_buffer(); + _cur = find_first_partition(_mutations, pr); + _end = find_last_partition(_mutations, pr); + _static_row_done = false; + _cr = {}; + _rt = {}; + _end_of_stream = _cur == _end; + if (!_end_of_stream) { + start_new_partition(); + } + return make_ready_future<>(); }; virtual future<> fast_forward_to(position_range cr) override { throw std::runtime_error("This reader can't be fast forwarded to another position."); @@ -565,7 +614,7 @@ flat_mutation_reader_from_mutations(std::vector mutations, streamed_mu }; assert(!mutations.empty()); schema_ptr s = mutations[0].schema(); - auto res = make_flat_mutation_reader(std::move(s), std::move(mutations)); + auto res = make_flat_mutation_reader(std::move(s), std::move(mutations), pr); if (fwd) { return make_forwardable(std::move(res)); } diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 6fe8ce9236..b2099da7f9 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -477,7 +477,10 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader, bool); flat_mutation_reader make_empty_flat_reader(schema_ptr s); -flat_mutation_reader flat_mutation_reader_from_mutations(std::vector, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no); +flat_mutation_reader flat_mutation_reader_from_mutations(std::vector, const dht::partition_range& pr = query::full_partition_range, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no); +inline flat_mutation_reader flat_mutation_reader_from_mutations(std::vector ms, streamed_mutation::forwarding fwd) { + return flat_mutation_reader_from_mutations(std::move(ms), query::full_partition_range, fwd); +} flat_mutation_reader make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges, From 759baa3a11f487e25f9acbf48b9f2b5fa28afb75 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 23:48:19 +0100 Subject: [PATCH 15/16] Migrate test_fast_forwarding_combining_reader to flat reader Signed-off-by: Piotr Jastrzebski --- tests/mutation_reader_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index ac9a31838f..ca007ea435 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -280,9 +280,9 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combining_reader) { }; auto make_reader = [&] (const dht::partition_range& pr) { - std::vector readers; + std::vector readers; boost::range::transform(mutations, std::back_inserter(readers), [&pr] (auto& ms) { - return make_reader_returning_many(ms, pr); + return flat_mutation_reader_from_mutations({ms}, pr); }); return make_combined_reader(s, std::move(readers)); }; From 04ce7dfb84e1051894f5ff3097a009a5a942fb11 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 20 Dec 2017 23:49:36 +0100 Subject: [PATCH 16/16] Remove unused make_combined_reader overload. Signed-off-by: Piotr Jastrzebski --- mutation_reader.cc | 18 ------------------ mutation_reader.hh | 4 ---- 2 files changed, 22 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 4d2132a5c2..e6bf060a66 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -299,24 +299,6 @@ future<> combined_mutation_reader::fast_forward_to(position_range pr) { return _producer.fast_forward_to(std::move(pr)); } -mutation_reader -make_combined_reader(schema_ptr schema, - std::vector readers, - streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) { - std::vector flat_readers; - flat_readers.reserve(readers.size()); - for (auto& reader : readers) { - flat_readers.emplace_back(flat_mutation_reader_from_mutation_reader(schema, std::move(reader), fwd_sm)); - } - - return mutation_reader_from_flat_mutation_reader(make_flat_mutation_reader( - schema, - std::make_unique(std::move(flat_readers)), - fwd_sm, - fwd_mr)); -} - flat_mutation_reader make_combined_reader(schema_ptr schema, std::vector readers, streamed_mutation::forwarding fwd_sm, diff --git a/mutation_reader.hh b/mutation_reader.hh index abbcfb1e83..e27b5582c5 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -347,10 +347,6 @@ public: // Creates a mutation reader which combines data return by supplied readers. // Returns mutation of the same schema only when all readers return mutations // of the same schema. -mutation_reader make_combined_reader(schema_ptr schema, - std::vector readers, - streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no, - mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes); flat_mutation_reader make_combined_reader(schema_ptr schema, std::vector, streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,