cache: Defer during partition merging

This commit is contained in:
Tomasz Grabiec
2018-04-28 17:25:12 +02:00
parent 051bb74583
commit 70c72773be
5 changed files with 156 additions and 79 deletions

View File

@@ -25,6 +25,7 @@
#include "partition_version.hh"
#include "row_cache.hh"
#include "partition_snapshot_row_cursor.hh"
#include "utils/coroutine.hh"
static void remove_or_mark_as_unique_owner(partition_version* current, mutation_cleaner* cleaner)
{
@@ -417,60 +418,110 @@ public:
};
coroutine partition_entry::apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema,
logalloc::region& reg, cache_tracker& tracker, partition_snapshot::phase_type phase)
logalloc::allocating_section& alloc, logalloc::region& reg, cache_tracker& tracker, partition_snapshot::phase_type phase)
{
// This flag controls whether this operation may defer. It is more
// expensive to apply with deferring due to construction of snapshots and
// two-pass application, with the first pass filtering and moving data to
// the new version and the second pass merging it back once all is done.
// We cannot merge into current version because if we defer in the middle
// that may publish partial writes. Also, snapshot construction results in
// creation of garbage objects, partition_version and rows_entry. Garbage
// will yield sparse segments and add overhead due to increased LSA
// segment compaction. This becomes especially significant for small
// partitions where I saw 40% slow down.
const bool preemptible = s.clustering_key_size() > 0;
if (s.version() != pe_schema.version()) {
pe.upgrade(pe_schema.shared_from_this(), s.shared_from_this(), tracker.cleaner(), &tracker);
}
bool can_move = !pe._snapshot;
partition_version* current = &*pe._version;
partition_version& dst = open_version(s, &tracker, phase);
auto snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
bool static_row_continuous = snp->static_row_continuous();
while (current) {
dst.partition().apply(current->partition().partition_tombstone());
if (static_row_continuous) {
row& static_row = dst.partition().static_row();
if (can_move) {
static_row.apply(s, column_kind::static_column, std::move(current->partition().static_row()));
} else {
static_row.apply(s, column_kind::static_column, current->partition().static_row());
}
}
range_tombstone_list& tombstones = dst.partition().row_tombstones();
if (can_move) {
tombstones.apply_monotonically(s, std::move(current->partition().row_tombstones()));
} else {
tombstones.apply_monotonically(s, current->partition().row_tombstones());
}
current = current->next();
can_move &= current && !current->is_referenced();
auto src_snp = pe.read(reg, tracker.cleaner(), s.shared_from_this(), &tracker);
lw_shared_ptr<partition_snapshot> prev_snp;
if (preemptible) {
// Reads must see prev_snp until whole update completes so that writes
// are not partially visible.
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
}
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
auto merge_dst_snp = defer([preemptible, dst_snp, &reg, &alloc] () mutable {
maybe_merge_versions(dst_snp, reg, alloc);
});
partition_entry::rows_iterator source(&*pe.version(), s);
partition_snapshot_row_cursor cur(s, *snp);
while (!source.done()) {
if (!source.is_dummy()) {
tracker.on_row_processed_from_memtable();
auto ropt = cur.ensure_entry_if_complete(source.position());
if (ropt) {
rows_entry& e = ropt->row;
source.consume_row([&] (deletable_row&& row) {
e.row().apply_monotonically(s, std::move(row));
});
if (!ropt->inserted) {
tracker.on_row_merged_from_memtable();
// Once we start updating the partition, we must keep all snapshots until the update completes,
// otherwise partial writes would be published. So the scope of snapshots must enclose the scope
// of allocating sections, so we return here to get out of the current allocating section and
// give the caller a chance to store the coroutine object. The code inside coroutine below
// runs outside allocating section.
return coroutine([&tracker, &s, &alloc, &reg, &acc, can_move, preemptible,
merge_dst_snp = std::move(merge_dst_snp), // needs to go away last so that dst_snp is not owned by anyone else
cur = partition_snapshot_row_cursor(s, *dst_snp),
src_cur = partition_snapshot_row_cursor(s, *src_snp),
dst_snp = std::move(dst_snp),
prev_snp = std::move(prev_snp),
src_snp = std::move(src_snp),
static_done = false] () mutable {
auto&& allocator = reg.allocator();
return alloc(reg, [&] {
return with_linearized_managed_bytes([&] {
if (!static_done) {
partition_version& dst = *dst_snp->version();
bool static_row_continuous = dst_snp->static_row_continuous();
auto current = &*src_snp->version();
while (current) {
dst.partition().apply(current->partition().partition_tombstone());
if (static_row_continuous) {
row& static_row = dst.partition().static_row();
if (can_move) {
static_row.apply(s, column_kind::static_column,
std::move(current->partition().static_row()));
} else {
static_row.apply(s, column_kind::static_column, current->partition().static_row());
}
}
range_tombstone_list& tombstones = dst.partition().row_tombstones();
// FIXME: defer while applying range tombstones
if (can_move) {
tombstones.apply_monotonically(s, std::move(current->partition().row_tombstones()));
} else {
tombstones.apply_monotonically(s, current->partition().row_tombstones());
}
current = current->next();
can_move &= current && !current->is_referenced();
}
static_done = true;
}
} else {
tracker.on_row_dropped_from_memtable();
}
}
source.remove_current_row_when_possible();
source.move_to_next_row();
}
return make_empty_coroutine();
if (!src_cur.maybe_refresh_static()) {
return stop_iteration::yes;
}
do {
if (!src_cur.dummy()) {
tracker.on_row_processed_from_memtable();
auto ropt = cur.ensure_entry_if_complete(src_cur.position());
if (ropt) {
if (!ropt->inserted) {
tracker.on_row_merged_from_memtable();
}
rows_entry& e = ropt->row;
src_cur.consume_row([&](deletable_row&& row) {
e.row().apply_monotonically(s, std::move(row));
});
} else {
tracker.on_row_dropped_from_memtable();
}
}
if (!src_cur.next()) {
return stop_iteration::yes;
}
} while (!preemptible || !need_preempt());
return stop_iteration::no;
});
});
});
}
mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
@@ -506,16 +557,27 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&
lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
{
with_allocator(r.allocator(), [&] {
open_version(*entry_schema, tracker, phase);
});
if (_snapshot) {
return _snapshot->shared_from_this();
} else {
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
_snapshot = snp.get();
return snp;
if (_snapshot->_phase == phase) {
return _snapshot->shared_from_this();
} else if (phase < _snapshot->_phase) {
// If entry is being updated, we will get reads for non-latest phase, and
// they must attach to the non-current version.
partition_version* second = _version->next();
assert(second && second->is_referenced());
auto snp = partition_snapshot::container_of(second->_backref).shared_from_this();
assert(phase == snp->_phase);
return snp;
} else { // phase > _snapshot->_phase
with_allocator(r.allocator(), [&] {
add_version(*entry_schema, tracker);
});
}
}
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
_snapshot = snp.get();
return snp;
}
std::vector<range_tombstone>

View File

@@ -147,6 +147,7 @@ class partition_version : public anchorless_list_base_hook<partition_version> {
mutation_partition _partition;
friend class partition_version_ref;
friend class partition_entry;
public:
static partition_version& container_of(mutation_partition& mp) {
return *boost::intrusive::get_parent_from_member(&mp, &partition_version::_partition);
@@ -260,7 +261,8 @@ class partition_snapshot : public enable_lw_shared_from_this<partition_snapshot>
public:
// Only snapshots created with the same value of phase can point to the same version.
using phase_type = uint64_t;
static constexpr phase_type default_phase = 0;
static constexpr phase_type default_phase = 0; // For use with non-evictable snapshots
static constexpr phase_type min_phase = 1; // Use 1 to prevent underflow on apply_to_incomplete()
static constexpr phase_type max_phase = std::numeric_limits<phase_type>::max();
public:
// Used for determining reference stability.
@@ -312,6 +314,10 @@ public:
partition_snapshot& operator=(const partition_snapshot&) = delete;
partition_snapshot& operator=(partition_snapshot&&) = delete;
static partition_snapshot& container_of(partition_version_ref* ref) {
return *boost::intrusive::get_parent_from_member(ref, &partition_snapshot::_version);
}
// If possible merges the version pointed to by this snapshot with
// adjacent partition versions. Leaves the snapshot in an unspecified state.
// Can be retried if previous merge attempt has failed.
@@ -461,8 +467,8 @@ public:
//
// Returns a coroutine object representing the operation.
// The coroutine must be resumed with the region being unlocked.
coroutine apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema, logalloc::region&, cache_tracker&,
partition_snapshot::phase_type);
coroutine apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema, logalloc::allocating_section&,
logalloc::region&, cache_tracker&, partition_snapshot::phase_type);
// If this entry is evictable, cache_tracker must be provided.
partition_version& add_version(const schema& s, cache_tracker*);

View File

@@ -947,6 +947,8 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
});
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
coroutine update;
size_t size_entry;
// In case updater fails, we must bring the cache to consistency without deferring.
auto cleanup = defer([&m, this] {
invalidate_sync(m);
@@ -955,36 +957,41 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
});
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
while (!m.partitions.empty()) {
coroutine update;
size_t size_entry;
with_allocator(_tracker.allocator(), [&] () {
auto cmp = cache_entry::compare(_schema);
{
size_t partition_count = 0;
_update_section(_tracker.region(), [&] {
{
STAP_PROBE(scylla, row_cache_update_one_batch_start);
// FIXME: we should really be checking should_yield() here instead of
// need_preempt(). However, should_yield() is currently quite
// expensive and we need to amortize it somehow.
do {
auto i = m.partitions.begin();
STAP_PROBE(scylla, row_cache_update_partition_start);
with_linearized_managed_bytes([&] {
memtable_entry& mem_e = *i;
if (!update) {
// FIXME: Optimize knowing we lookup in-order.
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
update = updater(cache_i, mem_e, is_present);
size_entry = mem_e.size_in_allocator(_tracker.allocator());
_update_section(_tracker.region(), [&] {
memtable_entry& mem_e = *m.partitions.begin();
size_entry = mem_e.size_in_allocator(_tracker.allocator());
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
update = updater(_update_section, cache_i, mem_e, is_present);
});
}
// We use cooperative deferring instead of futures so that
// this layer has a chance to restore invariants before deferring,
// in particular set _prev_snapshot_pos to the correct value.
if (update.run() == stop_iteration::no) {
return;
}
update = {};
real_dirty_acc.unpin_memory(size_entry);
i = m.partitions.erase(i);
mem_e.partition().evict(_tracker.memtable_cleaner());
current_allocator().destroy(&mem_e);
_update_section(_tracker.region(), [&] {
auto i = m.partitions.begin();
memtable_entry& mem_e = *i;
m.partitions.erase(i);
mem_e.partition().evict(_tracker.memtable_cleaner());
current_allocator().destroy(&mem_e);
});
++partition_count;
});
STAP_PROBE(scylla, row_cache_update_partition_end);
@@ -993,11 +1000,13 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
if (m.partitions.empty()) {
_prev_snapshot_pos = {};
} else {
_prev_snapshot_pos = dht::ring_position(m.partitions.begin()->key());
_update_section(_tracker.region(), [&] {
_prev_snapshot_pos = dht::ring_position(m.partitions.begin()->key());
});
}
});
STAP_PROBE1(scylla, row_cache_update_one_batch_end, partition_count);
});
}
}
});
seastar::thread::yield();
@@ -1007,8 +1016,8 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
}
future<> row_cache::update(external_updater eu, memtable& m) {
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
partition_presence_checker& is_present) mutable {
return do_update(std::move(eu), m, [this] (logalloc::allocating_section& alloc,
row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e, partition_presence_checker& is_present) mutable {
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
// search it.
@@ -1016,7 +1025,7 @@ future<> row_cache::update(external_updater eu, memtable& m) {
cache_entry& entry = *cache_i;
upgrade_entry(entry);
_tracker.on_partition_merge();
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.region(), _tracker,
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), alloc, _tracker.region(), _tracker,
_underlying_phase);
} else if (cache_i->continuous() || is_present(mem_e.key()) == partition_presence_checker_result::definitely_doesnt_exist) {
// Partition is absent in underlying. First, insert a neutral partition entry.
@@ -1026,7 +1035,7 @@ future<> row_cache::update(external_updater eu, memtable& m) {
entry->set_continuous(cache_i->continuous());
_tracker.insert(*entry);
_partitions.insert(cache_i, *entry);
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.region(), _tracker,
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), alloc, _tracker.region(), _tracker,
_underlying_phase);
} else {
return make_empty_coroutine();
@@ -1035,8 +1044,8 @@ future<> row_cache::update(external_updater eu, memtable& m) {
}
future<> row_cache::update_invalidating(external_updater eu, memtable& m) {
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
partition_presence_checker& is_present) {
return do_update(std::move(eu), m, [this] (logalloc::allocating_section& alloc,
row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e, partition_presence_checker& is_present) {
if (cache_i != partitions_end() && cache_i->key().equal(*_schema, mem_e.key())) {
// FIXME: Invalidate only affected row ranges.
// This invalidates all information about the partition.

View File

@@ -375,7 +375,7 @@ private:
// of the range at the time when reading began.
mutation_source _underlying;
phase_type _underlying_phase = 0;
phase_type _underlying_phase = partition_snapshot::min_phase;
mutation_source_opt _prev_snapshot;
// Positions >= than this are using _prev_snapshot, the rest is using _underlying.

View File

@@ -153,7 +153,7 @@ class mvcc_partition;
class mvcc_container {
cache_tracker _tracker;
schema_ptr _schema;
partition_snapshot::phase_type _phase = 0;
partition_snapshot::phase_type _phase = partition_snapshot::min_phase;
public:
mvcc_container(schema_ptr s) : _schema(s) {}
mvcc_container(mvcc_container&&) = delete;
@@ -234,7 +234,7 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc
with_allocator(region().allocator(), [&] {
logalloc::allocating_section as;
auto c = as(region(), [&] {
return _e.apply_to_incomplete(*schema(), std::move(src), *src_schema, region(),
return _e.apply_to_incomplete(*schema(), std::move(src), *src_schema, as, region(),
_container.tracker(), _container.next_phase());
});
repeat([&] {