Merge 'Harden table truncate' from Benny Halevy

This series fixes a few issue on the table truncate path:
- "memtable_list: safely futurize clear_and_add"
  - reinstates an async version of table::clear_and_add, just safe against #10421
- a unit test reproducing #10421 was added to make sure the new version is indeed safe.
- "table: clear: serialize with ongoing flush" fixes #10423
- a unit test reproducing #10423 was added

Fixes #10281
Fixes #10423

Test: unit(dev), database_test. test_truncate_without_snapshot_during_{writes,flushes} (debug)

Closes #10424

* github.com:scylladb/scylla:
  test: database_test: add test_truncate_without_snapshot_during_writes
  memtable_list: safely futurize clear_and_add
  table: clear: serialize with ongoing flush
This commit is contained in:
Avi Kivity
2022-05-08 11:30:21 +03:00
5 changed files with 68 additions and 12 deletions

View File

@@ -93,6 +93,48 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
}, cfg);
}
// Reproducer for:
// https://github.com/scylladb/scylla/issues/10421
// https://github.com/scylladb/scylla/issues/10423
SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
auto cfg = make_shared<db::config>();
cfg->auto_snapshot.set(false);
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
auto& ks = db.find_keyspace("ks");
auto& cf = db.find_column_family("ks", "cf");
auto s = cf.schema();
int count = 0;
auto insert_data = [&] (uint32_t begin, uint32_t end) {
return parallel_for_each(boost::irange(begin, end), [&] (auto i) -> future<> {
auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
return do_with(freeze(m), [&] (const auto& fm) {
return db.apply(s, fm, tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).then([&] {
return cf.flush();
}).handle_exception([] (std::exception_ptr ex) {
BOOST_FAIL(format("db.apply failed: {}", ex));
});
}).then([&] {
++count;
});
});
};
uint32_t num_keys = 1000;
auto f0 = insert_data(0, num_keys);
auto f1 = do_until([&] { return count >= num_keys; }, [&] {
return db.truncate(ks, cf, [] { return make_ready_future<db_clock::time_point>(db_clock::now()); }, false /* with_snapshot */).then([] { return yield(); });
});
f0.get();
f1.get();
}, cfg);
}
SEASTAR_TEST_CASE(test_querying_with_limits) {
return do_with_cql_env([](cql_test_env& e) {
return seastar::async([&] {