diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index eb3de57351..de45f16354 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -159,9 +159,9 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // There is a semaphore inside the compaction manager in run_resharding_jobs. So we // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. But only one will run in parallel at a time - auto& t = table.as_table_state(); + auto& t = table.try_get_table_state_with_static_sharding(); co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> { + return table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> { auto erm = table.get_effective_replication_map(); // keep alive around compaction. 
sstables::compaction_descriptor desc(sstlist); @@ -604,12 +604,12 @@ future<> shard_reshaping_compaction_task_impl::run() { } // reshape sstables individually within the compaction groups - for (auto& sstables_in_cg : sstables_grouped_by_compaction_group | boost::adaptors::map_values) { - co_await reshape_compaction_group(sstables_in_cg, table, info); + for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) { + co_await reshape_compaction_group(sstables_in_cg.first, sstables_in_cg.second, table, info); } } -future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) { +future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(size_t compaction_group_id, std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) { while (true) { auto reshape_candidates = boost::copy_range>(sstables_in_cg @@ -635,8 +635,9 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::uno desc.creator = _creator; try { - co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> { - sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state(), progress_monitor); + auto& t = table.get_compaction_group(compaction_group_id)->as_table_state(); + co_await table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, &t] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> { + 
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, t, progress_monitor); // update the sstables_in_cg set with new sstables and remove the reshaped ones for (auto& sst : sstlist) { sstables_in_cg.erase(sst); diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index c88c6130cb..b1c0764209 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -606,7 +606,7 @@ private: std::function _filter; uint64_t& _total_shard_size; - future<> reshape_compaction_group(std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info); + future<> reshape_compaction_group(size_t compaction_group_id, std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info); public: shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module, std::string keyspace, diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 057aea43dd..40b77f7fc2 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -292,7 +292,7 @@ stateDiagram-v2 use_new --> cleanup cleanup --> end_migration end_migration --> [*] - allow_write_both_read_old --> revert_migration: error + allow_write_both_read_old --> cleanup_target: error write_both_read_old --> cleanup_target: error streaming --> cleanup_target: error write_both_read_new --> if_state: error diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index ad328f65d1..61ffcc80ba 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -8,6 +8,7 @@ #include #include +#include #include "database_fwd.hh" #include "compaction/compaction_descriptor.hh" @@ -233,10 +234,16 @@ protected: // The list entries are unlinked automatically when the storage group, they belong to, is removed. 
compaction_group_list _compaction_groups; storage_group_map _storage_groups; - + // Prevents _storage_groups from having its elements inserted or deleted while another layer iterates + // over them (or over _compaction_groups). + seastar::rwlock _lock; public: virtual ~storage_group_manager(); + seastar::rwlock& get_rwlock() noexcept { + return _lock; + } + const compaction_group_list& compaction_groups() const noexcept { return _compaction_groups; } @@ -244,19 +251,12 @@ public: return _compaction_groups; } - const storage_group_map& storage_groups() const noexcept { - return _storage_groups; - } - storage_group_map& storage_groups() noexcept { - return _storage_groups; - } + future<> for_each_storage_group_gently(std::function(size_t, storage_group&)> f); + void for_each_storage_group(std::function f) const; + void remove_storage_group(size_t id); // FIXME: Cannot return nullptr, signature can be changed to return storage_group&. storage_group* storage_group_for_id(const schema_ptr&, size_t i) const; - compaction_group* single_compaction_group_if_available() noexcept { - return _compaction_groups.size() == 1 ? &_compaction_groups.front() : nullptr; - } - // Caller must keep the current effective_replication_map_ptr valid // until the storage_group_manager finishes update_effective_replication_map virtual future<> update_effective_replication_map(const locator::effective_replication_map& erm) = 0; diff --git a/replica/database.hh b/replica/database.hh index cd4c72bd42..df01578558 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -114,6 +114,10 @@ namespace gms { class gossiper; } +namespace compaction { +class shard_reshaping_compaction_task_impl; +} + namespace db { class commitlog; class config; @@ -591,8 +595,6 @@ private: storage_group* storage_group_for_id(size_t i) const; std::unique_ptr make_storage_group_manager(); - // Return compaction group if table owns a single one. Otherwise, null is returned.
- compaction_group* single_compaction_group_if_available() const noexcept; compaction_group* get_compaction_group(size_t id) const noexcept; // Select a compaction group from a given token. compaction_group& compaction_group_for_token(dht::token token) const noexcept; @@ -604,8 +606,6 @@ private: compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept; // Returns a list of all compaction groups. compaction_group_list& compaction_groups() const noexcept; - // Returns a list of all storage groups. - const storage_group_map& storage_groups() const noexcept; // Safely iterate through compaction groups, while performing async operations on them. future<> parallel_foreach_compaction_group(std::function(compaction_group&)> action); @@ -838,7 +838,15 @@ public: const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } future<> update_effective_replication_map(locator::effective_replication_map_ptr); [[gnu::always_inline]] bool uses_tablets() const; +private: + future<> clear_inactive_reads_for_tablet(database& db, storage_group* sg); + future<> stop_compaction_groups(storage_group* sg); + future<> flush_compaction_groups(storage_group* sg); + future<> cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg); +public: future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id); + // For tests only. 
+ future<> cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; shard_id shard_of(const mutation& m) const { @@ -1198,6 +1206,7 @@ public: friend class distributed_loader; friend class table_populator; + friend class compaction::shard_reshaping_compaction_task_impl; private: timer<> _off_strategy_trigger; @@ -1207,8 +1216,7 @@ public: void update_off_strategy_trigger(); void enable_off_strategy_trigger(); - // FIXME: get rid of it once no users. - compaction::table_state& as_table_state() const noexcept; + compaction::table_state& try_get_table_state_with_static_sharding() const; // Safely iterate through table states, while performing async operations on them. future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); diff --git a/replica/table.cc b/replica/table.cc index 94fe11d43a..1fac797a3f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -548,10 +548,31 @@ void table::enable_off_strategy_trigger() { storage_group_manager::~storage_group_manager() = default; +future<> storage_group_manager::for_each_storage_group_gently(std::function(size_t, storage_group&)> f) { + rwlock::holder shared_lock = co_await get_rwlock().hold_read_lock(); + for (auto& [id, sg]: _storage_groups) { + co_await f(id, *sg); + } +} + +void storage_group_manager::for_each_storage_group(std::function f) const { + for (auto& [id, sg]: _storage_groups) { + f(id, *sg); + } +} + +void storage_group_manager::remove_storage_group(size_t id) { + if (auto it = _storage_groups.find(id); it != _storage_groups.end()) { + _storage_groups.erase(it); + } else { + throw std::out_of_range(format("remove_storage_group: storage group with id={} not found", id)); + } +} + storage_group* 
storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const { - auto it = storage_groups().find(i); - if (it == storage_groups().end()) [[unlikely]] { - on_internal_error(tlogger, format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name())); + auto it = _storage_groups.find(i); + if (it == _storage_groups.end()) [[unlikely]] { + throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name())); } return it->second.get(); } @@ -661,10 +682,9 @@ public: auto shard = tmap.get_shard(tid, _my_host_id); if (shard && *shard == this_shard_id()) { tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name()); + auto cg = std::make_unique(_t, tid.value(), std::move(range)); + ret[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); } - // FIXME: don't allocate compaction groups for tablets that aren't present in this shard. 
- auto cg = std::make_unique(_t, tid.value(), std::move(range)); - ret[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); } _storage_groups = std::move(ret); } @@ -805,7 +825,7 @@ bool tablet_storage_group_manager::all_storage_groups_split() { return true; } - auto split_ready = std::ranges::all_of(storage_groups() | boost::adaptors::map_values, + auto split_ready = std::ranges::all_of(_storage_groups | boost::adaptors::map_values, std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(compaction_groups()))); // The table replica will say to coordinator that its split status is ready by @@ -834,9 +854,9 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com future<> tablet_storage_group_manager::split_all_storage_groups() { sstables::compaction_type_options::split opt = split_compaction_options(); - for (auto& storage_group : storage_groups() | boost::adaptors::map_values) { - co_await storage_group->split(compaction_groups(), opt); - } + co_await for_each_storage_group_gently([this, opt] (size_t i, storage_group& storage_group) { + return storage_group.split(compaction_groups(), opt); + }); } future<> table::split_all_storage_groups() { @@ -849,7 +869,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id return make_ready_future<>(); } - auto& sg = storage_groups()[idx]; + auto& sg = _storage_groups[idx]; if (!sg) { on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard", idx, schema()->ks_name(), schema()->cf_name())); @@ -873,10 +893,6 @@ std::unique_ptr table::make_storage_group_manager() { return ret; } -compaction_group* table::single_compaction_group_if_available() const noexcept { - return _sg_manager->single_compaction_group_if_available(); -} - compaction_group* table::get_compaction_group(size_t id) const noexcept { return storage_group_for_id(id)->main_compaction_group().get(); } @@ -912,8 +928,8 @@ utils::chunked_vector 
tablet_storage_group_manager::compactio size_t candidate_end = tr.end() ? storage_group_id_for_token(tr.end()->value()) : (tablet_count() - 1); while (candidate_start <= candidate_end) { - auto it = storage_groups().find(candidate_start++); - if (it == storage_groups().end()) { + auto it = _storage_groups.find(candidate_start++); + if (it == _storage_groups.end()) { continue; } auto& sg = it->second; @@ -965,12 +981,8 @@ compaction_group_list& table::compaction_groups() const noexcept { return _sg_manager->compaction_groups(); } -const storage_group_map& table::storage_groups() const noexcept { - return _sg_manager->storage_groups(); -} - future<> table::parallel_foreach_compaction_group(std::function(compaction_group&)> action) { - // TODO: place a barrier here when we allow dynamic groups. + rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock(); co_await coroutine::parallel_for_each(compaction_groups(), [&] (compaction_group& cg) { return action(cg); }); @@ -1009,11 +1021,14 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no future<> table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) { + compaction_group& cg = compaction_group_for_sstable(sst); + // Hold gate to make sure the compaction group is alive. + auto holder = cg.async_gate().hold(); + auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. // atomically load all opened sstables into column family.
- compaction_group& cg = compaction_group_for_sstable(sst); if (!offstrategy) { add_sstable(cg, sst); } else { @@ -1789,7 +1804,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o co_await flush(); } - auto& cg = *get_compaction_group(0); + auto& cg = *storage_group_for_id(0)->main_compaction_group().get(); co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state(), info); } @@ -2024,13 +2039,13 @@ locator::table_load_stats table::table_load_stats(std::functionsplit_ready_seq_number(); - for (auto& [id, sg] : storage_groups()) { + _sg_manager->for_each_storage_group([&] (size_t id, storage_group& sg) { locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) }; if (!tablet_filter(gid)) { - continue; + return; } - stats.size_in_bytes += sg->live_disk_space_used(); - } + stats.size_in_bytes += sg.live_disk_space_used(); + }); return stats; } @@ -2058,7 +2073,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca // Stop the released main compaction groups asynchronously future<> stop_fut = make_ready_future<>(); - for (auto& [id, sg] : storage_groups()) { + for (auto& [id, sg] : _storage_groups) { if (!sg->main_compaction_group()->empty()) { on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ "therefore groups cannot be remapped with the new tablet count.", @@ -2103,12 +2118,35 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo if (new_tablet_count > old_tablet_count) { tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); - return handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); + co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); + co_return; } - return make_ready_future(); + + // Allocate storage 
group if tablet is migrating in. + auto this_replica = locator::tablet_replica{ + .host = erm.get_token_metadata().get_my_id(), + .shard = this_shard_id() + }; + auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) { + return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica; + }; + for (auto& transition : new_tablet_map->transitions()) { + auto tid = transition.first; + auto transition_info = transition.second; + if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) { + auto range = new_tablet_map->get_token_range(tid); + auto cg = std::make_unique(_t, tid.value(), std::move(range)); + _storage_groups[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); + } + } + co_return; } future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { + // Exclusive lock is meant to protect storage groups, but we hold it here to prevent preemption + // between erm and storage groups updates as they need to be consistent. + rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock(); + auto old_erm = std::exchange(_erm, std::move(erm)); if (uses_tablets()) { @@ -3330,8 +3368,10 @@ compaction::table_state& compaction_group::as_table_state() const noexcept { return *_table_state; } -compaction::table_state& table::as_table_state() const noexcept { - // FIXME: kill it once we're done with all remaining users. 
+compaction::table_state& table::try_get_table_state_with_static_sharding() const { + if (!uses_static_sharding()) { + throw std::runtime_error("Getting table state is allowed only with static sharding"); + } return get_compaction_group(0)->as_table_state(); } @@ -3412,25 +3452,30 @@ future<> compaction_group::cleanup() { tlogger.debug("Invalidating range {} for compaction group {} of table {} during cleanup.", p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name()); co_await _t._cache.invalidate(std::move(updater), p_range); + _t._cache.refresh_snapshot(); } -future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { - auto holder = async_gate().hold(); - - auto* sg = storage_group_for_id(tid.value()); - +future<> table::clear_inactive_reads_for_tablet(database& db, storage_group* sg) { for (auto& cg_ptr : sg->compaction_groups()) { - if (!cg_ptr) { - throw std::runtime_error(format("Cannot cleanup tablet {} of table {}.{} because it is not allocated in this shard", - tid, _schema->ks_name(), _schema->cf_name())); - } - co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range()); + } +} - // Synchronizes with in-flight writes if any, and also takes care of flushing if needed. - // FIXME: to be able to stop group and provide guarantee above, we must first be able to reallocate a new group if tablet is migrated back. - //co_await _cg.stop(); +future<> table::stop_compaction_groups(storage_group* sg) { + // Synchronizes with in-flight writes if any, and also takes care of flushing if needed. 
+ for (auto& cg_ptr : sg->compaction_groups()) { + co_await cg_ptr->stop(); + } +} + +future<> table::flush_compaction_groups(storage_group* sg) { + for (auto& cg_ptr : sg->compaction_groups()) { co_await cg_ptr->flush(); + } +} + +future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg) { + for (auto& cg_ptr : sg->compaction_groups()) { co_await cg_ptr->cleanup(); // FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation, // and can cover some mutations which weren't cleaned, causing them to be lost during replay. @@ -3448,8 +3493,29 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato } tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name()); +} - // FIXME: Deallocate compaction group in this shard +future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { + auto holder = async_gate().hold(); + rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock(); + auto* sg = storage_group_for_id(tid.value()); + + co_await clear_inactive_reads_for_tablet(db, sg); + // compaction_group::stop takes care of flushing. + co_await stop_compaction_groups(sg); + co_await cleanup_compaction_groups(db, sys_ks, tid, sg); + _sg_manager->remove_storage_group(tid.value()); +} + +future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { + auto holder = async_gate().hold(); + // Hold shared lock to keep storage group alive. 
+ rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock(); + auto* sg = storage_group_for_id(tid.value()); + + co_await clear_inactive_reads_for_tablet(db, sg); + co_await flush_compaction_groups(sg); + co_await cleanup_compaction_groups(db, sys_ks, tid, sg); } } // namespace replica diff --git a/replica/tablets.cc b/replica/tablets.cc index 4ec702e1da..68dd51c053 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -367,15 +367,13 @@ public: : _schema(std::move(s)) , _tablet_map(tmap.tablet_count()) { - for (const auto& [id, sg] : sgm.storage_groups()) { - if (sg) { - auto set = sg->make_sstable_set(); - _size += set->size(); - _bytes_on_disk += set->bytes_on_disk(); - _sstable_sets[id] = std::move(set); - _sstable_set_ids.insert(id); - } - } + sgm.for_each_storage_group([this] (size_t id, storage_group& sg) { + auto set = sg.make_sstable_set(); + _size += set->size(); + _bytes_on_disk += set->bytes_on_disk(); + _sstable_sets[id] = std::move(set); + _sstable_set_ids.insert(id); + }); } tablet_sstable_set(const tablet_sstable_set& o) @@ -568,7 +566,7 @@ public: auto token = pos.token(); if (!_cur_set || pos.token() >= _lowest_next_token) { auto idx = _tset.group_of(token); - if (!token.is_maximum()) { + if (!token.is_maximum() && _tset._sstable_set_ids.contains(idx)) { _cur_set = _tset.find_sstable_set(idx); } // Set the next token to point to the next engaged storage group. 
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 47bb420b2f..b2a7adc1cb 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1047,7 +1047,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case locator::tablet_transition_stage::allow_write_both_read_old: if (action_failed(tablet_state.barriers[trinfo.stage])) { if (check_excluded_replicas()) { - transition_to_with_barrier(locator::tablet_transition_stage::revert_migration); + transition_to_with_barrier(locator::tablet_transition_stage::cleanup_target); break; } } diff --git a/test/boost/commitlog_cleanup_test.cc b/test/boost/commitlog_cleanup_test.cc index e156a792bc..942b852b5b 100644 --- a/test/boost/commitlog_cleanup_test.cc +++ b/test/boost/commitlog_cleanup_test.cc @@ -134,7 +134,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanups) { // Cleanup the tablet. e.db().invoke_on_all([&] (replica::database& db) { - return db.find_column_family("ks", "cf").cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)); + return db.find_column_family("ks", "cf").cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0)); }).get(); BOOST_REQUIRE_EQUAL(get_num_rows(), 0); @@ -179,7 +179,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanup_record_gc) { }; auto cleanup_tablet = [&] (std::string cf) { auto& db = e.local_db(); - db.find_column_family("ks", cf).cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get(); + db.find_column_family("ks", cf).cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get(); }; auto get_num_records = [&] { auto res = e.execute_cql("select * from system.commitlog_cleanups;").get(); diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 8060e7c682..b1397b7d90 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -124,7 +124,7 
@@ SEASTAR_THREAD_TEST_CASE(test_large_data) { flush(e); e.db().invoke_on_all([] (replica::database& dbi) { return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { - return dbi.get_compaction_manager().perform_major_compaction(t->as_table_state()); + return dbi.get_compaction_manager().perform_major_compaction(t->try_get_table_state_with_static_sharding()); }); }).get(); diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 43aad33be0..7f598f4371 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -3027,7 +3027,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt desc.replacer = replacer_fn_no_op(); auto cdata = compaction_manager::create_compaction_data(); compaction_progress_monitor progress_monitor; - sstables::compact_sstables(std::move(desc), cdata, cf->as_table_state(), progress_monitor).get(); + sstables::compact_sstables(std::move(desc), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get(); return compacted_sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice()); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index edfb00e09f..11403ec402 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4991,7 +4991,7 @@ static future<> run_incremental_compaction_test(sstables::offstrategy offstrateg ssts = {}; // releases references auto owned_ranges_ptr = make_lw_shared(std::move(owned_token_ranges)); run_compaction(t, std::move(owned_ranges_ptr)).get(); - BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty()); + BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty()); testlog.info("Cleanup has finished"); } @@ -5099,7 +5099,7 @@ SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) 
{ ssts = {}; // releases references auto owned_ranges_ptr = make_lw_shared(std::move(owned_token_ranges)); t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get(); - BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty()); + BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty()); testlog.info("Cleanup has finished"); } diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index fc12c53cfb..92e7b83c88 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -38,7 +38,7 @@ public: return _cf->add_sstable_and_update_cache(sstable, offstrategy); } auto new_sstables = { sstable }; - return _cf->as_table_state().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no); + return _cf->try_get_table_state_with_static_sharding().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no); } future<> rebuild_sstable_list(compaction::table_state& table_s, const std::vector& new_sstables, diff --git a/test/boost/view_complex_test.cc b/test/boost/view_complex_test.cc index 4e1337bfbf..408b42955b 100644 --- a/test/boost/view_complex_test.cc +++ b/test/boost/view_complex_test.cc @@ -842,7 +842,7 @@ void test_commutative_row_deletion(cql_test_env& e, std::function&& mayb }}); }); - e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get(); } SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) { @@ -1078,7 +1078,7 @@ void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::func }}); }); - e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", 
"vcf").as_table_state()).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get(); eventually([&] { auto msg = e.execute_cql("select * from vcf limit 1").get(); assert_that(msg).is_rows().with_rows({{ diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index e0e0590a4e..73bfdbdb4a 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -70,10 +70,10 @@ public: return true; } const sstables::sstable_set& main_sstable_set() const override { - return table().as_table_state().main_sstable_set(); + return table().try_get_table_state_with_static_sharding().main_sstable_set(); } const sstables::sstable_set& maintenance_sstable_set() const override { - return table().as_table_state().maintenance_sstable_set(); + return table().try_get_table_state_with_static_sharding().maintenance_sstable_set(); } std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override { return sstables::get_fully_expired_sstables(*this, sstables, query_time); @@ -105,7 +105,7 @@ public: } bool memtable_has_key(const dht::decorated_key& key) const override { return false; } future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { - return table().as_table_state().on_compaction_completion(std::move(desc), offstrategy); + return table().try_get_table_state_with_static_sharding().on_compaction_completion(std::move(desc), offstrategy); } bool is_auto_compaction_disabled_by_user() const noexcept override { return table().is_auto_compaction_disabled_by_user(); diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index 219f1e7b4f..b9f8422e60 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -1762,7 +1762,7 @@ void populate(const std::vector& datasets, cql_test_env& env, const ta 
output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names()); - db.get_compaction_manager().run_with_compaction_disabled(cf.as_table_state(), [&] { + db.get_compaction_manager().run_with_compaction_disabled(cf.try_get_table_state_with_static_sharding(), [&] { return seastar::async([&] { auto gen = ds.make_generator(s, cfg); while (auto mopt = gen()) { @@ -1869,7 +1869,7 @@ auto make_compaction_disabling_guard(replica::database& db, std::vector pr; for (auto&& t : tables) { // FIXME: discarded future. - (void)db.get_compaction_manager().run_with_compaction_disabled(t->as_table_state(), [f = shared_future<>(pr.get_shared_future())] { + (void)db.get_compaction_manager().run_with_compaction_disabled(t->try_get_table_state_with_static_sharding(), [f = shared_future<>(pr.get_shared_future())] { return f.get_future(); }); } diff --git a/test/perf/perf_sstable.hh b/test/perf/perf_sstable.hh index 50cf521547..3acfd00c51 100644 --- a/test/perf/perf_sstable.hh +++ b/test/perf/perf_sstable.hh @@ -240,7 +240,7 @@ public: descriptor.replacer = sstables::replacer_fn_no_op(); auto cdata = compaction_manager::create_compaction_data(); compaction_progress_monitor progress_monitor; - auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->as_table_state(), progress_monitor).get(); + auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get(); auto end = perf_sstable_test_env::now(); auto partitions_per_sstable = _cfg.partitions / _cfg.sstables; diff --git a/test/topology_custom/test_tablets_migration.py b/test/topology_custom/test_tablets_migration.py index 25c2a73a85..e24ed7fc7c 100644 --- a/test/topology_custom/test_tablets_migration.py +++ b/test/topology_custom/test_tablets_migration.py @@ -242,3 +242,51 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail assert len(replicas) == 1 for r in replicas[0].replicas: assert r[0] 
!= host_ids[failer.fail_idx] + +@pytest.mark.asyncio +async def test_tablet_back_and_forth_migration(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cfg = {'enable_user_defined_functions': False, 'experimental_features': ['tablets', 'consistent-topology-changes']} + host_ids = [] + servers = [] + + async def make_server(): + s = await manager.server_add(config=cfg) + servers.append(s) + host_ids.append(await manager.get_host_id(s.server_id)) + await manager.api.disable_tablet_balancing(s.ip_addr) + + async def assert_rows(num): + res = await cql.run_async(f"SELECT * FROM test.test") + assert len(res) == num + + await make_server() + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + await make_server() + + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({1}, {1});") + await assert_rows(1) + + replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test') + logger.info(f"Tablet is on [{replicas}]") + assert len(replicas) == 1 and len(replicas[0].replicas) == 1 + + old_replica = replicas[0].replicas[0] + assert old_replica[0] != host_ids[1] + new_replica = (host_ids[1], 0) + + logger.info(f"Moving tablet {old_replica} -> {new_replica}") + manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) + + await assert_rows(1) + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({2}, {2});") + await assert_rows(2) + + logger.info(f"Moving tablet {new_replica} -> {old_replica}") + manager.api.move_tablet(servers[0].ip_addr, "test", "test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0) + + await assert_rows(2) + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({3}, {3});") + await assert_rows(3)