mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-02 13:06:57 +00:00
memtable, cache: Eagerly compact data with tombstones
When a memtable receives a tombstone, it can happen under some workloads that the tombstone covers data which is still in the memtable. Some workloads may insert and delete data within a short time frame. We could reduce the rate of memtable flushes if we eagerly drop tombstoned data. One workload which benefits is the raft log. It stores a row for each uncommitted raft entry. When entries are committed they are deleted. So the live set is expected to be small under normal conditions. Fixes #652.
This commit is contained in:
@@ -305,14 +305,104 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
|
||||
_static_row_continuous |= p._static_row_continuous;
|
||||
|
||||
rows_entry::tri_compare cmp(s);
|
||||
auto del = current_deleter<rows_entry>();
|
||||
|
||||
// Compacts rows in [i, end) with the tombstone.
|
||||
// Erases entries which are left empty by compaction.
|
||||
// Does not affect continuity.
|
||||
auto apply_tombstone_to_rows = [&] (apply_resume::stage stage, tombstone tomb, rows_type::iterator i, rows_type::iterator end) -> stop_iteration {
|
||||
if (!preemptible) {
|
||||
// Compaction is attempted only in preemptible contexts because it can be expensive to perform and is not
|
||||
// necessary for correctness.
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
while (i != end) {
|
||||
rows_entry& e = *i;
|
||||
can_gc_fn never_gc = [](tombstone) { return false; };
|
||||
|
||||
bool all_dead = e.dummy() || !e.row().compact_and_expire(s,
|
||||
tomb,
|
||||
gc_clock::time_point::min(), // no TTL expiration
|
||||
never_gc, // no GC
|
||||
gc_clock::time_point::min()); // no GC
|
||||
|
||||
auto next_i = std::next(i);
|
||||
bool inside_continuous_range = !tracker ||
|
||||
(e.continuous() && (next_i != _rows.end() && next_i->continuous()));
|
||||
|
||||
if (all_dead && e.row().empty() && inside_continuous_range) {
|
||||
i = _rows.erase(i);
|
||||
if (tracker) {
|
||||
tracker->on_remove();
|
||||
}
|
||||
del(&e);
|
||||
} else {
|
||||
i = next_i;
|
||||
}
|
||||
|
||||
if (need_preempt() && i != end) {
|
||||
res = apply_resume(stage, i->position());
|
||||
return stop_iteration::no;
|
||||
}
|
||||
}
|
||||
return stop_iteration::yes;
|
||||
};
|
||||
|
||||
if (res._stage <= apply_resume::stage::range_tombstone_compaction) {
|
||||
bool filtering_tombstones = res._stage == apply_resume::stage::range_tombstone_compaction;
|
||||
for (const range_tombstone_entry& rt : p._row_tombstones) {
|
||||
position_in_partition_view pos = rt.position();
|
||||
if (filtering_tombstones) {
|
||||
if (cmp(res._pos, rt.end_position()) >= 0) {
|
||||
continue;
|
||||
}
|
||||
filtering_tombstones = false;
|
||||
if (cmp(res._pos, rt.position()) > 0) {
|
||||
pos = res._pos;
|
||||
}
|
||||
}
|
||||
auto i = _rows.lower_bound(pos, cmp);
|
||||
if (i == _rows.end()) {
|
||||
break;
|
||||
}
|
||||
auto end = _rows.lower_bound(rt.end_position(), cmp);
|
||||
|
||||
auto tomb = _tombstone;
|
||||
tomb.apply(rt.tombstone().tomb);
|
||||
|
||||
if (apply_tombstone_to_rows(apply_resume::stage::range_tombstone_compaction, tomb, i, end) == stop_iteration::no) {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones), preemptible) == stop_iteration::no) {
|
||||
app_stats.has_any_tombstones |= !_row_tombstones.empty();
|
||||
res = apply_resume::merging_range_tombstones();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
app_stats.has_any_tombstones |= !_row_tombstones.empty();
|
||||
|
||||
rows_entry::tri_compare cmp(s);
|
||||
auto del = current_deleter<rows_entry>();
|
||||
if (p._tombstone) {
|
||||
// p._tombstone is already applied to _tombstone
|
||||
rows_type::iterator i;
|
||||
if (res._stage == apply_resume::stage::partition_tombstone_compaction) {
|
||||
i = _rows.lower_bound(res._pos, cmp);
|
||||
} else {
|
||||
i = _rows.begin();
|
||||
}
|
||||
if (apply_tombstone_to_rows(apply_resume::stage::partition_tombstone_compaction,
|
||||
_tombstone, i, _rows.end()) == stop_iteration::no) {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
// TODO: Drop redundant range tombstones
|
||||
p._tombstone = {};
|
||||
}
|
||||
|
||||
res = apply_resume::merging_rows();
|
||||
|
||||
auto p_i = p._rows.begin();
|
||||
auto i = _rows.begin();
|
||||
while (p_i != p._rows.end()) {
|
||||
|
||||
@@ -1046,6 +1046,10 @@ struct mutation_application_stats {
|
||||
struct apply_resume {
|
||||
// Identifies the phase at which an interrupted apply_monotonically() call resumes.
// The declaration order is significant: apply_monotonically() compares stages
// with `<=`, so enumerators must not be reordered.
enum class stage {
|
||||
// NOTE(review): presumably the initial stage, before any phase has run -- confirm.
start,
|
||||
// Compacting rows already present with the incoming range tombstones.
range_tombstone_compaction,
|
||||
// Merging the incoming range tombstones into _row_tombstones.
merging_range_tombstones,
|
||||
// Compacting rows already present with the partition tombstone.
partition_tombstone_compaction,
|
||||
// Set right before the loop over the incoming rows (p._rows).
merging_rows,
|
||||
// Terminal stage; apply_resume::operator bool() returns false only for this value.
done
|
||||
};
|
||||
|
||||
@@ -1082,6 +1086,14 @@ struct apply_resume {
|
||||
|
||||
operator bool() const { return _stage != stage::done; }
|
||||
|
||||
// Builds a resume token for stage::merging_rows, positioned at the partition start.
static apply_resume merging_rows() {
|
||||
return {stage::merging_rows, position_in_partition::for_partition_start()};
|
||||
}
|
||||
|
||||
// Builds a resume token for stage::merging_range_tombstones, positioned at the partition start.
// apply_monotonically() stores it when merging of range tombstones reports stop_iteration::no.
static apply_resume merging_range_tombstones() {
|
||||
return {stage::merging_range_tombstones, position_in_partition::for_partition_start()};
|
||||
}
|
||||
|
||||
// Builds a token in stage::done, positioned at the partition start.
// Such a token converts to false via operator bool().
static apply_resume done() {
|
||||
return {stage::done, position_in_partition::for_partition_start()};
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
@@ -571,6 +572,74 @@ SEASTAR_THREAD_TEST_CASE(test_tombstone_compaction_during_flush) {
|
||||
mt->cleaner().drain().get();
|
||||
}
|
||||
|
||||
// Checks what a memtable reader produces after tombstones are applied on top of
// rows already stored in the memtable:
//  1. a range tombstone over keys [2, 3] makes rows 2 and 3 disappear from the output,
//  2. a partition tombstone created before the writes leaves the rows in place,
//  3. a partition tombstone created after the writes makes the remaining rows disappear,
//     while the range tombstone is still produced.
SEASTAR_THREAD_TEST_CASE(test_range_tombstones_are_compacted_with_data) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto mt = make_lw_shared<replica::memtable>(ss.schema());
|
||||
|
||||
auto pk = ss.make_pkey(0);
|
||||
auto pr = dht::partition_range::make_singular(pk);
|
||||
|
||||
auto old_tombstone = ss.new_tombstone(); // older than any write, does not cover anything
|
||||
|
||||
// Populate a single partition with four rows (clustering keys 1..4).
{
|
||||
mutation m(ss.schema(), pk);
|
||||
ss.add_row(m, ss.make_ckey(1), "v1");
|
||||
ss.add_row(m, ss.make_ckey(2), "v1");
|
||||
ss.add_row(m, ss.make_ckey(3), "v1");
|
||||
ss.add_row(m, ss.make_ckey(4), "v1");
|
||||
mt->apply(m);
|
||||
}
|
||||
|
||||
// Delete the clustering range [2, 3] with a range tombstone.
mutation rt_m(ss.schema(), pk);
|
||||
auto rt = ss.delete_range(rt_m, ss.make_ckey_range(2,3));
|
||||
mt->apply(rt_m);
|
||||
// NOTE(review): presumably waits for the memtable's background cleaner to finish
// before reading -- confirm.
mt->cleaner().drain().get();
|
||||
|
||||
// Rows 2 and 3 are gone; rows 1 and 4 and the range tombstone bounds remain.
assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
|
||||
.produces_partition_start(pk)
|
||||
.produces_row_with_key(ss.make_ckey(1))
|
||||
.produces_range_tombstone_change({rt.position(), rt.tomb})
|
||||
.produces_range_tombstone_change({rt.end_position(), {}})
|
||||
.produces_row_with_key(ss.make_ckey(4))
|
||||
.produces_partition_end()
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Apply the partition tombstone that was created before any of the writes.
{
|
||||
mutation m(ss.schema(), pk);
|
||||
m.partition().apply(old_tombstone);
|
||||
mt->apply(m);
|
||||
mt->cleaner().drain().get();
|
||||
}
|
||||
|
||||
// No change to rows or range tombstone; only the partition start now carries old_tombstone.
|
||||
assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
|
||||
.produces_partition_start(pk, {old_tombstone})
|
||||
.produces_row_with_key(ss.make_ckey(1))
|
||||
.produces_range_tombstone_change({rt.position(), rt.tomb})
|
||||
.produces_range_tombstone_change({rt.end_position(), {}})
|
||||
.produces_row_with_key(ss.make_ckey(4))
|
||||
.produces_partition_end()
|
||||
.produces_end_of_stream();
|
||||
|
||||
// A partition tombstone created after all of the writes above.
auto new_tomb = ss.new_tombstone();
|
||||
|
||||
{
|
||||
mutation m(ss.schema(), pk);
|
||||
m.partition().apply(new_tomb);
|
||||
mt->apply(m);
|
||||
mt->cleaner().drain().get();
|
||||
}
|
||||
|
||||
// No rows are produced any more; the range tombstone bounds are still emitted.
assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
|
||||
.produces_partition_start(pk, {new_tomb})
|
||||
.produces_range_tombstone_change({rt.position(), rt.tomb})
|
||||
.produces_range_tombstone_change({rt.end_position(), {}})
|
||||
.produces_partition_end()
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
|
||||
Reference in New Issue
Block a user