row_cache: Improve safety of cache updates

Cache imposes requirements on how updates to the on-disk mutation source
are made:
  1) each change to the on-disk muation source must be followed
     by cache synchronization reflecting that change
  2) The two must be serialized with other synchronizations
  3) must have strong failure guarantees (atomicity)

Because of that, sstable list update and cache synchronization must be
done under a lock, and cache synchronization cannot fail to synchronize.

Normally cache synchronization achieves no-failure thing by wiping the
cache (which is noexcept) in case failure is detect. There are some
setup steps hoever which cannot be skipped, e.g. taking a lock
followed by switching cache to use the new snapshot. That truly cannot
fail.  The lock inside cache synchronizers is redundant, since the
user needs to take it anyway around the combined operation.

In order to make ensuring strong exception guarantees easier, and
making the cache interface easier to use correctly, this patch moves
the control of the combined update into the cache. This is done by
having cache::update() et al accept a callback (external_updater)
which is supposed to perform modiciation of the underlying mutation
source when invoked.

This is in-line with the layering. Cache is layered on top of the
on-disk mutation source (it wraps it) and reading has to go through
cache. After the patch, modification also goes through cache. This way
more of cache's requirements can be confined to its implementation.

The failure semantics of update() and other synchronizers needed to
change due to strong exception guaratnees. Now if it fails, it means
the update was not performed, neither to the cache nor to the
underlying mutation source.

The database::_cache_update_sem goes away, serialization is done
internally by the cache.

The external_updater needs to have strong exception guarantees. This
requirement is not new. It is however currently violated in some
places. This patch marks those callbacks as noexcept and leaves a
FIXME. Those should be fixed, but that's not in the scope of this
patch. Aborting is still better than corrupting the state.

Fixes #2754.

Also fixes the following test failure:

  tests/row_cache_test.cc(949): fatal error: in "test_update_failure": critical check it->second.equal(*s, mopt->partition()) has failed

which started to trigger after commit 318423d50b. Thread stack
allocation may fail, in which case we did not do the necessary
invalidation.
This commit is contained in:
Tomasz Grabiec
2017-08-29 16:03:16 +02:00
parent b0f3efa577
commit d22fdf4261
8 changed files with 215 additions and 162 deletions

View File

