From aae532a96be58a869475e0ba679d70376337f152 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 24 Apr 2022 21:24:38 +0300 Subject: [PATCH 1/3] table: clear: serialize with ongoing flush Get all flush permits to serialize with any ongoing flushes and preventing further flushes during table::clear, in particular calling discard_completed_segments for every table and clearing the memtables in clear_and_add. Fixes #10423 Signed-off-by: Benny Halevy --- dirty_memory_manager.hh | 6 ++++++ replica/table.cc | 1 + 2 files changed, 7 insertions(+) diff --git a/dirty_memory_manager.hh b/dirty_memory_manager.hh index fa2072fa42..783629e158 100644 --- a/dirty_memory_manager.hh +++ b/dirty_memory_manager.hh @@ -202,6 +202,12 @@ public: }); } + future get_all_flush_permits() { + return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) { + return this->get_flush_permit(std::move(units)); + }); + } + bool has_extraneous_flushes_requested() const { return _extraneous_flushes > 0; } diff --git a/replica/table.cc b/replica/table.cc index 9fd0f6ecd7..a7d730ce7d 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1496,6 +1496,7 @@ bool table::can_flush() const { } future<> table::clear() { + auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits(); if (_commitlog) { for (auto& t : *_memtables) { _commitlog->discard_completed_segments(_schema->id(), t->get_and_discard_rp_set()); From b8263e550aca7e1a2cd905a2dca5b8012e915d30 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 25 Apr 2022 16:31:53 +0300 Subject: [PATCH 2/3] memtable_list: safely futurize clear_and_add Following a4be927e2314fd2cafdfc972555c345e8b5c6d90 that reverted 2325c566d9af608b1effc768edc174359f34558a due to #10421, this patch reintroduces an async version of memtable_list::clear_and_add that calls clear_gently safely after replacing the _memtables vector with a new one so that writes and flushes can continue in he foreground while the old memtables are cleared. Fixes #10281 Signed-off-by: Benny Halevy --- replica/database.cc | 14 ++++++++++++++ replica/database.hh | 13 +++---------- replica/table.cc | 4 ++-- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 92bf92304a..e0167d3b53 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1574,6 +1574,20 @@ lw_shared_ptr memtable_list::new_memtable() { return make_lw_shared(_current_schema(), *_dirty_memory_manager, _table_stats, this, _compaction_scheduling_group); } +// Synchronously swaps the active memtable with a new, empty one, +// then, clears the existing memtable(s) asynchronously. +// Exception safe. +future<> memtable_list::clear_and_add() { + std::vector new_memtables; + new_memtables.emplace_back(new_memtable()); + auto old_memtables = std::exchange(_memtables, std::move(new_memtables)); + // Now that the existing _memtables vector is swapped with new_memtables + // the function can yield for clearing the old_memtables. + for (auto& smt : old_memtables) { + co_await smt->clear_gently(); + } +} + } // namespace replica future flush_permit::reacquire_sstable_write_permit() && { diff --git a/replica/database.hh b/replica/database.hh index 6338ba639f..e7dfd73115 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -219,17 +219,10 @@ public: } } - // Clears the active memtable and adds a new, empty one. + // Synchronously swaps the active memtable with a new, empty one, + // then, clears the existing memtable(s) asynchronously. // Exception safe. - void clear_and_add() { - auto mt = new_memtable(); - _memtables.clear(); - // emplace_back might throw only if _memtables was empty - // on entry. Otherwise, we rely on clear() not to release - // the vector capacity (See https://en.cppreference.com/w/cpp/container/vector/clear) - // and lw_shared_ptr being nothrow move constructible. - _memtables.emplace_back(std::move(mt)); - } + future<> clear_and_add(); size_t size() const { return _memtables.size(); diff --git a/replica/table.cc b/replica/table.cc index a7d730ce7d..4ba3cc0fd1 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1502,8 +1502,8 @@ future<> table::clear() { _commitlog->discard_completed_segments(_schema->id(), t->get_and_discard_rp_set()); } } - _memtables->clear_and_add(); - return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ })); + co_await _memtables->clear_and_add(); + co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ })); } // NOTE: does not need to be futurized, but might eventually, depending on From c2f0d75d96dc2ba46a76fdeca9a4aef35ca0d757 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 25 Apr 2022 16:26:22 +0300 Subject: [PATCH 3/3] test: database_test: add test_truncate_without_snapshot_during_writes Reproduces https://github.com/scylladb/scylla/issues/10421 with 2325c566d9af608b1effc768edc174359f34558a (memtable_list: futurize clear_and_add) Signed-off-by: Benny Halevy --- test/boost/database_test.cc | 42 +++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index ee9468be4d..0a9eafadbd 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -93,6 +93,48 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) { }, cfg); } +// Reproducer for: +// https://github.com/scylladb/scylla/issues/10421 +// https://github.com/scylladb/scylla/issues/10423 +SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) { + auto cfg = make_shared(); + cfg->auto_snapshot.set(false); + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get(); + auto& db = e.local_db(); + auto& ks = db.find_keyspace("ks"); + auto& cf = db.find_column_family("ks", "cf"); + auto s = cf.schema(); + int count = 0; + + auto insert_data = [&] (uint32_t begin, uint32_t end) { + return parallel_for_each(boost::irange(begin, end), [&] (auto i) -> future<> { + auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i))); + mutation m(s, pkey); + m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {}); + return do_with(freeze(m), [&] (const auto& fm) { + return db.apply(s, fm, tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).then([&] { + return cf.flush(); + }).handle_exception([] (std::exception_ptr ex) { + BOOST_FAIL(format("db.apply failed: {}", ex)); + }); + }).then([&] { + ++count; + }); + }); + }; + + uint32_t num_keys = 1000; + + auto f0 = insert_data(0, num_keys); + auto f1 = do_until([&] { return count >= num_keys; }, [&] { + return db.truncate(ks, cf, [] { return make_ready_future(db_clock::now()); }, false /* with_snapshot */).then([] { return yield(); }); + }); + f0.get(); + f1.get(); + }, cfg); +} + SEASTAR_TEST_CASE(test_querying_with_limits) { return do_with_cql_env([](cql_test_env& e) { return seastar::async([&] {