diff --git a/mutation/mutation_compactor.hh b/mutation/mutation_compactor.hh index 841d55656c..6040eca627 100644 --- a/mutation/mutation_compactor.hh +++ b/mutation/mutation_compactor.hh @@ -468,8 +468,16 @@ public: _partition_limit -= _rows_in_current_partition > 0; auto stop = consumer.consume_end_of_partition(); if (!sstable_compaction()) { - return _row_limit && _partition_limit && stop != stop_iteration::yes + stop = _row_limit && _partition_limit && stop != stop_iteration::yes ? stop_iteration::no : stop_iteration::yes; + // If we decided to stop earlier but decide to continue now, we + // are in effect skipping the partition. Do not leave `_stop` at + // `stop_iteration::yes` in this case, reset it back to + // `stop_iteration::no` as if we exhausted the partition. + if (_stop && !stop) { + _stop = stop_iteration::no; + } + return stop; } } return stop_iteration::no; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index ef8819f782..d6e4a8070c 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -3383,3 +3383,100 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } } + +SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) { + simple_schema ss; + auto pk = ss.make_pkey(); + auto s = ss.schema(); + + tests::reader_concurrency_semaphore_wrapper semaphore; + + auto permit = semaphore.make_permit(); + + const auto expiry_point = gc_clock::now() + std::chrono::days(10); + + const auto marker_ts = ss.new_timestamp(); + const auto tomb_ts = ss.new_timestamp(); + const auto row_ts = ss.new_timestamp(); + + const auto query_time = gc_clock::now(); + const auto max_rows = std::numeric_limits::max(); + const auto max_partitions = std::numeric_limits::max(); + + auto make_frags = [&] { + std::deque frags; + + frags.emplace_back(*s, permit, partition_start(pk, {})); + + frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row")); + + const auto& v_def = *s->get_column_definition(to_bytes("v")); + + frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(0)), tombstone{tomb_ts, expiry_point})); + + for (uint32_t ck = 0; ck < 1; ++ck) { + auto row = clustering_row(ss.make_ckey(ck)); + row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v"))); + row.marker() = row_marker(marker_ts); + frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row))); + } + + frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{})); + + frags.emplace_back(*s, permit, partition_end{}); + + return frags; + }; + + struct consumer_v2 { + uint64_t frags = 0; + const uint64_t frag_limit; + const bool final_stop; + + consumer_v2(uint64_t stop_at, bool final_stop) : frag_limit(stop_at + 1), final_stop(final_stop) { } + void consume_new_partition(const dht::decorated_key& dk) { } + void consume(const tombstone& t) { } + stop_iteration consume(static_row&& sr, tombstone, bool) { + const auto ret = ++frags >= frag_limit; + testlog.trace("consume(static_row) ret={}", ret); + return stop_iteration(ret); + } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { + const auto ret = ++frags >= frag_limit; + testlog.trace("consume(clustering_row) ret={}", ret); + return stop_iteration(ret); + } + stop_iteration consume(range_tombstone_change&& rtc) { + const auto ret = ++frags >= frag_limit; + testlog.trace("consume(range_tombstone) ret={}", ret); + return stop_iteration(ret); + } + stop_iteration consume_end_of_partition() { + testlog.trace("consume_end_of_partition()"); + return stop_iteration(final_stop); + } + void consume_end_of_stream() { } + }; + + // deduct 2 for partition start and end respectively + const auto inter_partition_frag_count = make_frags().size() - 2; + + auto check = [&] (uint64_t stop_at, bool final_stop) { + testlog.debug("stop_at={}, final_stop={}", stop_at, final_stop); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + reader.consume(compact_for_query_v2(compaction_state, consumer_v2(stop_at, final_stop))).get(); + const auto has_detached_state = bool(std::move(*compaction_state).detach_state()); + if (stop_at < inter_partition_frag_count) { + BOOST_CHECK_EQUAL(has_detached_state, final_stop); + } else { + BOOST_CHECK(!has_detached_state); + } + }; + + for (unsigned stop_at = 0; stop_at < inter_partition_frag_count; ++stop_at) { + check(stop_at, true); + check(stop_at, false); + } +};