Merge "Make in-memory partition version merging preemptable" from Tomasz

"
Partition snapshots go away when the last read using the snapshot is done.
Currently we will synchronously attempt to merge partition versions on this event.
If partitions are large, that may stall the reactor for a significant amount of time,
depending on the size of newer versions. Cache update on memtable flush can
create especially large versions.

The solution implemented in this series is to allow merging to be preemptable,
and continue in the background. Background merging is done by the mutation_cleaner
associated with the container (memtable, cache). There is a single merging process
per mutation_cleaner. The merging worker runs in a separate scheduling group,
introduced here, called "mem_compaction".

When the last user of a snapshot goes away the snapshot is slided to the
oldest unreferenced version first so that the version is no longer reachable
from partition_entry::read(). The cleaner will then keep merging preceding
(newer) versions into it, until it merges a version which is referenced. The
merging is preemtable. If the initial merging is preempted, the snapshot is
enqueued into the cleaner, the worker woken up, and merging will continue
asynchronously.

When memtable is merged with cache, its cleaner is merged with cache cleaner,
so any outstanding background merges will be continued by the cache cleaner
without disruption.

This reduces scheduling latency spikes in tests/perf_row_cache_update
for the case of large partition with many rows. For -c1 -m1G I saw
them dropping from >23ms to 1-2ms. System-level benchmark using scylla-bench
shows a similar improvement.
"

* tag 'tgrabiec/merge-snapshots-gradually-v4' of github.com:tgrabiec/scylla:
  tests: perf_row_cache_update: Test with an active reader surviving memtable flush
  memtable, cache: Run mutation_cleaner worker in its own scheduling group
  mutation_cleaner: Make merge() redirect old instance to the new one
  mvcc: Use RAII to ensure that partition versions are merged
  mvcc: Merge partition version versions gradually in the background
  mutation_partition: Make merging preemtable
  tests: mvcc: Use the standard maybe_merge_versions() to merge snapshots
This commit is contained in:
Avi Kivity
2018-07-01 15:32:51 +03:00
20 changed files with 440 additions and 156 deletions

View File

