mutation/mutation_compactor: consume_partition_end(): reset _stop

The purpose of `_stop` is to remember whether the consumption of the
last partition was interrupted or it was consumed fully. In the former
case, the compactor allows retrieving the compaction state for the given
partition, so that its compaction can be resumed at a later point in
time.
Currently, `_stop` is set to `stop_iteration::yes` whenever the return
value of any of the `consume()` methods is also `stop_iteration::yes`.
Meaning, if the consuming of the partition is interrupted, this is
remembered in `_stop`.
However, a partition whose consumption was interrupted is not always
continued later. Sometimes consumption of a partition is interrupted
because the partition is not interesting and the downstream consumer
wants to stop it. In these cases the compactor should not return an
engaged optional from `detach_state()`, because there is no state to
detach; the state should be thrown away. This was incorrectly handled so
far and is fixed in this patch, by overwriting `_stop` in
`consume_partition_end()` with whatever the downstream consumer returns.
Meaning if they want to skip the partition, then `_stop` is reset to
`stop_iteration::no` and `detach_state()` will return a disengaged
optional as it should in this case.

Fixes: #12629

Closes #13365
This commit is contained in:
Botond Dénes
2023-03-28 07:17:30 -04:00
committed by Nadav Har'El
parent 497dd7380f
commit bae62f899d
2 changed files with 106 additions and 1 deletions

View File

@@ -468,8 +468,16 @@ public:
_partition_limit -= _rows_in_current_partition > 0;
auto stop = consumer.consume_end_of_partition();
if (!sstable_compaction()) {
return _row_limit && _partition_limit && stop != stop_iteration::yes
stop = _row_limit && _partition_limit && stop != stop_iteration::yes
? stop_iteration::no : stop_iteration::yes;
// If we decided to stop earlier but decide to continue now, we
// are in effect skipping the partition. Do not leave `_stop` at
// `stop_iteration::yes` in this case, reset it back to
// `stop_iteration::no` as if we exhausted the partition.
if (_stop && !stop) {
_stop = stop_iteration::no;
}
return stop;
}
}
return stop_iteration::no;

View File

@@ -3383,3 +3383,100 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
}
// Regression test for the `_stop` reset in consume_end_of_partition()
// (issue #12629): detach_state() must return an engaged optional only when
// the consumption of the last partition was interrupted *and* the
// downstream consumer intends to resume it later -- not when the consumer
// merely skipped the rest of an uninteresting partition.
SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) {
simple_schema ss;
auto pk = ss.make_pkey();
auto s = ss.schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto expiry_point = gc_clock::now() + std::chrono::days(10);
const auto marker_ts = ss.new_timestamp();
const auto tomb_ts = ss.new_timestamp();
const auto row_ts = ss.new_timestamp();
const auto query_time = gc_clock::now();
// Effectively unlimited row/partition limits, so only the consumer's
// stop requests can interrupt the compaction.
const auto max_rows = std::numeric_limits<uint64_t>::max();
const auto max_partitions = std::numeric_limits<uint32_t>::max();
// Builds one full partition: partition-start, a static row, an opening
// range-tombstone-change, a single clustering row, the closing
// range-tombstone-change and partition-end.
auto make_frags = [&] {
std::deque<mutation_fragment_v2> frags;
frags.emplace_back(*s, permit, partition_start(pk, {}));
frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row"));
const auto& v_def = *s->get_column_definition(to_bytes("v"));
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(0)), tombstone{tomb_ts, expiry_point}));
for (uint32_t ck = 0; ck < 1; ++ck) {
auto row = clustering_row(ss.make_ckey(ck));
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v")));
row.marker() = row_marker(marker_ts);
frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
}
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));
frags.emplace_back(*s, permit, partition_end{});
return frags;
};
// Consumer that requests a stop once `stop_at + 1` intra-partition
// fragments were consumed. `final_stop` is the value it returns from
// consume_end_of_partition(), i.e. whether it wants the compaction to
// stop for real (true) or to just skip the partition and move on (false).
struct consumer_v2 {
uint64_t frags = 0; // intra-partition fragments consumed so far
const uint64_t frag_limit; // request a stop once this many fragments were seen
const bool final_stop; // return value of consume_end_of_partition()
consumer_v2(uint64_t stop_at, bool final_stop) : frag_limit(stop_at + 1), final_stop(final_stop) { }
void consume_new_partition(const dht::decorated_key& dk) { }
void consume(const tombstone& t) { }
stop_iteration consume(static_row&& sr, tombstone, bool) {
const auto ret = ++frags >= frag_limit;
testlog.trace("consume(static_row) ret={}", ret);
return stop_iteration(ret);
}
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
const auto ret = ++frags >= frag_limit;
testlog.trace("consume(clustering_row) ret={}", ret);
return stop_iteration(ret);
}
stop_iteration consume(range_tombstone_change&& rtc) {
const auto ret = ++frags >= frag_limit;
testlog.trace("consume(range_tombstone) ret={}", ret);
return stop_iteration(ret);
}
stop_iteration consume_end_of_partition() {
testlog.trace("consume_end_of_partition()");
return stop_iteration(final_stop);
}
void consume_end_of_stream() { }
};
// deduct 2 for partition start and end respectively
const auto inter_partition_frag_count = make_frags().size() - 2;
// Runs one full compaction with a consumer configured by (stop_at,
// final_stop) and checks whether detach_state() yields resumable state.
auto check = [&] (uint64_t stop_at, bool final_stop) {
testlog.debug("stop_at={}, final_stop={}", stop_at, final_stop);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, consumer_v2(stop_at, final_stop))).get();
const auto has_detached_state = bool(std::move(*compaction_state).detach_state());
if (stop_at < inter_partition_frag_count) {
// Stopped mid-partition: state must be detachable only when the
// consumer intends to resume (final_stop == true); a skipped
// partition (final_stop == false) must leave no state behind.
BOOST_CHECK_EQUAL(has_detached_state, final_stop);
} else {
// Partition fully consumed before the stop request: nothing to
// resume, so no detachable state either way.
// NOTE(review): unreachable with the callers below, which only
// pass stop_at < inter_partition_frag_count; kept as a guard.
BOOST_CHECK(!has_detached_state);
}
};
// Exercise a stop request at every intra-partition fragment position,
// both with a "real" stop and with a partition skip.
for (unsigned stop_at = 0; stop_at < inter_partition_frag_count; ++stop_at) {
check(stop_at, true);
check(stop_at, false);
}
};