@@ -803,11 +803,18 @@ void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable, std::v
}
future<>
column_family::update_cache(memtable& m) {
column_family::update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst) {
auto adder = [this, m, sst] {
auto newtab_ms = sst->as_mutation_source();
add_sstable(sst, {engine().cpu_id()});
m->mark_flushed(std::move(newtab_ms));
try_trigger_compaction();
};
if (_config.enable_cache) {
return _cache.update(m);
return _cache.update(adder, *m);
} else {
return m.clear_gently();
adder();
return m->clear_gently();
}
}
@@ -875,16 +882,16 @@ column_family::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
return write_memtable_to_sstable(*old, newtab, std::move(sstable_write_permit), incremental_backups_enabled(), priority, false, _config.background_writer_scheduling_group).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
return with_semaphore(_cache_update_sem, 1, [this, newtab, old] {
auto adder = [this, newtab] {
add_sstable(newtab, {engine().cpu_id()});
trigger_compaction();
// Cache synchronization must be started atomically with add_sstable()
if (_config.enable_cache) {
return _cache.update_invalidating(*old);
} else {
return old->clear_gently();
}
});
try_trigger_compaction();
};
if (_config.enable_cache) {
return _cache.update_invalidating(adder, *old);
} else {
adder();
return old->clear_gently();
}
}).handle_exception([old, permit = std::move(permit)] (auto ep) {
dblog.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
@@ -1009,37 +1016,20 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstabl
auto&& priority = service::get_local_memtable_flush_priority();
return write_memtable_to_sstable(*old, newtab, std::move(permit), incremental_backups_enabled(), priority, false, _config.memtable_scheduling_group).then([this, newtab, old] {
return newtab->open_data();
}).then_wrapped([this, old, newtab] (future<> ret) {
}).then([this, old, newtab] () {
dblog.debug("Flushing to {} done", newtab->get_filename());
try {
ret.get();
return with_semaphore(_cache_update_sem, 1, [this, old, newtab] {
add_sstable(newtab, {engine().cpu_id()});
old->mark_flushed(newtab->as_mutation_source());
trigger_compaction();
return update_cache(*old);
}).then_wrapped([this, newtab, old] (future<> f) {
try {
f.get();
} catch(...) {
dblog.error("failed to move memtable for {} to cache: {}", newtab->get_filename(), std::current_exception());
}
_memtables->erase(old);
dblog.debug("Memtable for {} replaced", newtab->get_filename());
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
} catch (...) {
newtab->mark_for_deletion();
dblog.error("failed to write sstable {}: {}", newtab->get_filename(), std::current_exception());
// If we failed this write we will try the write again and that will create a new flush reader
// that will decrease dirty memory again. So we need to reset the accounting.
old->revert_flushed_memory();
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return update_cache(old, newtab);
}).then([this, old, newtab] () noexcept {
_memtables->erase(old);
dblog.debug("Memtable for {} replaced", newtab->get_filename());
return stop_iteration::yes;
}).handle_exception([this, old, newtab] (auto e) {
newtab->mark_for_deletion();
dblog.error("failed to write sstable {}: {}", newtab->get_filename(), e);
// If we failed this write we will try the write again and that will create a new flush reader
// that will decrease dirty memory again. So we need to reset the accounting.
old->revert_flushed_memory();
return stop_iteration::no;
});
}
@@ -1796,16 +1786,14 @@ future<> distributed_loader::load_new_sstables(distributed<database>& db, sstrin
}).then([&db, ks, cf] {
return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) {
auto& cf = db.find_column_family(ks, cfname);
return with_semaphore(cf._cache_update_sem, 1, [&cf] {
return cf.get_row_cache().invalidate([&cf] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
// atomically load all opened sstables into column family.
for (auto& sst : cf._sstables_opened_but_not_loaded) {
cf.load_sstable(sst, true);
}
cf._sstables_opened_but_not_loaded.clear();
cf.trigger_compaction();
// Drop entire cache for this column family because it may be populated
// with stale data.
return cf.get_row_cache().invalidate();
});
});
}).then([&db, ks, cf] () mutable {
@@ -1838,9 +1826,9 @@ future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<da
}
return cf.open_sstable(std::move(info), sstdir, comps.generation, comps.version, comps.format).then([&cf] (sstables::shared_sstable sst) mutable {
if (sst) {
return with_semaphore(cf._cache_update_sem, 1, [&cf, sst] {
return cf.get_row_cache().invalidate([&cf, sst = std::move(sst)] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
cf.load_sstable(sst);
return cf.get_row_cache().invalidate();
});
}
return make_ready_future<>();
@@ -3852,13 +3840,13 @@ future<> column_family::flush_streaming_mutations(utils::UUID plan_id, dht::part
return _streaming_memtables->seal_active_memtable_delayed().then([this] {
return _streaming_flush_phaser.advance_and_await();
}).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
return with_semaphore(_cache_update_sem, 1, [this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
return _cache.invalidate([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
for (auto&& sst : sstables) {
// seal_active_streaming_memtable_big() ensures sst is unshared.
this->add_sstable(sst, {engine().cpu_id()});
}
this->trigger_compaction();
return _cache.invalidate(std::move(ranges));
this->try_trigger_compaction();
});
});
});
@@ -3908,7 +3896,7 @@ future<> column_family::clear() {
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
_streaming_memtables_big.clear();
return _cache.invalidate();
return _cache.invalidate([] { /* There is no underlying mutation source */ });
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -3917,32 +3905,42 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
assert(_compaction_disabled > 0);
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
return with_semaphore(_cache_update_sem, 1, [this] () mutable {
db::replay_position rp;
auto gc_trunc = to_gc_clock(truncated_at);
struct pruner {
column_family& cf;
db::replay_position rp;
std::vector<sstables::shared_sstable> remove;
auto pruned = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
std::vector<sstables::shared_sstable> remove;
pruner(column_family& cf)
: cf(cf) {}
for (auto&p : *_sstables->all()) {
if (p->max_data_age() <= gc_trunc) {
rp = std::max(p->get_stats_metadata().position, rp);
remove.emplace_back(p);
continue;
void prune(db_clock::time_point truncated_at) {
auto gc_trunc = to_gc_clock(truncated_at);
auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema));
for (auto& p : *cf._sstables->all()) {
if (p->max_data_age() <= gc_trunc) {
rp = std::max(p->get_stats_metadata().position, rp);
remove.emplace_back(p);
continue;
}
pruned->insert(p);
}
cf._sstables = std::move(pruned);
}
pruned->insert(p);
}
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
return _cache.invalidate().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
};
auto p = make_lw_shared<pruner>(*this);
return _cache.invalidate([p, truncated_at] {
p->prune(truncated_at);
dblog.debug("cleaning out row cache");
}).then([p]() mutable {
return parallel_for_each(p->remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
}).then([p] {
return make_ready_future<db::replay_position>(p->rp);
});
});
});
});
}

View File

@@ -415,7 +415,6 @@ private:
utils::phased_barrier _flush_barrier;
seastar::gate _streaming_flush_gate;
std::vector<view_ptr> _views;
semaphore _cache_update_sem{1};
std::unique_ptr<cell_locker> _counter_cell_locks;
void set_metrics();
@@ -453,7 +452,8 @@ private:
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
future<> update_cache(memtable&);
// Caller must keep m alive.
future<> update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst);
struct merge_comparator;
// update the sstable generation, making sure that new new sstables don't overwrite this one.

