prevent populating cache with expired rows from sstables
change row purge condition for compacting_reader to remove all expired rows to avoid read perfomance problems when there are many expired tombstones in row cache Refs #2252 Closes #12565
This commit is contained in:
committed by
Kamil Braun
parent
5bc7f0732e
commit
ce96b472d3
@@ -1466,7 +1466,7 @@ table::sstables_as_snapshot_source() {
|
||||
return make_compacting_reader(
|
||||
std::move(reader),
|
||||
gc_clock::now(),
|
||||
[](const dht::decorated_key&) { return api::min_timestamp; },
|
||||
[](const dht::decorated_key&) { return api::max_timestamp; },
|
||||
_compaction_manager.get_tombstone_gc_state(),
|
||||
fwd);
|
||||
}, [this, sst_set] {
|
||||
|
||||
@@ -4219,3 +4219,60 @@ SEASTAR_TEST_CASE(test_reading_of_nonfull_keys) {
|
||||
.produces(m1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones) {
|
||||
return do_with_cql_env_thread([](cql_test_env& env) {
|
||||
sstring ks_name = "ks";
|
||||
sstring table_name = "test_pop_cache_tomb_table";
|
||||
|
||||
env.execute_cql(format(
|
||||
"CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = "
|
||||
"{{'class' : 'SimpleStrategy', 'replication_factor' : 1}};", ks_name)).get();
|
||||
env.execute_cql(format(
|
||||
"CREATE TABLE {}.{} (pk int, ck int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();
|
||||
|
||||
env.require_table_exists(ks_name, table_name).get();
|
||||
|
||||
replica::table& t = env.local_db().find_column_family(ks_name, table_name);
|
||||
schema_ptr s = t.schema();
|
||||
|
||||
int32_t pk = 0;
|
||||
dht::decorated_key dk(dht::token(), partition_key::make_empty());
|
||||
do {
|
||||
dk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(++pk)));
|
||||
} while (dht::shard_of(*s, dk.token()) != this_shard_id());
|
||||
|
||||
auto ck1 = clustering_key::from_deeply_exploded(*s, {1});
|
||||
auto ck1_prefix = clustering_key_prefix::from_deeply_exploded(*s, {1});
|
||||
auto ck2 = clustering_key::from_deeply_exploded(*s, {2});
|
||||
auto ck2_prefix = clustering_key_prefix::from_deeply_exploded(*s, {2});
|
||||
|
||||
auto dt_noexp = gc_clock::now();
|
||||
auto dt_exp = gc_clock::now() - std::chrono::seconds(s->gc_grace_seconds().count() + 1);
|
||||
|
||||
mutation m(s, dk);
|
||||
m.partition().apply_delete(*s, ck1_prefix, tombstone(1, dt_noexp)); // create non-expired tombstone
|
||||
m.partition().apply_delete(*s, ck2_prefix, tombstone(2, dt_exp)); // create expired tombstone
|
||||
t.apply(m);
|
||||
t.flush().get();
|
||||
|
||||
// Clear the cache and repopulate it by reading sstables
|
||||
t.get_row_cache().evict();
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto reader = t.get_row_cache().make_reader(s, semaphore.make_permit());
|
||||
deferred_close dc{reader};
|
||||
reader.consume_pausable([s](mutation_fragment_v2&& mf) {
|
||||
return stop_iteration::no;
|
||||
}).get();
|
||||
|
||||
cache_entry& entry = t.get_row_cache().lookup(dk);
|
||||
auto& cp = entry.partition().version()->partition();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cp.tombstone_for_row(*s, ck1), row_tombstone(tombstone(1, dt_noexp))); // non-expired tombstone is in cache
|
||||
BOOST_REQUIRE(cp.find_row(*s, ck2) == nullptr); // expired tombstone isn't in cache
|
||||
|
||||
const auto rows = cp.non_dummy_rows();
|
||||
BOOST_REQUIRE(std::distance(rows.begin(), rows.end()) == 1); // cache contains non-expired row only
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3391,9 +3391,11 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
|
||||
cf->start();
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::null);
|
||||
|
||||
// since we use compacting_reader expired tombstones shouldn't be read from sstables
|
||||
// so we just check there are no live (resurrected for this test) rows in partition
|
||||
auto is_partition_dead = [&s, &cf, &env] (partition_key& pkey) {
|
||||
replica::column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, env.make_reader_permit(), pkey).get0();
|
||||
return mp && bool(mp->partition_tombstone());
|
||||
return mp && mp->live_row_count(*s, gc_clock::time_point::max()) == 0;
|
||||
};
|
||||
|
||||
cf->add_sstable_and_update_cache(non_expired_sst).get();
|
||||
|
||||
Reference in New Issue
Block a user