Merge "Improved cache preload"

I made the mistake of running scylla on a spinning disk.  Since a disk
can serve about 100 reads/second, that set the tone for the whole benchmark.

Fix by improving cache preload when flushing a memtable.  If we can detect
that a mutation is not part of any sstable (other than the one we just wrote),
we can insert it into the cache.

After this, running a mixed cassandra-stress returns the expected results,
even on a spinning disk.
This commit is contained in:
Avi Kivity
2015-08-10 12:40:35 +03:00
6 changed files with 44 additions and 11 deletions

View File

@@ -58,6 +58,17 @@ column_family::column_family(schema_ptr schema, config config, no_commitlog cl)
add_memtable();
}
negative_mutation_reader
column_family::make_negative_mutation_reader(lw_shared_ptr<sstable_list> old_sstables) {
    // Build a predicate over the captured (pre-flush) sstable set: a key may
    // only be declared absent when no bloom filter in that set reports it.
    // Bloom filters can produce false positives but never false negatives,
    // so "maybe_exists" is the conservative answer.
    return [this, old_sstables = std::move(old_sstables)] (const partition_key& key) {
        auto verdict = negative_mutation_reader_result::definitely_doesnt_exists;
        for (auto&& tab : *old_sstables) {
            if (tab.second->filter_has_key(*_schema, key)) {
                verdict = negative_mutation_reader_result::maybe_exists;
                break;
            }
        }
        return verdict;
    };
}
mutation_source
column_family::sstables_as_mutation_source() {
@@ -369,9 +380,11 @@ void column_family::add_memtable() {
}
future<>
column_family::update_cache(memtable& m) {
column_family::update_cache(memtable& m, lw_shared_ptr<sstable_list> old_sstables) {
if (_config.enable_cache) {
return _cache.update(m);
// be careful to use the old sstable list, since the new one will hit every
// mutation in m.
return _cache.update(m, make_negative_mutation_reader(std::move(old_sstables)));
} else {
return make_ready_future<>();
}
@@ -417,8 +430,9 @@ column_family::seal_active_memtable() {
dblog.debug("Flushing done");
// We must add sstable before we call update_cache(), because
// memtable's data after moving to cache can be evicted at any time.
auto old_sstables = _sstables;
add_sstable(std::move(newtab));
return update_cache(*old);
return update_cache(*old, std::move(old_sstables));
}).then_wrapped([this, old] (future<> ret) {
try {
ret.get();
@@ -1319,4 +1333,4 @@ future<> column_family::flush() {
_stats.memtable_switch_count++;
return make_ready_future<>();
});
}
}

View File

@@ -123,7 +123,7 @@ private:
void update_stats_for_new_sstable(uint64_t new_sstable_data_size);
void add_sstable(sstables::sstable&& sstable);
void add_memtable();
future<> update_cache(memtable&);
future<> update_cache(memtable&, lw_shared_ptr<sstable_list> old_sstables);
struct merge_comparator;
private:
// Creates a mutation reader which covers sstables.
@@ -132,6 +132,7 @@ private:
mutation_reader make_sstable_reader(const query::partition_range& range) const;
mutation_source sstables_as_mutation_source();
negative_mutation_reader make_negative_mutation_reader(lw_shared_ptr<sstable_list> old_sstables);
public:
// Creates a mutation reader which covers all data sources for this column family.
// Caller needs to ensure that column_family remains live (FIXME: relax this).

View File

@@ -89,3 +89,16 @@ future<> consume(mutation_reader& reader, Consumer consumer) {
// can be queried multiple times and in parallel. For each query it returns
// independent mutation_reader.
using mutation_source = std::function<mutation_reader(const query::partition_range& range)>;
/// A negative_mutation_reader quickly returns whether a key is known not to exist
/// in a data source (it may return false positives, but not false negatives).
enum class negative_mutation_reader_result {
definitely_doesnt_exists,
maybe_exists
};
// Callable classifying a partition key against some underlying data source.
// Contract: "definitely_doesnt_exists" must be exact; "maybe_exists" may be
// a false positive (e.g. a bloom-filter hit).
using negative_mutation_reader = std::function<negative_mutation_reader_result (const partition_key& key)>;
// Conservative default reader: never rules any key out. Always safe, since
// the contract forbids false negatives but allows false positives.
inline
negative_mutation_reader make_default_negative_mutation_reader() {
return [] (const partition_key& key) { return negative_mutation_reader_result::maybe_exists; };
}

View File

@@ -145,9 +145,9 @@ void row_cache::populate(const mutation& m) {
});
}
future<> row_cache::update(memtable& m) {
future<> row_cache::update(memtable& m, negative_mutation_reader underlying_negative) {
_tracker.region().merge(m._region); // Now all data in memtable belongs to cache
with_allocator(_tracker.allocator(), [this, &m] {
with_allocator(_tracker.allocator(), [this, &m, underlying_negative = std::move(underlying_negative)] {
auto i = m.partitions.begin();
const schema& s = *m.schema();
while (i != m.partitions.end()) {
@@ -155,14 +155,16 @@ future<> row_cache::update(memtable& m) {
// FIXME: Optimize knowing we lookup in-order.
auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema));
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
// FIXME: if the bloom filters say the data isn't in any sstable yet (other than the
// one we are caching now), we can.
// Alternatively, keep a bitmap indicating which sstables we do cover, so we don't have to
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
// search it.
if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) {
cache_entry& entry = *cache_i;
_tracker.touch(entry);
entry.partition().apply(s, std::move(mem_e.partition()));
} else if (underlying_negative(mem_e.key().key()) == negative_mutation_reader_result::definitely_doesnt_exists) {
cache_entry* entry = current_allocator().construct<cache_entry>(mem_e.key(), mem_e.partition());
_tracker.insert(*entry);
_partitions.insert(cache_i, *entry);
}
i = m.partitions.erase(i);
current_allocator().destroy(&mem_e);

View File

@@ -148,5 +148,5 @@ public:
// has just been flushed to the underlying data source.
// The memtable can be queried during the process, but must not be written.
// After the update is complete, memtable is empty.
future<> update(memtable&);
future<> update(memtable&, negative_mutation_reader underlying_negative);
};

View File

@@ -370,6 +370,9 @@ private:
void write_range_tombstone(file_writer& out, const composite& clustering_prefix, std::vector<bytes_view> suffix, const tombstone t);
void write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation::view collection);
public:
// Convenience overload: converts a partition_key to the sstable's internal
// key representation and delegates to the private filter_has_key(const key&)
// overload. NOTE(review): presumably a bloom-filter probe (false positives
// possible, false negatives not) — confirm against the delegated overload.
bool filter_has_key(const schema& s, const partition_key& key) {
return filter_has_key(key::from_partition_key(s, key));
}
// Allow the test cases from sstable_test.cc to test private methods. We use
// a placeholder to avoid cluttering this class too much. The sstable_test class
// will then re-export as public every method it needs.