View File

@@ -762,19 +762,21 @@ row_cache::phase_type row_cache::phase_of(dht::ring_position_view pos) {
}
template <typename Updater>
future<> row_cache::do_update(memtable& m, Updater updater) {
future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater) {
return do_update(std::move(eu), [this, &m, updater = std::move(updater)] {
m.on_detach_from_region_group();
_tracker.region().merge(m); // Now all data in memtable belongs to cache
STAP_PROBE(scylla, row_cache_update_start);
auto cleanup = defer([&m, this] {
invalidate_sync(m);
STAP_PROBE(scylla, row_cache_update_end);
});
auto attr = seastar::thread_attributes();
attr.scheduling_group = &_update_thread_scheduling_group;
STAP_PROBE(scylla, row_cache_update_start);
auto t = seastar::thread(attr, [this, &m, updater = std::move(updater)] () mutable {
auto cleanup = defer([&] { invalidate_sync(m); });
auto permit = get_units(_update_sem, 1).get0();
++_underlying_phase;
_prev_snapshot = std::exchange(_underlying, _snapshot_source());
_prev_snapshot_pos = dht::ring_position::min();
auto cleanup_prev_snapshot = defer([this] {
// In case updater fails, we must bring the cache to consistency without deferring.
auto cleanup = defer([&m, this] {
invalidate_sync(m);
_prev_snapshot_pos = {};
_prev_snapshot = {};
});
@@ -823,14 +825,14 @@ future<> row_cache::do_update(memtable& m, Updater updater) {
seastar::thread::yield();
}
});
STAP_PROBE(scylla, row_cache_update_end);
return do_with(std::move(t), [] (seastar::thread& t) {
return t.join();
});
}).then([cleanup = std::move(cleanup)] {});
});
}
future<> row_cache::update(memtable& m) {
return do_update(m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
future<> row_cache::update(external_updater eu, memtable& m) {
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
partition_presence_checker& is_present) mutable {
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
@@ -851,8 +853,8 @@ future<> row_cache::update(memtable& m) {
});
}
future<> row_cache::update_invalidating(memtable& m) {
return do_update(m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
future<> row_cache::update_invalidating(external_updater eu, memtable& m) {
return do_update(std::move(eu), m, [this] (row_cache::partitions_type::iterator cache_i, memtable_entry& mem_e,
partition_presence_checker& is_present) {
if (cache_i != partitions_end() && cache_i->key().equal(*_schema, mem_e.key())) {
// FIXME: Invalidate only affected row ranges.
@@ -895,26 +897,25 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
}
}
future<> row_cache::invalidate(const dht::decorated_key& dk) {
return invalidate(dht::partition_range::make_singular(dk));
future<> row_cache::invalidate(external_updater eu, const dht::decorated_key& dk) {
return invalidate(std::move(eu), dht::partition_range::make_singular(dk));
}
future<> row_cache::invalidate(const dht::partition_range& range) {
return invalidate(dht::partition_range_vector({range}));
future<> row_cache::invalidate(external_updater eu, const dht::partition_range& range) {
return invalidate(std::move(eu), dht::partition_range_vector({range}));
}
future<> row_cache::invalidate(dht::partition_range_vector&& ranges) {
return get_units(_update_sem, 1).then([this, ranges = std::move(ranges)] (auto permit) mutable {
_underlying = _snapshot_source();
++_underlying_phase;
auto on_failure = defer([this] { this->clear_now(); });
with_linearized_managed_bytes([&] {
for (auto&& range : ranges) {
this->invalidate_unwrapped(range);
}
});
on_failure.cancel();
});
future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&& ranges) {
return do_update(std::move(eu), [this, ranges = std::move(ranges)] {
auto on_failure = defer([this] { this->clear_now(); });
with_linearized_managed_bytes([&] {
for (auto&& range : ranges) {
this->invalidate_unwrapped(range);
}
});
on_failure.cancel();
return make_ready_future<>();
});
}
void row_cache::evict(const dht::partition_range& range) {
@@ -1026,6 +1027,29 @@ std::ostream& operator<<(std::ostream& out, row_cache& rc) {
return out;
}
future<> row_cache::do_update(row_cache::external_updater eu, row_cache::internal_updater iu) noexcept {
return futurize_apply([this] {
return get_units(_update_sem, 1);
}).then([this, eu = std::move(eu), iu = std::move(iu)] (auto permit) mutable {
auto pos = dht::ring_position::min();
eu();
[&] () noexcept {
_prev_snapshot_pos = std::move(pos);
_prev_snapshot = std::exchange(_underlying, _snapshot_source());
++_underlying_phase;
}();
return futurize_apply([&iu] {
return iu();
}).then_wrapped([this, permit = std::move(permit)] (auto f) {
_prev_snapshot_pos = {};
_prev_snapshot = {};
if (f.failed()) {
clogger.warn("Failure during cache update: {}", f.get_exception());
}
});
});
}
std::ostream& operator<<(std::ostream& out, cache_entry& e) {
return out << "{cache_entry: " << e.position()
<< ", cont=" << e.continuous()

View File

@@ -254,9 +254,11 @@ cache_tracker& global_cache_tracker();
//
// Cache populates itself automatically during misses.
//
// Cache represents a snapshot of the underlying mutation source. When the
// underlying mutation source changes, cache needs to be explicitly synchronized
// to the latest snapshot. This is done by calling update() or invalidate().
// All updates to the underlying mutation source must be performed through one of the synchronizing methods.
// Those are the methods which accept external_updater, e.g. update(), invalidate().
// All synchronizers have strong exception guarantees. If they fail, the set of writes represented by
// cache didn't change.
// Synchronizers can be invoked concurrently with each other and other operations on cache.
//
class row_cache final {
public:
@@ -273,6 +275,12 @@ public:
friend class cache::read_context;
friend class partition_range_cursor;
friend class cache_tester;
// A function which adds new writes to the underlying mutation source.
// All invocations of external_updater on given cache instance are serialized internally.
// Must have strong exception guarantees. If throws, the underlying mutation source
// must be left in the state in which it was before the call.
using external_updater = std::function<void()>;
public:
struct stats {
utils::timed_rate_moving_average hits;
@@ -403,10 +411,25 @@ private:
// It is invoked inside allocating section and in the context of cache's allocator.
// All memtable entries will be removed.
template <typename Updater>
future<> do_update(memtable& m, Updater func);
future<> do_update(external_updater, memtable& m, Updater func);
// Clears given memtable invalidating any affected cache elements.
void invalidate_sync(memtable&) noexcept;
// A function which updates cache to the current snapshot.
// It's responsible for advancing _prev_snapshot_pos between deferring points.
//
// Must have strong failure guarantees. Upon failure, it should still leave the cache
// in a state consistent with the update it is performing.
using internal_updater = std::function<future<>()>;
// Atomically updates the underlying mutation source and synchronizes the cache.
//
// Strong failure guarantees. If returns a failed future, the underlying mutation
// source was and cache are not modified.
//
// internal_updater is only kept alive until its invocation returns.
future<> do_update(external_updater eu, internal_updater iu) noexcept;
public:
~row_cache();
row_cache(schema_ptr, snapshot_source, cache_tracker&, is_continuous = is_continuous::no);
@@ -437,13 +460,13 @@ public:
// has just been flushed to the underlying data source.
// The memtable can be queried during the process, but must not be written.
// After the update is complete, memtable is empty.
future<> update(memtable&);
future<> update(external_updater, memtable&);
// Like update(), synchronizes cache with an incremental change to the underlying
// mutation source, but instead of inserting and merging data, invalidates affected ranges.
// Can be thought of as a more fine-grained version of invalidate(), which invalidates
// as few elements as possible.
future<> update_invalidating(memtable&);
future<> update_invalidating(external_updater, memtable&);
// Refreshes snapshot. Must only be used if logical state in the underlying data
// source hasn't changed.
@@ -465,9 +488,9 @@ public:
// Guarantees that readers created after invalidate()
// completes will see all writes from the underlying
// mutation source made prior to the call to invalidate().
future<> invalidate(const dht::decorated_key&);
future<> invalidate(const dht::partition_range& = query::full_partition_range);
future<> invalidate(dht::partition_range_vector&&);
future<> invalidate(external_updater, const dht::decorated_key&);
future<> invalidate(external_updater, const dht::partition_range& = query::full_partition_range);
future<> invalidate(external_updater, dht::partition_range_vector&&);
// Evicts entries from given range in cache.
//

View File

@@ -96,7 +96,7 @@ int main(int argc, char** argv) {
}
if (update_cache) {
cache.update(*mt).get();
cache.update([] {}, *mt).get();
}
}, 5, 1);
});

View File

@@ -181,7 +181,7 @@ int main(int argc, char** argv) {
fragment_free_space();
cache.update(*mt).get();
cache.update([] {}, *mt).get();
stuffing.clear();
cache_stuffing.clear();

View File

@@ -95,9 +95,10 @@ struct table {
auto flushed = make_lw_shared<memtable>(s.schema());
flushed->apply(*prev_mt).get();
prev_mt->mark_flushed(flushed->as_data_source());
underlying.apply(flushed);
test_log.trace("updating cache");
cache.update(*prev_mt).get();
cache.update([&] {
underlying.apply(flushed);
}, *prev_mt).get();
test_log.trace("flush done");
prev_mt = {};
}

View File

@@ -458,7 +458,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
test(ds, query::full_partition_range, partitions.size() + 1);
test(ds, query::full_partition_range, partitions.size() + 1);
cache->invalidate(key_after_all);
cache->invalidate([] {}, key_after_all);
assert_that(ds(s, query::full_partition_range))
.produces(slice(partitions, query::full_partition_range))
@@ -623,8 +623,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) {
// Merge m2 into cache
auto mt = make_lw_shared<memtable>(gen.schema());
mt->apply(m2);
underlying.apply(m2);
cache.update(*mt).get();
cache.update([&] { underlying.apply(m2); }, *mt).get();
auto rd2 = cache.make_reader(gen.schema());
auto sm2 = rd2().get0();
@@ -651,8 +650,9 @@ SEASTAR_TEST_CASE(test_random_partition_population) {
.produces(m1)
.produces_end_of_stream();
underlying.apply(m2);
cache.invalidate().get();
cache.invalidate([&] {
underlying.apply(m2);
}).get();
auto pr = dht::partition_range::make_singular(m2.decorated_key());
assert_that(cache.make_reader(gen.schema(), pr))
@@ -775,13 +775,12 @@ SEASTAR_TEST_CASE(test_single_partition_update) {
}
auto mt = make_lw_shared<memtable>(s);
{
cache.update([&] {
mutation m(pk, s);
m.set_clustered_cell(ck3, "v", data_value(101), 1);
mt->apply(m);
cache_mt.apply(m);
}
cache.update(*mt).get();
}, *mt).get();
{
auto reader = cache.make_reader(s, range);
@@ -820,7 +819,7 @@ SEASTAR_TEST_CASE(test_update) {
mt->apply(m);
}
cache.update(*mt).get();
cache.update([] {}, *mt).get();
for (auto&& key : keys_not_in_cache) {
verify_has(cache, key);
@@ -842,9 +841,10 @@ SEASTAR_TEST_CASE(test_update) {
auto m = make_new_mutation(s);
keys_not_in_cache.push_back(m.decorated_key());
mt2->apply(m);
cache.invalidate([] {}, m.decorated_key()).get();
}
cache.update(*mt2).get();
cache.update([] {}, *mt2).get();
for (auto&& key : keys_not_in_cache) {
verify_does_not_have(cache, key);
@@ -861,7 +861,7 @@ SEASTAR_TEST_CASE(test_update) {
mt3->apply(m);
}
cache.update(*mt3).get();
cache.update([] {}, *mt3).get();
for (auto&& m : new_mutations) {
verify_has(cache, m);
@@ -881,14 +881,16 @@ SEASTAR_TEST_CASE(test_update_failure) {
int partition_count = 1000;
// populate cache with some partitions
using partitions_type = std::map<partition_key, mutation_partition, partition_key::less_compare>;
auto original_partitions = partitions_type(partition_key::less_compare(*s));
for (int i = 0; i < partition_count / 2; i++) {
auto m = make_new_mutation(s, i + partition_count / 2);
original_partitions.emplace(m.key(), m.partition());
cache.populate(m);
}
// populate memtable with more updated partitions
auto mt = make_lw_shared<memtable>(s);
using partitions_type = std::map<partition_key, mutation_partition, partition_key::less_compare>;
auto updated_partitions = partitions_type(partition_key::less_compare(*s));
for (int i = 0; i < partition_count; i++) {
auto m = make_new_large_mutation(s, i);
@@ -910,7 +912,8 @@ SEASTAR_TEST_CASE(test_update_failure) {
}
auto ev = tracker.region().evictor();
tracker.region().make_evictable([ev, evicitons_left = int(10)] () mutable {
int evicitons_left = 10;
tracker.region().make_evictable([&] () mutable {
if (evicitons_left == 0) {
return memory::reclaiming_result::reclaimed_nothing;
}
@@ -918,27 +921,35 @@ SEASTAR_TEST_CASE(test_update_failure) {
return ev();
});
bool failed = false;
try {
cache.update(*mt).get();
BOOST_FAIL("updating cache should have failed");
cache.update([] { }, *mt).get();
} catch (const std::bad_alloc&) {
// expected
failed = true;
}
BOOST_REQUIRE(!evicitons_left); // should have happened
memory_hog.clear();
// verify that there are no stale partitions
auto reader = cache.make_reader(s, query::full_partition_range);
for (int i = 0; i < partition_count; i++) {
auto mopt = mutation_from_streamed_mutation(reader().get0()).get0();
if (!mopt) {
break;
auto has_only = [&] (const partitions_type& partitions) {
auto reader = cache.make_reader(s, query::full_partition_range);
for (int i = 0; i < partition_count; i++) {
auto mopt = mutation_from_streamed_mutation(reader().get0()).get0();
if (!mopt) {
break;
}
auto it = partitions.find(mopt->key());
BOOST_REQUIRE(it != partitions.end());
BOOST_REQUIRE(it->second.equal(*s, mopt->partition()));
}
auto it = updated_partitions.find(mopt->key());
BOOST_REQUIRE(it != updated_partitions.end());
BOOST_REQUIRE(it->second.equal(*s, mopt->partition()));
BOOST_REQUIRE(!reader().get0());
};
if (failed) {
has_only(original_partitions);
} else {
has_only(updated_partitions);
}
BOOST_REQUIRE(!reader().get0());
});
}
#endif
@@ -1062,7 +1073,7 @@ SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) {
rd.produces(ring[0]);
// Invalidate ring[2] and ring[3]
cache.invalidate(dht::partition_range::make_starting_with({ ring[2].ring_position(), true })).get();
cache.invalidate([] {}, dht::partition_range::make_starting_with({ ring[2].ring_position(), true })).get();
// Continue previous reader.
rd.produces(ring[1])
@@ -1077,7 +1088,7 @@ SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) {
.produces(ring[2]);
// Invalidate whole cache.
cache.invalidate().get();
cache.invalidate([] {}).get();
rd.produces(ring[3])
.produces_end_of_stream();
@@ -1128,10 +1139,8 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
sleep(10ms).get();
memtables.apply(*mt2);
// This update should miss on all partitions
auto update_future = cache.update(*mt2);
auto update_future = cache.update([&] { memtables.apply(*mt2); }, *mt2);
auto rd3 = cache.make_reader(s);
@@ -1191,7 +1200,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
auto some_element = keys_in_cache.begin() + 547;
std::vector<dht::decorated_key> keys_not_in_cache;
keys_not_in_cache.push_back(*some_element);
cache.invalidate(*some_element).get();
cache.invalidate([] {}, *some_element).get();
keys_in_cache.erase(some_element);
for (auto&& key : keys_in_cache) {
@@ -1211,7 +1220,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
{ *some_range_begin, true }, { *some_range_end, false }
);
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
cache.invalidate(range).get();
cache.invalidate([] {}, range).get();
keys_in_cache.erase(some_range_begin, some_range_end);
for (auto&& key : keys_in_cache) {
@@ -1255,11 +1264,11 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
sleep(10ms).get();
memtables.clear();
memtables.apply(*mt2);
// This update should miss on all partitions
auto cache_cleared = cache.invalidate();
auto cache_cleared = cache.invalidate([&] {
memtables.clear();
memtables.apply(*mt2);
});
auto rd2 = cache.make_reader(s);
@@ -1328,8 +1337,7 @@ SEASTAR_TEST_CASE(test_mvcc) {
BOOST_REQUIRE(*mt1_reader_sm_opt);
}
underlying.apply(*mt1);
cache.update(*mt1).get();
cache.update([&] { underlying.apply(*mt1); }, *mt1).get();
auto sm3 = cache.make_reader(s)().get0();
BOOST_REQUIRE(sm3);
@@ -1358,7 +1366,7 @@ SEASTAR_TEST_CASE(test_mvcc) {
auto m_1 = mutation_from_streamed_mutation(std::move(sm1)).get0();
assert_that(*m_1).is_equal_to(m1);
cache.invalidate().get0();
cache.invalidate([] {}).get0();
auto m_2 = mutation_from_streamed_mutation(std::move(sm2)).get0();
assert_that(*m_2).is_equal_to(m1);
@@ -1412,7 +1420,7 @@ SEASTAR_TEST_CASE(test_slicing_mutation_reader) {
row_cache cache(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);
auto run_tests = [&] (auto& ps, std::deque<int> expected) {
cache.invalidate().get0();
cache.invalidate([] {}).get0();
auto reader = cache.make_reader(s, query::full_partition_range, ps);
test_sliced_read_row_presence(std::move(reader), s, expected);
@@ -1426,7 +1434,7 @@ SEASTAR_TEST_CASE(test_slicing_mutation_reader) {
reader = cache.make_reader(s, singular_range, ps);
test_sliced_read_row_presence(std::move(reader), s, expected);
cache.invalidate().get0();
cache.invalidate([] {}).get0();
reader = cache.make_reader(s, singular_range, ps);
test_sliced_read_row_presence(std::move(reader), s, expected);
@@ -1572,8 +1580,7 @@ SEASTAR_TEST_CASE(test_update_invalidating) {
mt->apply(m4);
mt->apply(m5);
underlying.apply(*mt);
cache.update_invalidating(*mt).get();
cache.update_invalidating([&] { underlying.apply(*mt); }, *mt).get();
assert_that(cache.make_reader(s.schema()))
.produces(m5)