diff --git a/row_cache.cc b/row_cache.cc index a603de3207..e8151d58af 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -120,17 +120,17 @@ row_cache::make_reader(const query::partition_range& range) { } return _read_section(_tracker.region(), [&] { - const dht::decorated_key& dk = pos.as_decorated_key(); - auto i = _partitions.find(dk, cache_entry::compare(_schema)); - if (i != _partitions.end()) { - cache_entry& e = *i; - _tracker.touch(e); - ++_stats.hits; - return make_reader_returning(mutation(_schema, dk, e.partition())); - } else { - ++_stats.misses; - return make_mutation_reader(*this, _underlying(range)); - } + const dht::decorated_key& dk = pos.as_decorated_key(); + auto i = _partitions.find(dk, cache_entry::compare(_schema)); + if (i != _partitions.end()) { + cache_entry& e = *i; + _tracker.touch(e); + ++_stats.hits; + return make_reader_returning(mutation(_schema, dk, e.partition())); + } else { + ++_stats.misses; + return make_mutation_reader(*this, _underlying(range)); + } }); } @@ -166,32 +166,33 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec return with_allocator(_tracker.allocator(), [this, &m, &presence_checker] () { unsigned quota = 30; try { - _update_section(_tracker.region(), [&] { - auto i = m.partitions.begin(); - const schema& s = *m.schema(); - while (i != m.partitions.end() && quota) { - partition_entry& mem_e = *i; - // FIXME: Optimize knowing we lookup in-order. - auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema)); - // If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete. - // FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to - // search it. - if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) { - cache_entry& entry = *cache_i; - _tracker.touch(entry); - entry.partition().apply(s, std::move(mem_e.partition())); - } else if (presence_checker(mem_e.key().key()) == partition_presence_checker_result::definitely_doesnt_exist) { - cache_entry* entry = current_allocator().construct( - std::move(mem_e.key()), std::move(mem_e.partition())); - _tracker.insert(*entry); - _partitions.insert(cache_i, *entry); - } - i = m.partitions.erase(i); - current_allocator().destroy(&mem_e); - --quota; - } - }); - return m.partitions.empty() ? stop_iteration::yes : stop_iteration::no; + _update_section(_tracker.region(), [&] { + auto i = m.partitions.begin(); + const schema& s = *m.schema(); + while (i != m.partitions.end() && quota) { + partition_entry& mem_e = *i; + // FIXME: Optimize knowing we lookup in-order. + auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema)); + // If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete. + // FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to + // search it. + if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) { + cache_entry& entry = *cache_i; + _tracker.touch(entry); + entry.partition().apply(s, std::move(mem_e.partition())); + } else if (presence_checker(mem_e.key().key()) == + partition_presence_checker_result::definitely_doesnt_exist) { + cache_entry* entry = current_allocator().construct( + std::move(mem_e.key()), std::move(mem_e.partition())); + _tracker.insert(*entry); + _partitions.insert(cache_i, *entry); + } + i = m.partitions.erase(i); + current_allocator().destroy(&mem_e); + --quota; + } + }); + return m.partitions.empty() ? stop_iteration::yes : stop_iteration::no; } catch (const std::bad_alloc&) { // Cache entry may be in an incomplete state if // _update_section fails due to weak exception guarantees of