cache: Defer during partition merging
This commit is contained in:
@@ -25,6 +25,7 @@
|
||||
#include "partition_version.hh"
|
||||
#include "row_cache.hh"
|
||||
#include "partition_snapshot_row_cursor.hh"
|
||||
#include "utils/coroutine.hh"
|
||||
|
||||
static void remove_or_mark_as_unique_owner(partition_version* current, mutation_cleaner* cleaner)
|
||||
{
|
||||
@@ -417,60 +418,110 @@ public:
|
||||
};
|
||||
|
||||
coroutine partition_entry::apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema,
|
||||
logalloc::region& reg, cache_tracker& tracker, partition_snapshot::phase_type phase)
|
||||
logalloc::allocating_section& alloc, logalloc::region& reg, cache_tracker& tracker, partition_snapshot::phase_type phase)
|
||||
{
|
||||
// This flag controls whether this operation may defer. It is more
|
||||
// expensive to apply with deferring due to construction of snapshots and
|
||||
// two-pass application, with the first pass filtering and moving data to
|
||||
// the new version and the second pass merging it back once all is done.
|
||||
// We cannot merge into current version because if we defer in the middle
|
||||
// that may publish partial writes. Also, snapshot construction results in
|
||||
// creation of garbage objects, partition_version and rows_entry. Garbage
|
||||
// will yield sparse segments and add overhead due to increased LSA
|
||||
// segment compaction. This becomes especially significant for small
|
||||
// partitions where I saw 40% slow down.
|
||||
const bool preemptible = s.clustering_key_size() > 0;
|
||||
|
||||
if (s.version() != pe_schema.version()) {
|
||||
pe.upgrade(pe_schema.shared_from_this(), s.shared_from_this(), tracker.cleaner(), &tracker);
|
||||
}
|
||||
|
||||
bool can_move = !pe._snapshot;
|
||||
partition_version* current = &*pe._version;
|
||||
partition_version& dst = open_version(s, &tracker, phase);
|
||||
auto snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
|
||||
bool static_row_continuous = snp->static_row_continuous();
|
||||
while (current) {
|
||||
dst.partition().apply(current->partition().partition_tombstone());
|
||||
if (static_row_continuous) {
|
||||
row& static_row = dst.partition().static_row();
|
||||
if (can_move) {
|
||||
static_row.apply(s, column_kind::static_column, std::move(current->partition().static_row()));
|
||||
} else {
|
||||
static_row.apply(s, column_kind::static_column, current->partition().static_row());
|
||||
}
|
||||
}
|
||||
range_tombstone_list& tombstones = dst.partition().row_tombstones();
|
||||
if (can_move) {
|
||||
tombstones.apply_monotonically(s, std::move(current->partition().row_tombstones()));
|
||||
} else {
|
||||
tombstones.apply_monotonically(s, current->partition().row_tombstones());
|
||||
}
|
||||
current = current->next();
|
||||
can_move &= current && !current->is_referenced();
|
||||
|
||||
auto src_snp = pe.read(reg, tracker.cleaner(), s.shared_from_this(), &tracker);
|
||||
lw_shared_ptr<partition_snapshot> prev_snp;
|
||||
if (preemptible) {
|
||||
// Reads must see prev_snp until whole update completes so that writes
|
||||
// are not partially visible.
|
||||
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
|
||||
}
|
||||
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
|
||||
auto merge_dst_snp = defer([preemptible, dst_snp, ®, &alloc] () mutable {
|
||||
maybe_merge_versions(dst_snp, reg, alloc);
|
||||
});
|
||||
|
||||
partition_entry::rows_iterator source(&*pe.version(), s);
|
||||
partition_snapshot_row_cursor cur(s, *snp);
|
||||
|
||||
while (!source.done()) {
|
||||
if (!source.is_dummy()) {
|
||||
tracker.on_row_processed_from_memtable();
|
||||
auto ropt = cur.ensure_entry_if_complete(source.position());
|
||||
if (ropt) {
|
||||
rows_entry& e = ropt->row;
|
||||
source.consume_row([&] (deletable_row&& row) {
|
||||
e.row().apply_monotonically(s, std::move(row));
|
||||
});
|
||||
if (!ropt->inserted) {
|
||||
tracker.on_row_merged_from_memtable();
|
||||
// Once we start updating the partition, we must keep all snapshots until the update completes,
|
||||
// otherwise partial writes would be published. So the scope of snapshots must enclose the scope
|
||||
// of allocating sections, so we return here to get out of the current allocating section and
|
||||
// give the caller a chance to store the coroutine object. The code inside coroutine below
|
||||
// runs outside allocating section.
|
||||
return coroutine([&tracker, &s, &alloc, ®, &acc, can_move, preemptible,
|
||||
merge_dst_snp = std::move(merge_dst_snp), // needs to go away last so that dst_snp is not owned by anyone else
|
||||
cur = partition_snapshot_row_cursor(s, *dst_snp),
|
||||
src_cur = partition_snapshot_row_cursor(s, *src_snp),
|
||||
dst_snp = std::move(dst_snp),
|
||||
prev_snp = std::move(prev_snp),
|
||||
src_snp = std::move(src_snp),
|
||||
static_done = false] () mutable {
|
||||
auto&& allocator = reg.allocator();
|
||||
return alloc(reg, [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
if (!static_done) {
|
||||
partition_version& dst = *dst_snp->version();
|
||||
bool static_row_continuous = dst_snp->static_row_continuous();
|
||||
auto current = &*src_snp->version();
|
||||
while (current) {
|
||||
dst.partition().apply(current->partition().partition_tombstone());
|
||||
if (static_row_continuous) {
|
||||
row& static_row = dst.partition().static_row();
|
||||
if (can_move) {
|
||||
static_row.apply(s, column_kind::static_column,
|
||||
std::move(current->partition().static_row()));
|
||||
} else {
|
||||
static_row.apply(s, column_kind::static_column, current->partition().static_row());
|
||||
}
|
||||
}
|
||||
range_tombstone_list& tombstones = dst.partition().row_tombstones();
|
||||
// FIXME: defer while applying range tombstones
|
||||
if (can_move) {
|
||||
tombstones.apply_monotonically(s, std::move(current->partition().row_tombstones()));
|
||||
} else {
|
||||
tombstones.apply_monotonically(s, current->partition().row_tombstones());
|
||||
}
|
||||
current = current->next();
|
||||
can_move &= current && !current->is_referenced();
|
||||
}
|
||||
static_done = true;
|
||||
}
|
||||
} else {
|
||||
tracker.on_row_dropped_from_memtable();
|
||||
}
|
||||
}
|
||||
source.remove_current_row_when_possible();
|
||||
source.move_to_next_row();
|
||||
}
|
||||
return make_empty_coroutine();
|
||||
|
||||
if (!src_cur.maybe_refresh_static()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
do {
|
||||
if (!src_cur.dummy()) {
|
||||
tracker.on_row_processed_from_memtable();
|
||||
auto ropt = cur.ensure_entry_if_complete(src_cur.position());
|
||||
if (ropt) {
|
||||
if (!ropt->inserted) {
|
||||
tracker.on_row_merged_from_memtable();
|
||||
}
|
||||
rows_entry& e = ropt->row;
|
||||
src_cur.consume_row([&](deletable_row&& row) {
|
||||
e.row().apply_monotonically(s, std::move(row));
|
||||
});
|
||||
} else {
|
||||
tracker.on_row_dropped_from_memtable();
|
||||
}
|
||||
}
|
||||
if (!src_cur.next()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
} while (!preemptible || !need_preempt());
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
|
||||
@@ -506,16 +557,27 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&
|
||||
lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
|
||||
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
|
||||
{
|
||||
with_allocator(r.allocator(), [&] {
|
||||
open_version(*entry_schema, tracker, phase);
|
||||
});
|
||||
if (_snapshot) {
|
||||
return _snapshot->shared_from_this();
|
||||
} else {
|
||||
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
|
||||
_snapshot = snp.get();
|
||||
return snp;
|
||||
if (_snapshot->_phase == phase) {
|
||||
return _snapshot->shared_from_this();
|
||||
} else if (phase < _snapshot->_phase) {
|
||||
// If entry is being updated, we will get reads for non-latest phase, and
|
||||
// they must attach to the non-current version.
|
||||
partition_version* second = _version->next();
|
||||
assert(second && second->is_referenced());
|
||||
auto snp = partition_snapshot::container_of(second->_backref).shared_from_this();
|
||||
assert(phase == snp->_phase);
|
||||
return snp;
|
||||
} else { // phase > _snapshot->_phase
|
||||
with_allocator(r.allocator(), [&] {
|
||||
add_version(*entry_schema, tracker);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
|
||||
_snapshot = snp.get();
|
||||
return snp;
|
||||
}
|
||||
|
||||
std::vector<range_tombstone>
|
||||
|
||||
@@ -147,6 +147,7 @@ class partition_version : public anchorless_list_base_hook<partition_version> {
|
||||
mutation_partition _partition;
|
||||
|
||||
friend class partition_version_ref;
|
||||
friend class partition_entry;
|
||||
public:
|
||||
static partition_version& container_of(mutation_partition& mp) {
|
||||
return *boost::intrusive::get_parent_from_member(&mp, &partition_version::_partition);
|
||||
@@ -260,7 +261,8 @@ class partition_snapshot : public enable_lw_shared_from_this<partition_snapshot>
|
||||
public:
|
||||
// Only snapshots created with the same value of phase can point to the same version.
|
||||
using phase_type = uint64_t;
|
||||
static constexpr phase_type default_phase = 0;
|
||||
static constexpr phase_type default_phase = 0; // For use with non-evictable snapshots
|
||||
static constexpr phase_type min_phase = 1; // Use 1 to prevent underflow on apply_to_incomplete()
|
||||
static constexpr phase_type max_phase = std::numeric_limits<phase_type>::max();
|
||||
public:
|
||||
// Used for determining reference stability.
|
||||
@@ -312,6 +314,10 @@ public:
|
||||
partition_snapshot& operator=(const partition_snapshot&) = delete;
|
||||
partition_snapshot& operator=(partition_snapshot&&) = delete;
|
||||
|
||||
static partition_snapshot& container_of(partition_version_ref* ref) {
|
||||
return *boost::intrusive::get_parent_from_member(ref, &partition_snapshot::_version);
|
||||
}
|
||||
|
||||
// If possible merges the version pointed to by this snapshot with
|
||||
// adjacent partition versions. Leaves the snapshot in an unspecified state.
|
||||
// Can be retried if previous merge attempt has failed.
|
||||
@@ -461,8 +467,8 @@ public:
|
||||
//
|
||||
// Returns a coroutine object representing the operation.
|
||||
// The coroutine must be resumed with the region being unlocked.
|
||||
coroutine apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema, logalloc::region&, cache_tracker&,
|
||||
partition_snapshot::phase_type);
|
||||
coroutine apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema, logalloc::allocating_section&,
|
||||
logalloc::region&, cache_tracker&, partition_snapshot::phase_type);
|
||||
|
||||
// If this entry is evictable, cache_tracker must be provided.
|
||||
partition_version& add_version(const schema& s, cache_tracker*);
|
||||
|
||||
49
row_cache.cc
49
row_cache.cc
@@ -947,6 +947,8 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
|
||||
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
// In case updater fails, we must bring the cache to consistency without deferring.
|
||||
auto cleanup = defer([&m, this] {
|
||||
invalidate_sync(m);
|
||||
@@ -955,36 +957,41 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
|
||||
while (!m.partitions.empty()) {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
with_allocator(_tracker.allocator(), [&] () {
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
{
|
||||
size_t partition_count = 0;
|
||||
_update_section(_tracker.region(), [&] {
|
||||
{
|
||||
STAP_PROBE(scylla, row_cache_update_one_batch_start);
|
||||
// FIXME: we should really be checking should_yield() here instead of
|
||||
// need_preempt(). However, should_yield() is currently quite
|
||||
// expensive and we need to amortize it somehow.
|
||||
do {
|
||||
auto i = m.partitions.begin();
|
||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||
with_linearized_managed_bytes([&] {
|
||||
memtable_entry& mem_e = *i;
|
||||
if (!update) {
|
||||
// FIXME: Optimize knowing we lookup in-order.
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(cache_i, mem_e, is_present);
|
||||
size_entry = mem_e.size_in_allocator(_tracker.allocator());
|
||||
_update_section(_tracker.region(), [&] {
|
||||
memtable_entry& mem_e = *m.partitions.begin();
|
||||
size_entry = mem_e.size_in_allocator(_tracker.allocator());
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(_update_section, cache_i, mem_e, is_present);
|
||||
});
|
||||
}
|
||||
// We use cooperative deferring instead of futures so that
|
||||
// this layer has a chance to restore invariants before deferring,
|
||||
// in particular set _prev_snapshot_pos to the correct value.
|
||||
if (update.run() == stop_iteration::no) {
|
||||
return;
|
||||
}
|
||||
update = {};
|
||||
real_dirty_acc.unpin_memory(size_entry);
|
||||
i = m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
_update_section(_tracker.region(), [&] {
|
||||
auto i = m.partitions.begin();
|
||||
memtable_entry& mem_e = *i;
|
||||
m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
});
|
||||
++partition_count;
|
||||
});
|
||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||
@@ -993,11 +1000,13 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
if (m.partitions.empty()) {
|
||||
_prev_snapshot_pos = {};
|
||||
} else {
|
||||
_prev_snapshot_pos = dht::ring_position(m.partitions.begin()->key());
|
||||
_update_section(_tracker.region(), [&] {
|
||||
_prev_snapshot_pos = dht::ring_position(m.partitions.begin()->key());
|
||||
});
|
||||
}
|
||||
});
|
||||
STAP_PROBE1(scylla, row_cache_update_one_batch_end, partition_count);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
seastar::thread::yield();
|
||||
@@ -1007,8 +1016,8 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
}
|
||||
|
||||
future<> row_cache::update(external_updater eu, memtable& m) {
|
||||
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
|
||||
partition_presence_checker& is_present) mutable {
|
||||
return do_update(std::move(eu), m, [this] (logalloc::allocating_section& alloc,
|
||||
row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e, partition_presence_checker& is_present) mutable {
|
||||
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
|
||||
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
|
||||
// search it.
|
||||
@@ -1016,7 +1025,7 @@ future<> row_cache::update(external_updater eu, memtable& m) {
|
||||
cache_entry& entry = *cache_i;
|
||||
upgrade_entry(entry);
|
||||
_tracker.on_partition_merge();
|
||||
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.region(), _tracker,
|
||||
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), alloc, _tracker.region(), _tracker,
|
||||
_underlying_phase);
|
||||
} else if (cache_i->continuous() || is_present(mem_e.key()) == partition_presence_checker_result::definitely_doesnt_exist) {
|
||||
// Partition is absent in underlying. First, insert a neutral partition entry.
|
||||
@@ -1026,7 +1035,7 @@ future<> row_cache::update(external_updater eu, memtable& m) {
|
||||
entry->set_continuous(cache_i->continuous());
|
||||
_tracker.insert(*entry);
|
||||
_partitions.insert(cache_i, *entry);
|
||||
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.region(), _tracker,
|
||||
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), alloc, _tracker.region(), _tracker,
|
||||
_underlying_phase);
|
||||
} else {
|
||||
return make_empty_coroutine();
|
||||
@@ -1035,8 +1044,8 @@ future<> row_cache::update(external_updater eu, memtable& m) {
|
||||
}
|
||||
|
||||
future<> row_cache::update_invalidating(external_updater eu, memtable& m) {
|
||||
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
|
||||
partition_presence_checker& is_present) {
|
||||
return do_update(std::move(eu), m, [this] (logalloc::allocating_section& alloc,
|
||||
row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e, partition_presence_checker& is_present) {
|
||||
if (cache_i != partitions_end() && cache_i->key().equal(*_schema, mem_e.key())) {
|
||||
// FIXME: Invalidate only affected row ranges.
|
||||
// This invalidates all information about the partition.
|
||||
|
||||
@@ -375,7 +375,7 @@ private:
|
||||
// of the range at the time when reading began.
|
||||
|
||||
mutation_source _underlying;
|
||||
phase_type _underlying_phase = 0;
|
||||
phase_type _underlying_phase = partition_snapshot::min_phase;
|
||||
mutation_source_opt _prev_snapshot;
|
||||
|
||||
// Positions >= than this are using _prev_snapshot, the rest is using _underlying.
|
||||
|
||||
@@ -153,7 +153,7 @@ class mvcc_partition;
|
||||
class mvcc_container {
|
||||
cache_tracker _tracker;
|
||||
schema_ptr _schema;
|
||||
partition_snapshot::phase_type _phase = 0;
|
||||
partition_snapshot::phase_type _phase = partition_snapshot::min_phase;
|
||||
public:
|
||||
mvcc_container(schema_ptr s) : _schema(s) {}
|
||||
mvcc_container(mvcc_container&&) = delete;
|
||||
@@ -234,7 +234,7 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc
|
||||
with_allocator(region().allocator(), [&] {
|
||||
logalloc::allocating_section as;
|
||||
auto c = as(region(), [&] {
|
||||
return _e.apply_to_incomplete(*schema(), std::move(src), *src_schema, region(),
|
||||
return _e.apply_to_incomplete(*schema(), std::move(src), *src_schema, as, region(),
|
||||
_container.tracker(), _container.next_phase());
|
||||
});
|
||||
repeat([&] {
|
||||
|
||||
Reference in New Issue
Block a user