@@ -64,7 +64,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
end_of_stream
};
lw_shared_ptr<partition_snapshot> _snp;
partition_snapshot_ptr _snp;
position_in_partition::tri_compare _position_cmp;
query::clustering_key_filter_ranges _ck_ranges;
@@ -129,7 +129,7 @@ public:
dht::decorated_key dk,
query::clustering_key_filter_ranges&& crr,
lw_shared_ptr<read_context> ctx,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
row_cache& cache)
: flat_mutation_reader::impl(std::move(s))
, _snp(std::move(snp))
@@ -149,9 +149,6 @@ public:
cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete;
cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual ~cache_flat_mutation_reader() {
maybe_merge_versions(_snp, _lsa_manager.region(), _lsa_manager.read_section());
}
virtual void next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
@@ -667,7 +664,7 @@ inline flat_mutation_reader make_cache_flat_mutation_reader(schema_ptr s,
query::clustering_key_filter_ranges crr,
row_cache& cache,
lw_shared_ptr<cache::read_context> ctx,
lw_shared_ptr<partition_snapshot> snp)
partition_snapshot_ptr snp)
{
return make_flat_mutation_reader<cache::cache_flat_mutation_reader>(
std::move(s), std::move(dk), std::move(crr), std::move(ctx), std::move(snp), cache);

View File

@@ -182,7 +182,7 @@ thread_local dirty_memory_manager default_dirty_memory_manager;
lw_shared_ptr<memtable_list>
table::make_memory_only_memtable_list() {
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(get_schema), _config.dirty_memory_manager);
return make_lw_shared<memtable_list>(std::move(get_schema), _config.dirty_memory_manager, _config.memory_compaction_scheduling_group);
}
lw_shared_ptr<memtable_list>
@@ -191,7 +191,7 @@ table::make_memtable_list() {
return seal_active_memtable(std::move(permit));
};
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _config.memory_compaction_scheduling_group);
}
lw_shared_ptr<memtable_list>
@@ -200,7 +200,7 @@ table::make_streaming_memtable_list() {
return seal_active_streaming_memtable_immediate(std::move(permit));
};
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _config.streaming_scheduling_group);
}
lw_shared_ptr<memtable_list>
@@ -209,7 +209,7 @@ table::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
return seal_active_streaming_memtable_big(smb, std::move(permit));
};
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _config.streaming_scheduling_group);
}
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
@@ -2158,6 +2158,8 @@ database::database(const db::config& cfg, database_config dbcfg)
_compaction_manager->start();
setup_metrics();
_row_cache_tracker.set_compaction_scheduling_group(dbcfg.memory_compaction_scheduling_group);
dblog.info("Row: max_vector_size: {}, internal_count: {}", size_t(row::max_vector_size), size_t(row::internal_count));
}
@@ -2845,6 +2847,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.compaction_scheduling_group = _config.compaction_scheduling_group;
cfg.memory_compaction_scheduling_group = _config.memory_compaction_scheduling_group;
cfg.memtable_scheduling_group = _config.memtable_scheduling_group;
cfg.memtable_to_cache_scheduling_group = _config.memtable_to_cache_scheduling_group;
cfg.streaming_scheduling_group = _config.streaming_scheduling_group;
@@ -3396,7 +3399,7 @@ future<> memtable_list::request_flush() {
}
lw_shared_ptr<memtable> memtable_list::new_memtable() {
return make_lw_shared<memtable>(_current_schema(), *_dirty_memory_manager, this);
return make_lw_shared<memtable>(_current_schema(), *_dirty_memory_manager, this, _compaction_scheduling_group);
}
future<flush_permit> flush_permit::reacquire_sstable_write_permit() && {
@@ -3625,6 +3628,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.enable_incremental_backups = _enable_incremental_backups;
cfg.compaction_scheduling_group = _dbcfg.compaction_scheduling_group;
cfg.memory_compaction_scheduling_group = _dbcfg.memory_compaction_scheduling_group;
cfg.memtable_scheduling_group = _dbcfg.memtable_scheduling_group;
cfg.memtable_to_cache_scheduling_group = _dbcfg.memtable_to_cache_scheduling_group;
cfg.streaming_scheduling_group = _dbcfg.streaming_scheduling_group;

View File

@@ -164,29 +164,33 @@ private:
std::function<schema_ptr()> _current_schema;
dirty_memory_manager* _dirty_memory_manager;
std::experimental::optional<shared_promise<>> _flush_coalescing;
seastar::scheduling_group _compaction_scheduling_group;
public:
memtable_list(
seal_immediate_fn_type seal_immediate_fn,
seal_delayed_fn_type seal_delayed_fn,
std::function<schema_ptr()> cs,
dirty_memory_manager* dirty_memory_manager)
dirty_memory_manager* dirty_memory_manager,
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group())
: _memtables({})
, _seal_immediate_fn(seal_immediate_fn)
, _seal_delayed_fn(seal_delayed_fn)
, _current_schema(cs)
, _dirty_memory_manager(dirty_memory_manager) {
, _dirty_memory_manager(dirty_memory_manager)
, _compaction_scheduling_group(compaction_scheduling_group) {
add_memtable();
}
memtable_list(
seal_immediate_fn_type seal_immediate_fn,
std::function<schema_ptr()> cs,
dirty_memory_manager* dirty_memory_manager)
: memtable_list(std::move(seal_immediate_fn), {}, std::move(cs), dirty_memory_manager) {
dirty_memory_manager* dirty_memory_manager,
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group())
: memtable_list(std::move(seal_immediate_fn), {}, std::move(cs), dirty_memory_manager, compaction_scheduling_group) {
}
memtable_list(std::function<schema_ptr()> cs, dirty_memory_manager* dirty_memory_manager)
: memtable_list({}, {}, std::move(cs), dirty_memory_manager) {
memtable_list(std::function<schema_ptr()> cs, dirty_memory_manager* dirty_memory_manager, seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group())
: memtable_list({}, {}, std::move(cs), dirty_memory_manager, compaction_scheduling_group) {
}
bool may_flush() const {
@@ -312,6 +316,7 @@ public:
seastar::scheduling_group memtable_scheduling_group;
seastar::scheduling_group memtable_to_cache_scheduling_group;
seastar::scheduling_group compaction_scheduling_group;
seastar::scheduling_group memory_compaction_scheduling_group;
seastar::scheduling_group statement_scheduling_group;
seastar::scheduling_group streaming_scheduling_group;
bool enable_metrics_reporting = false;
@@ -1039,6 +1044,7 @@ public:
seastar::scheduling_group memtable_scheduling_group;
seastar::scheduling_group memtable_to_cache_scheduling_group;
seastar::scheduling_group compaction_scheduling_group;
seastar::scheduling_group memory_compaction_scheduling_group;
seastar::scheduling_group statement_scheduling_group;
seastar::scheduling_group streaming_scheduling_group;
bool enable_metrics_reporting = false;
@@ -1119,6 +1125,7 @@ struct database_config {
seastar::scheduling_group memtable_scheduling_group;
seastar::scheduling_group memtable_to_cache_scheduling_group; // FIXME: merge with memtable_scheduling_group
seastar::scheduling_group compaction_scheduling_group;
seastar::scheduling_group memory_compaction_scheduling_group;
seastar::scheduling_group statement_scheduling_group;
seastar::scheduling_group streaming_scheduling_group;
size_t available_memory;

View File

@@ -490,6 +490,7 @@ int main(int ac, char** av) {
}
};
dbcfg.compaction_scheduling_group = make_sched_group("compaction", 1000);
dbcfg.memory_compaction_scheduling_group = make_sched_group("mem_compaction", 1000);
dbcfg.streaming_scheduling_group = make_sched_group("streaming", 200);
dbcfg.statement_scheduling_group = make_sched_group("statement", 1000);
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);

View File

@@ -27,11 +27,11 @@
#include "schema_upgrader.hh"
#include "partition_builder.hh"
memtable::memtable(schema_ptr schema, dirty_memory_manager& dmm, memtable_list* memtable_list)
memtable::memtable(schema_ptr schema, dirty_memory_manager& dmm, memtable_list* memtable_list,
seastar::scheduling_group compaction_scheduling_group)
: logalloc::region(dmm.region_group())
, _dirty_mgr(dmm)
, _memtable_cleaner(*this, no_cache_tracker)
, _cleaner(&_memtable_cleaner)
, _cleaner(*this, no_cache_tracker, compaction_scheduling_group)
, _memtable_list(memtable_list)
, _schema(std::move(schema))
, partitions(memtable_entry::compare(_schema)) {
@@ -56,10 +56,9 @@ void memtable::clear() noexcept {
auto dirty_before = dirty_size();
with_allocator(allocator(), [this] {
partitions.clear_and_dispose([this] (memtable_entry* e) {
e->partition().evict(_memtable_cleaner);
e->partition().evict(_cleaner);
current_deleter<memtable_entry>()(e);
});
_memtable_cleaner.clear();
});
remove_flushed_memory(dirty_before - dirty_size());
}
@@ -322,7 +321,7 @@ public:
_delegate = delegate_reader(*_delegate_range, _slice, _pc, streamed_mutation::forwarding::no, _fwd_mr);
} else {
auto key_and_snp = read_section()(region(), [&] {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, lw_shared_ptr<partition_snapshot>>> {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
memtable_entry *e = fetch_entry();
if (!e) {
return { };
@@ -484,7 +483,7 @@ private:
void get_next_partition() {
uint64_t component_size = 0;
auto key_and_snp = read_section()(region(), [&] {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, lw_shared_ptr<partition_snapshot>>> {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
memtable_entry* e = fetch_entry();
if (e) {
auto dk = e->key();
@@ -550,7 +549,7 @@ public:
}
};
lw_shared_ptr<partition_snapshot> memtable_entry::snapshot(memtable& mtbl) {
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker);
}
@@ -564,7 +563,7 @@ memtable::make_flat_reader(schema_ptr s,
mutation_reader::forwarding fwd_mr) {
if (query::is_single_partition(range)) {
const query::ring_position& pos = range.start()->value();
auto snp = _read_section(*this, [&] () -> lw_shared_ptr<partition_snapshot> {
auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr {
managed_bytes::linearization_context_guard lcg;
auto i = partitions.find(pos, memtable_entry::compare(_schema));
if (i != partitions.end()) {

View File

@@ -66,7 +66,7 @@ public:
partition_entry& partition() { return _pe; }
const schema_ptr& schema() const { return _schema; }
schema_ptr& schema() { return _schema; }
lw_shared_ptr<partition_snapshot> snapshot(memtable& mtbl);
partition_snapshot_ptr snapshot(memtable& mtbl);
size_t external_memory_usage_without_rows() const {
return _key.key().external_memory_usage();
@@ -125,8 +125,7 @@ public:
bi::compare<memtable_entry::compare>>;
private:
dirty_memory_manager& _dirty_mgr;
mutation_cleaner _memtable_cleaner;
mutation_cleaner* _cleaner; // will switch to cache's cleaner after memtable is moved to cache.
mutation_cleaner _cleaner;
memtable_list *_memtable_list;
schema_ptr _schema;
logalloc::allocating_section _read_section;
@@ -254,7 +253,8 @@ private:
void clear() noexcept;
uint64_t dirty_size() const;
public:
explicit memtable(schema_ptr schema, dirty_memory_manager&, memtable_list *memtable_list = nullptr);
explicit memtable(schema_ptr schema, dirty_memory_manager&, memtable_list *memtable_list = nullptr,
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group());
// Used for testing that want to control the flush process.
explicit memtable(schema_ptr schema);
~memtable();
@@ -294,7 +294,7 @@ public:
}
mutation_cleaner& cleaner() {
return *_cleaner;
return _cleaner;
}
public:
memtable_list* get_memtable_list() {

View File

@@ -26,6 +26,76 @@
#include "utils/logalloc.hh"
class mutation_cleaner_impl final {
using snapshot_list = boost::intrusive::slist<partition_snapshot,
boost::intrusive::member_hook<partition_snapshot, boost::intrusive::slist_member_hook<>, &partition_snapshot::_cleaner_hook>>;
struct worker {
condition_variable cv;
snapshot_list snapshots;
logalloc::allocating_section alloc_section;
bool done = false; // true means the worker was abandoned and cannot access the mutation_cleaner_impl instance.
};
private:
logalloc::region& _region;
cache_tracker* _tracker;
partition_version_list _versions;
lw_shared_ptr<worker> _worker_state;
seastar::scheduling_group _scheduling_group;
private:
stop_iteration merge_some(partition_snapshot& snp) noexcept;
stop_iteration merge_some() noexcept;
void start_worker();
public:
mutation_cleaner_impl(logalloc::region& r, cache_tracker* t, seastar::scheduling_group sg = seastar::current_scheduling_group())
: _region(r)
, _tracker(t)
, _worker_state(make_lw_shared<worker>())
, _scheduling_group(sg)
{
start_worker();
}
~mutation_cleaner_impl();
stop_iteration clear_gently() noexcept;
memory::reclaiming_result clear_some() noexcept;
void clear() noexcept;
void destroy_later(partition_version& v) noexcept;
void destroy_gently(partition_version& v) noexcept;
void merge(mutation_cleaner_impl& other) noexcept;
bool empty() const noexcept { return _versions.empty(); }
future<> drain();
void merge_and_destroy(partition_snapshot&) noexcept;
void set_scheduling_group(seastar::scheduling_group sg) {
_scheduling_group = sg;
_worker_state->cv.broadcast();
}
};
inline
void mutation_cleaner_impl::destroy_later(partition_version& v) noexcept {
_versions.push_back(v);
}
inline
void mutation_cleaner_impl::destroy_gently(partition_version& v) noexcept {
if (v.clear_gently(_tracker) == stop_iteration::no) {
destroy_later(v);
} else {
current_allocator().destroy(&v);
}
}
inline
void mutation_cleaner_impl::merge_and_destroy(partition_snapshot& ps) noexcept {
if (ps.slide_to_oldest() == stop_iteration::yes || merge_some(ps) == stop_iteration::yes) {
lw_shared_ptr<partition_snapshot>::dispose(&ps);
} else {
// The snapshot must not be reachable by partitino_entry::read() after this,
// which is ensured by slide_to_oldest() == stop_iteration::no.
_worker_state->snapshots.push_front(ps);
_worker_state->cv.signal();
}
}
// Container for garbage partition_version objects, used for freeing them incrementally.
//
// Mutation cleaner extends the lifetime of mutation_partition without doing
@@ -36,57 +106,71 @@
// mutation_cleaner should not be thread local objects (or members of thread
// local objects).
class mutation_cleaner final {
logalloc::region& _region;
cache_tracker* _tracker;
partition_version_list _versions;
lw_shared_ptr<mutation_cleaner_impl> _impl;
public:
mutation_cleaner(logalloc::region& r, cache_tracker* t) : _region(r), _tracker(t) {}
~mutation_cleaner();
mutation_cleaner(logalloc::region& r, cache_tracker* t, seastar::scheduling_group sg = seastar::current_scheduling_group())
: _impl(make_lw_shared<mutation_cleaner_impl>(r, t, sg)) {
}
void set_scheduling_group(seastar::scheduling_group sg) {
_impl->set_scheduling_group(sg);
}
// Frees some of the data. Returns stop_iteration::yes iff all was freed.
// Must be invoked under owning allocator.
stop_iteration clear_gently() noexcept;
stop_iteration clear_gently() noexcept {
return _impl->clear_gently();
}
// Must be invoked under owning allocator.
memory::reclaiming_result clear_some() noexcept;
memory::reclaiming_result clear_some() noexcept {
return _impl->clear_some();
}
// Must be invoked under owning allocator.
void clear() noexcept;
void clear() noexcept {
_impl->clear();
}
// Enqueues v for destruction.
// The object must not be part of any list, and must not be accessed externally any more.
// In particular, it must not be attached, even indirectly, to any snapshot or partition_entry,
// and must not be evicted from.
// Must be invoked under owning allocator.
void destroy_later(partition_version& v) noexcept;
void destroy_later(partition_version& v) noexcept {
return _impl->destroy_later(v);
}
// Destroys v now or later.
// Same requirements as destroy_later().
// Must be invoked under owning allocator.
void destroy_gently(partition_version& v) noexcept;
void destroy_gently(partition_version& v) noexcept {
return _impl->destroy_gently(v);
}
// Transfers objects from other to this.
// This and other must belong to the same logalloc::region, and the same cache_tracker.
// After the call bool(other) is false.
void merge(mutation_cleaner& other) noexcept;
// After the call other will refer to this cleaner.
void merge(mutation_cleaner& other) noexcept {
_impl->merge(*other._impl);
other._impl = _impl;
}
// Returns true iff contains no unfreed objects
bool empty() const noexcept { return _versions.empty(); }
bool empty() const noexcept {
return _impl->empty();
}
// Forces cleaning and returns a future which resolves when there is nothing to clean.
future<> drain();
};
inline
void mutation_cleaner::destroy_later(partition_version& v) noexcept {
_versions.push_back(v);
}
inline
void mutation_cleaner::destroy_gently(partition_version& v) noexcept {
if (v.clear_gently(_tracker) == stop_iteration::no) {
destroy_later(v);
} else {
current_allocator().destroy(&v);
future<> drain() {
return _impl->drain();
}
}
// Will merge given snapshot using partition_snapshot::merge_partition_versions() and then destroys it
// using destroy_from_this(), possibly deferring in between.
// This instance becomes the sole owner of the partition_snapshot object, the caller should not destroy it
// nor access it after calling this.
void merge_and_destroy(partition_snapshot& ps) {
return _impl->merge_and_destroy(ps);
}
};

View File

@@ -280,12 +280,15 @@ mutation_partition::apply(const schema& s, const mutation_fragment& mf) {
mf.visit(applier);
}
void mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker* tracker) {
stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker* tracker, is_preemptible preemptible) {
_tombstone.apply(p._tombstone);
_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones));
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
_static_row_continuous |= p._static_row_continuous;
if (_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones), preemptible) == stop_iteration::no) {
return stop_iteration::no;
}
rows_entry::compare less(s);
auto del = current_deleter<rows_entry>();
auto p_i = p._rows.begin();
@@ -317,22 +320,34 @@ void mutation_partition::apply_monotonically(const schema& s, mutation_partition
// Newer evictable versions store complete rows
i->_row = std::move(src_e._row);
} else {
memory::on_alloc_point();
i->_row.apply_monotonically(s, std::move(src_e._row));
}
i->set_continuous(continuous);
i->set_dummy(dummy);
p_i = p._rows.erase_and_dispose(p_i, del);
}
if (preemptible && need_preempt() && p_i != p._rows.end()) {
// We cannot leave p with the clustering range up to p_i->position()
// marked as continuous because some of its sub-ranges may have originally been discontinuous.
// This would result in the sum of this and p to have broader continuity after preemption,
// also possibly violating the invariant of non-overlapping continuity between MVCC versions,
// if that's what we're merging here.
// It's always safe to mark the range as discontinuous.
p_i->set_continuous(false);
return stop_iteration::no;
}
}
return stop_iteration::yes;
}
void mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema) {
stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema, is_preemptible preemptible) {
if (s.version() == p_schema.version()) {
apply_monotonically(s, std::move(p), no_cache_tracker);
return apply_monotonically(s, std::move(p), no_cache_tracker, preemptible);
} else {
mutation_partition p2(s, p);
p2.upgrade(p_schema, s);
apply_monotonically(s, std::move(p2), no_cache_tracker);
return apply_monotonically(s, std::move(p2), no_cache_tracker, is_preemptible::no); // FIXME: make preemptible
}
}
@@ -2305,17 +2320,20 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
return f.finally([r_a_r = std::move(r_a_r)] { });
}
mutation_cleaner::~mutation_cleaner() {
mutation_cleaner_impl::~mutation_cleaner_impl() {
_worker_state->done = true;
_worker_state->cv.signal();
_worker_state->snapshots.clear_and_dispose(typename lw_shared_ptr<partition_snapshot>::disposer());
with_allocator(_region.allocator(), [this] {
clear();
});
}
void mutation_cleaner::clear() noexcept {
void mutation_cleaner_impl::clear() noexcept {
while (clear_gently() == stop_iteration::no) ;
}
stop_iteration mutation_cleaner::clear_gently() noexcept {
stop_iteration mutation_cleaner_impl::clear_gently() noexcept {
while (clear_some() == memory::reclaiming_result::reclaimed_something) {
if (need_preempt()) {
return stop_iteration::no;
@@ -2324,7 +2342,7 @@ stop_iteration mutation_cleaner::clear_gently() noexcept {
return stop_iteration::yes;
}
memory::reclaiming_result mutation_cleaner::clear_some() noexcept {
memory::reclaiming_result mutation_cleaner_impl::clear_some() noexcept {
if (_versions.empty()) {
return memory::reclaiming_result::reclaimed_nothing;
}
@@ -2337,14 +2355,81 @@ memory::reclaiming_result mutation_cleaner::clear_some() noexcept {
return memory::reclaiming_result::reclaimed_something;
}
void mutation_cleaner::merge(mutation_cleaner& r) noexcept {
void mutation_cleaner_impl::merge(mutation_cleaner_impl& r) noexcept {
_versions.splice(r._versions);
_worker_state->snapshots.splice(_worker_state->snapshots.end(), r._worker_state->snapshots);
if (!_worker_state->snapshots.empty()) {
_worker_state->cv.signal();
}
}
future<> mutation_cleaner::drain() {
return repeat([this] {
return with_allocator(_region.allocator(), [this] {
return clear_gently();
void mutation_cleaner_impl::start_worker() {
auto f = repeat([w = _worker_state, this] () mutable noexcept {
if (w->done) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return with_scheduling_group(_scheduling_group, [w, this] {
return w->cv.wait([w] {
return w->done || !w->snapshots.empty();
}).then([this, w] () noexcept {
if (w->done) {
return stop_iteration::yes;
}
merge_some();
return stop_iteration::no;
});
});
});
if (f.failed()) {
f.get();
}
}
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
auto&& region = snp.region();
return with_allocator(region.allocator(), [&] {
return with_linearized_managed_bytes([&] {
// Allocating sections require the region to be reclaimable
// which means that they cannot be nested.
// It is, however, possible, that if the snapshot is taken
// inside an allocating section and then an exception is thrown
// this function will be called to clean up even though we
// still will be in the context of the allocating section.
if (!region.reclaiming_enabled()) {
return stop_iteration::no;
}
try {
return _worker_state->alloc_section(region, [&] {
return snp.merge_partition_versions();
});
} catch (...) {
// Merging failed, give up as there is no guarantee of forward progress.
return stop_iteration::yes;
}
});
});
}
stop_iteration mutation_cleaner_impl::merge_some() noexcept {
if (_worker_state->snapshots.empty()) {
return stop_iteration::yes;
}
partition_snapshot& snp = _worker_state->snapshots.front();
if (merge_some(snp) == stop_iteration::yes) {
_worker_state->snapshots.pop_front();
lw_shared_ptr<partition_snapshot>::dispose(&snp);
}
return stop_iteration::no;
}
future<> mutation_cleaner_impl::drain() {
return repeat([this] {
return merge_some();
}).then([this] {
return repeat([this] {
return with_allocator(_region.allocator(), [this] {
return clear_gently();
});
});
});
}

View File

@@ -46,6 +46,7 @@
#include "clustering_key_filter.hh"
#include "intrusive_set_external_comparator.hh"
#include "utils/with_relational_operators.hh"
#include "utils/preempt.hh"
class mutation_fragment;
class clustering_row;
@@ -987,8 +988,19 @@ public:
// This instance and p are governed by the same schema.
//
// Must be provided with a pointer to the cache_tracker, which owns both this and p.
void apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker*);
void apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema);
//
// Returns stop_iteration::no if the operation was preempted before finished, and stop_iteration::yes otherwise.
// On preemption the sum of this and p stays the same (represents the same set of writes), and the state of this
// object contains at least all the writes it contained before the call (monotonicity). It may contain partial writes.
// Also, some progress is always guaranteed (liveness).
//
// The operation can be drien to completion like this:
//
// while (apply_monotonically(..., is_preemtable::yes) == stop_iteration::no) { }
//
// If is_preemptible::no is passed as argument then stop_iteration::no is never returned.
stop_iteration apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker*, is_preemptible = is_preemptible::no);
stop_iteration apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema, is_preemptible = is_preemptible::no);
// Weak exception guarantees.
// Assumes this and p are not owned by a cache_tracker.

View File

@@ -33,34 +33,6 @@ struct partition_snapshot_reader_dummy_accounter {
};
extern partition_snapshot_reader_dummy_accounter no_accounter;
inline void maybe_merge_versions(lw_shared_ptr<partition_snapshot>& snp,
logalloc::region& lsa_region,
logalloc::allocating_section& read_section) {
if (!snp.owned()) {
return;
}
// If no one else is using this particular snapshot try to merge partition
// versions.
with_allocator(lsa_region.allocator(), [&snp, &lsa_region, &read_section] {
return with_linearized_managed_bytes([&snp, &lsa_region, &read_section] {
try {
// Allocating sections require the region to be reclaimable
// which means that they cannot be nested.
// It is, however, possible, that if the snapshot is taken
// inside an allocating section and then an exception is thrown
// this function will be called to clean up even though we
// still will be in the context of the allocating section.
if (lsa_region.reclaiming_enabled()) {
read_section(lsa_region, [&snp] {
snp->merge_partition_versions();
});
}
} catch (...) { }
snp = {};
});
});
}
template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public MemoryAccounter {
struct rows_position {
@@ -87,7 +59,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
position_in_partition::equal_compare _eq;
heap_compare _heap_cmp;
lw_shared_ptr<partition_snapshot> _snapshot;
partition_snapshot_ptr _snapshot;
logalloc::region& _region;
logalloc::allocating_section& _read_section;
@@ -99,7 +71,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
private:
template<typename Function>
decltype(auto) in_alloc_section(Function&& fn) {
return _read_section.with_reclaiming_disabled(_region, [&] {
return _read_section.with_reclaiming_disabled(_region, [&] {
return with_linearized_managed_bytes([&] {
return fn();
});
@@ -155,7 +127,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return !_clustering_rows.empty();
}
public:
explicit lsa_partition_reader(const schema& s, lw_shared_ptr<partition_snapshot> snp,
explicit lsa_partition_reader(const schema& s, partition_snapshot_ptr snp,
logalloc::region& region, logalloc::allocating_section& read_section,
bool digest_requested)
: _schema(s)
@@ -168,10 +140,6 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
, _digest_requested(digest_requested)
{ }
~lsa_partition_reader() {
maybe_merge_versions(_snapshot, _region, _read_section);
}
template<typename Function>
decltype(auto) with_reserve(Function&& fn) {
return _read_section.with_reserve(std::forward<Function>(fn));
@@ -187,7 +155,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return _snapshot->static_row(_digest_requested);
});
}
// Returns next clustered row in the range.
// If the ck_range is the same as the one used previously last_row needs
// to be engaged and equal the position of the row returned last time.
@@ -298,7 +266,7 @@ private:
}
public:
template <typename... Args>
partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, partition_snapshot_ptr snp,
query::clustering_key_filter_ranges crr, bool digest_requested,
logalloc::region& region, logalloc::allocating_section& read_section,
boost::any pointer_to_container, Args&&... args)
@@ -344,7 +312,7 @@ inline flat_mutation_reader
make_partition_snapshot_flat_reader(schema_ptr s,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,
@@ -365,7 +333,7 @@ inline flat_mutation_reader
make_partition_snapshot_flat_reader(schema_ptr s,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,

View File

@@ -187,23 +187,49 @@ void merge_versions(const schema& s, mutation_partition& newer, mutation_partiti
newer = std::move(older);
}
void partition_snapshot::merge_partition_versions() {
stop_iteration partition_snapshot::merge_partition_versions() {
partition_version_ref& v = version();
if (!v.is_unique_owner()) {
auto first_used = &*v;
_version = { };
while (first_used->prev() && !first_used->is_referenced()) {
first_used = first_used->prev();
// Shift _version to the oldest unreferenced version and then keep merging left hand side into it.
// This is good for performance because in case we were at the latest version
// we leave it for incoming writes and they don't have to create a new one.
partition_version* current = &*v;
while (current->next() && !current->next()->is_referenced()) {
current = current->next();
_version = partition_version_ref(*current);
}
auto current = first_used->next();
while (current && !current->is_referenced()) {
auto next = current->next();
merge_versions(*_schema, first_used->partition(), std::move(current->partition()), _tracker);
current_allocator().destroy(current);
current = next;
while (auto prev = current->prev()) {
_region.allocator().invalidate_references();
if (current->partition().apply_monotonically(*schema(), std::move(prev->partition()), _tracker, is_preemptible::yes) == stop_iteration::no) {
return stop_iteration::no;
}
if (prev->is_referenced()) {
_version.release();
prev->back_reference() = partition_version_ref(*current, prev->back_reference().is_unique_owner());
current_allocator().destroy(prev);
return stop_iteration::yes;
}
current_allocator().destroy(prev);
}
}
return stop_iteration::yes;
}
stop_iteration partition_snapshot::slide_to_oldest() noexcept {
partition_version_ref& v = version();
if (v.is_unique_owner()) {
return stop_iteration::yes;
}
if (_entry) {
_entry->_snapshot = nullptr;
_entry = nullptr;
}
partition_version* current = &*v;
while (current->next() && !current->next()->is_referenced()) {
current = current->next();
_version = partition_version_ref(*current);
}
return current->prev() ? stop_iteration::no : stop_iteration::yes;
}
unsigned partition_snapshot::version_count()
@@ -460,16 +486,13 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
bool can_move = !pe._snapshot;
auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker);
lw_shared_ptr<partition_snapshot> prev_snp;
partition_snapshot_ptr prev_snp;
if (preemptible) {
// Reads must see prev_snp until whole update completes so that writes
// are not partially visible.
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
}
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
auto merge_dst_snp = defer([preemptible, dst_snp, &reg, &alloc] () mutable {
maybe_merge_versions(dst_snp, reg, alloc);
});
// Once we start updating the partition, we must keep all snapshots until the update completes,
// otherwise partial writes would be published. So the scope of snapshots must enclose the scope
@@ -477,7 +500,6 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
// give the caller a chance to store the coroutine object. The code inside coroutine below
// runs outside allocating section.
return coroutine([&tracker, &s, &alloc, &reg, &acc, can_move, preemptible,
merge_dst_snp = std::move(merge_dst_snp), // needs to go away last so that dst_snp is not owned by anyone else
cur = partition_snapshot_row_cursor(s, *dst_snp),
src_cur = partition_snapshot_row_cursor(s, *src_snp, can_move),
dst_snp = std::move(dst_snp),
@@ -584,7 +606,7 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&
remove_or_mark_as_unique_owner(old_version, &cleaner);
}
lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
partition_snapshot_ptr partition_entry::read(logalloc::region& r,
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
{
if (_snapshot) {
@@ -607,7 +629,7 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
_snapshot = snp.get();
return snp;
return partition_snapshot_ptr(std::move(snp));
}
std::vector<range_tombstone>
@@ -671,3 +693,13 @@ void partition_entry::evict(mutation_cleaner& cleaner) noexcept {
remove_or_mark_as_unique_owner(v, &cleaner);
}
}
partition_snapshot_ptr::~partition_snapshot_ptr() {
if (_snp) {
auto&& cleaner = _snp->cleaner();
auto snp = _snp.release();
if (snp) {
cleaner.merge_and_destroy(*snp.release());
}
}
}

View File

@@ -28,6 +28,7 @@
#include "utils/coroutine.hh"
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/intrusive/slist.hpp>
// This is MVCC implementation for mutation_partitions.
//
@@ -188,8 +189,9 @@ class partition_version_ref {
friend class partition_version;
public:
partition_version_ref() = default;
explicit partition_version_ref(partition_version& pv) noexcept
explicit partition_version_ref(partition_version& pv, bool unique_owner = false) noexcept
: _version(&pv)
, _unique_owner(unique_owner)
{
assert(!_version->_backref);
_version->_backref = this;
@@ -300,8 +302,9 @@ private:
logalloc::region& _region;
mutation_cleaner& _cleaner;
cache_tracker* _tracker;
boost::intrusive::slist_member_hook<> _cleaner_hook;
friend class partition_entry;
friend class mutation_cleaner_impl;
public:
explicit partition_snapshot(schema_ptr s,
logalloc::region& region,
@@ -329,10 +332,17 @@ public:
return container_of(v._backref);
}
// If possible merges the version pointed to by this snapshot with
// If possible, merges the version pointed to by this snapshot with
// adjacent partition versions. Leaves the snapshot in an unspecified state.
// Can be retried if previous merge attempt has failed.
void merge_partition_versions();
stop_iteration merge_partition_versions();
// Prepares the snapshot for cleaning by moving to the right-most unreferenced version.
// Returns stop_iteration::yes if there is nothing to merge with and the snapshot
// should be collected right away, and stop_iteration::no otherwise.
// When returns stop_iteration::no, the snapshots is guaranteed to not be attached
// to the latest version.
stop_iteration slide_to_oldest() noexcept;
~partition_snapshot();
@@ -357,6 +367,7 @@ public:
const schema_ptr& schema() const { return _schema; }
logalloc::region& region() const { return _region; }
cache_tracker* tracker() const { return _tracker; }
mutation_cleaner& cleaner() { return _cleaner; }
tombstone partition_tombstone() const;
::static_row static_row(bool digest_requested) const;
@@ -368,6 +379,36 @@ public:
std::vector<range_tombstone> range_tombstones();
};
class partition_snapshot_ptr {
lw_shared_ptr<partition_snapshot> _snp;
public:
using value_type = partition_snapshot;
partition_snapshot_ptr() = default;
partition_snapshot_ptr(partition_snapshot_ptr&&) = default;
partition_snapshot_ptr(const partition_snapshot_ptr&) = default;
partition_snapshot_ptr(lw_shared_ptr<partition_snapshot> snp) : _snp(std::move(snp)) {}
~partition_snapshot_ptr();
partition_snapshot_ptr& operator=(partition_snapshot_ptr&& other) noexcept {
if (this != &other) {
this->~partition_snapshot_ptr();
new (this) partition_snapshot_ptr(std::move(other));
}
return *this;
}
partition_snapshot_ptr& operator=(const partition_snapshot_ptr& other) noexcept {
if (this != &other) {
this->~partition_snapshot_ptr();
new (this) partition_snapshot_ptr(other);
}
return *this;
}
partition_snapshot& operator*() { return *_snp; }
const partition_snapshot& operator*() const { return *_snp; }
partition_snapshot* operator->() { return &*_snp; }
const partition_snapshot* operator->() const { return &*_snp; }
explicit operator bool() const { return bool(_snp); }
};
class real_dirty_memory_accounter;
// Represents mutation_partition with snapshotting support a la MVCC.
@@ -523,7 +564,7 @@ public:
void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*);
// Snapshots with different values of phase will point to different partition_version objects.
lw_shared_ptr<partition_snapshot> read(logalloc::region& region,
partition_snapshot_ptr read(logalloc::region& region,
mutation_cleaner&,
schema_ptr entry_schema,
cache_tracker*,

View File

@@ -437,14 +437,18 @@ bool range_tombstone_list::equal(const schema& s, const range_tombstone_list& ot
});
}
void range_tombstone_list::apply_monotonically(const schema& s, range_tombstone_list&& list) {
stop_iteration range_tombstone_list::apply_monotonically(const schema& s, range_tombstone_list&& list, is_preemptible preemptible) {
auto del = current_deleter<range_tombstone>();
auto it = list.begin();
while (it != list.end()) {
// FIXME: Optimize by stealing the entry
apply_monotonically(s, *it);
it = list._tombstones.erase_and_dispose(it, del);
if (preemptible && need_preempt()) {
return stop_iteration::no;
}
}
return stop_iteration::yes;
}
void range_tombstone_list::apply_monotonically(const schema& s, const range_tombstone_list& list) {

View File

@@ -24,6 +24,7 @@
#include "range_tombstone.hh"
#include "query-request.hh"
#include "position_in_partition.hh"
#include "utils/preempt.hh"
#include <iosfwd>
class range_tombstone_list final {
@@ -149,7 +150,7 @@ public:
/// Monotonic exception guarantees. In case of failure the object will contain at least as much information as before the call.
/// The other list will be left in a state such that it would still commute with this object to the same state as it
/// would if the call didn't fail.
void apply_monotonically(const schema& s, range_tombstone_list&& list);
stop_iteration apply_monotonically(const schema& s, range_tombstone_list&& list, is_preemptible = is_preemptible::no);
public:
tombstone search_tombstone_covering(const schema& s, const clustering_key_prefix& key) const;
// Returns range of tombstones which overlap with given range

View File

@@ -92,6 +92,11 @@ cache_tracker::~cache_tracker() {
clear();
}
void cache_tracker::set_compaction_scheduling_group(seastar::scheduling_group sg) {
_memtable_cleaner.set_scheduling_group(sg);
_garbage.set_scheduling_group(sg);
}
void
cache_tracker::setup_metrics() {
namespace sm = seastar::metrics;
@@ -933,8 +938,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
real_dirty_memory_accounter real_dirty_acc(m, _tracker);
m.on_detach_from_region_group();
_tracker.region().merge(m); // Now all data in memtable belongs to cache
_tracker.memtable_cleaner().merge(m._memtable_cleaner);
m._cleaner = &_tracker.memtable_cleaner();
_tracker.memtable_cleaner().merge(m._cleaner);
STAP_PROBE(scylla, row_cache_update_start);
auto cleanup = defer([&m, this] {
invalidate_sync(m);

View File

@@ -269,6 +269,7 @@ public:
mutation_cleaner& memtable_cleaner() { return _memtable_cleaner; }
uint64_t partitions() const { return _stats.partitions; }
const stats& get_stats() const { return _stats; }
void set_compaction_scheduling_group(seastar::scheduling_group);
};
inline

View File

@@ -103,7 +103,7 @@ mutation make_incomplete_mutation() {
return mutation(SCHEMA, DK, mutation_partition::make_incomplete(*SCHEMA));
}
static void assert_single_version(lw_shared_ptr<partition_snapshot> snp) {
static void assert_single_version(partition_snapshot_ptr snp) {
BOOST_REQUIRE(snp->at_latest_version());
BOOST_REQUIRE_EQUAL(snp->version_count(), 1);
}
@@ -140,7 +140,7 @@ struct expected_row {
}
};
static void assert_cached_rows(lw_shared_ptr<partition_snapshot> snp, std::deque<expected_row> expected) {
static void assert_cached_rows(partition_snapshot_ptr snp, std::deque<expected_row> expected) {
auto&& rows = snp->version()->partition().clustered_rows();
for (auto&& r : rows) {
BOOST_REQUIRE(!expected.empty());
@@ -173,7 +173,7 @@ struct expected_tombstone {
}
};
static void assert_cached_tombstones(lw_shared_ptr<partition_snapshot> snp, std::deque<range_tombstone> expected) {
static void assert_cached_tombstones(partition_snapshot_ptr snp, std::deque<range_tombstone> expected) {
const range_tombstone_list& rts = snp->version()->partition().row_tombstones();
for (auto&& rt : rts) {
BOOST_REQUIRE(!expected.empty());
@@ -187,7 +187,7 @@ static void assert_cached_tombstones(lw_shared_ptr<partition_snapshot> snp, std:
class cache_tester {
public:
static lw_shared_ptr<partition_snapshot> snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
return rc._read_section(rc._tracker.region(), [&] {
return with_linearized_managed_bytes([&] {
cache_entry& e = rc.find_or_create(dk, {}, rc.phase_of(dk));

View File

@@ -29,6 +29,7 @@
#include "partition_version.hh"
#include "partition_snapshot_row_cursor.hh"
#include "partition_snapshot_reader.hh"
#include "tests/test-utils.hh"
#include "tests/mutation_assertions.hh"
@@ -169,7 +170,7 @@ public:
partition_snapshot::phase_type phase() const { return _phase; }
real_dirty_memory_accounter& accounter() { return _acc; }
mutation_partition squashed(lw_shared_ptr<partition_snapshot>& snp) {
mutation_partition squashed(partition_snapshot_ptr& snp) {
logalloc::allocating_section as;
return as(_tracker.region(), [&] {
return snp->squashed();
@@ -220,7 +221,7 @@ public:
});
}
lw_shared_ptr<partition_snapshot> read() {
partition_snapshot_ptr read() {
logalloc::allocating_section as;
return as(region(), [&] {
return _e.read(region(), _container.cleaner(), schema(), &_container.tracker(), _container.phase());
@@ -466,7 +467,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
m1.partition().make_fully_continuous();
e += m1;
lw_shared_ptr<partition_snapshot> snap;
partition_snapshot_ptr snap;
if (with_active_reader) {
snap = e.read();
}
@@ -869,12 +870,11 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
auto snap2 = e.read(r, cleaner, s, nullptr);
snap1->merge_partition_versions();
snap1 = {};
snap2->merge_partition_versions();
snap2 = {};
cleaner.drain().get();
BOOST_REQUIRE_EQUAL(1, boost::size(e.versions()));
assert_that(s, e.squashed(*s)).is_equal_to((m1 + m2).partition());
}
@@ -890,12 +890,11 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
auto snap2 = e.read(r, cleaner, s, nullptr);
snap2->merge_partition_versions();
snap2 = {};
snap1->merge_partition_versions();
snap1 = {};
cleaner.drain().get();
BOOST_REQUIRE_EQUAL(1, boost::size(e.versions()));
assert_that(s, e.squashed(*s)).is_equal_to((m1 + m2).partition());
}

View File

@@ -148,11 +148,27 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
float((prev_compacted - prefill_compacted)) / (prev_allocated - prefill_allocated)
);
// Create a reader which tests the case of memtable snapshots
// going away after memtable was merged to cache.
auto rd = std::make_unique<flat_mutation_reader>(
make_combined_reader(s, cache.make_reader(s), mt->make_flat_reader(s)));
rd->set_max_buffer_size(1);
rd->fill_buffer().get();
scheduling_latency_measurer slm;
slm.start();
auto d = duration_in_seconds([&] {
cache.update([] {}, *mt).get();
});
rd->set_max_buffer_size(1024*1024);
rd->consume_pausable([] (mutation_fragment) {
return stop_iteration::no;
}).get();
mt = {};
rd = {};
slm.stop();
auto compacted = logalloc::memory_compacted() - prev_compacted;
@@ -282,7 +298,9 @@ int main(int argc, char** argv) {
test_small_partitions();
test_partition_with_few_small_rows();
test_partition_with_lots_of_small_rows();
test_partition_with_lots_of_range_tombstones();
// Takes a huge amount of time due to https://github.com/scylladb/scylla/issues/2581#issuecomment-398030186,
// disable until fixed.
// test_partition_with_lots_of_range_tombstones();
});
});
}

27
utils/preempt.hh Normal file
View File

@@ -0,0 +1,27 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/util/bool_class.hh>
class is_preemptible_tag;
using is_preemptible = bool_class<is_preemptible_tag>;