diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 20256594e9..6908ceef6b 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -333,7 +333,7 @@ Invariants: on behalf of previous transitions can still run in the cluster, but they can have no side effects. This is ensured by the proper use of the topology guard mechanism (see the "Topology guards" section). -# Tablet splitting +# Tablet resize Each table has its resize metadata stored in group0. @@ -349,6 +349,8 @@ for a given table, which can be done by dividing average table size[1] by the ta [1]: The average size of a table is the total size across all DCs divided by the number of replicas across all DCs. +## Tablet splitting + A table will need split if its average size surpasses the split threshold, which is 100% of the target tablet size, which defaults to 5G. The reasoning is that after split we want average size to return to the target size. By the same reason, merge threshold is 50% of target size. @@ -373,13 +375,62 @@ emits a decision to finalize the split request. The finalization is serialized w doubling tablet count would interfere with the migration process. When the state machine leaves the migration track, and there are tablets waiting for tablet split to -be finalized, the topology will transition into `tablet_split_finalization` state. At this moment, there will +be finalized, the topology will transition into `tablet_resize_finalization` state. At this moment, there will be no migration running in the system. A global token metadata barrier is executed to make sure that no process e.g. repair will be holding stale metadata when finalizing split. After that, the new tablet map, which is a result of splitting each preexisting tablet into two, is committed to group0. The replicas will react to that by remapping its compaction groups into a new set which is, at least, twice as large as the old one. 
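The split/merge thresholds above can be sketched as follows. This is a minimal illustration, not Scylla's actual API: `decide_resize`, `target_tablet_size`, and the enum are hypothetical names; the 5G default and the 100%/50% thresholds are taken from this section.

```cpp
#include <cstdint>

// Illustrative sketch of the resize decision described above (assumed names).
enum class resize_way { none, split, merge };

constexpr uint64_t target_tablet_size = 5ULL << 30; // 5G default target size

// avg_tablet_size: average table size divided by the tablet count.
resize_way decide_resize(uint64_t avg_tablet_size) {
    // Split when the average exceeds 100% of the target...
    if (avg_tablet_size > target_tablet_size) {
        return resize_way::split;
    }
    // ...merge when it drops below 50% of the target. The gap between the
    // two thresholds is the hysteresis that avoids split/merge oscillation.
    if (avg_tablet_size < target_tablet_size / 2) {
        return resize_way::merge;
    }
    return resize_way::none;
}
```

After a split the average halves, and after a merge it doubles, so both operations land the average back inside the no-op band.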
+## Tablet merging
+
+A table needs a merge if its average size falls below the merge threshold, which is 50% of the
+target tablet size (5G by default). The reasoning is that after a merge we want the average size
+to return to the target size. This hysteresis is important to avoid oscillations between splits
+and merges.
+
+The initial tablet count (the parameter in the schema) is respected while the table is in "growing
+mode". Every table starts in this mode, and leaves it if, for example, a split was needed beyond
+the initial tablet count. After a table leaves the mode, the average size can be trusted to
+determine that the table is shrinking.
+
+When the load balancer decides to merge a table, the resize_type field in the tablet metadata is
+set to 'merge' and resize_seq_number is bumped to the next sequence number.
+As with split, the load balancer might revoke an ongoing merge if it realizes that a split will be
+needed after the merge.
+
+The merge preparation phase co-locates replicas of sibling tablets on the same node:shard through
+the regular migration mechanism. Unlike split, all the preparation is done by the coordinator.
+We say that a pair of tablets are siblings if they will become one after the merge. This is built
+on the power-of-two constraint. For example, if a table has 4 tablets, the siblings are (0, 1) and (2, 3).
+The co-location algorithm is simple: the balancer produces a migration for the "odd" tablet to
+follow the "even" one. For example, a replica of tablet 1 is moved to where a replica of tablet 0
+lives. If the "odd" tablet lives on the same node but on a different shard, an intra-node migration
+is performed.
+
+Without co-location, the merge completion handler wouldn't be able to find the data of the replicas
+being merged in the same location, making it impossible for the coordinator to merge the replica
+sets and for the replica layer to combine the data.
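The sibling relation above can be sketched as a bit operation. This is an illustrative stand-in for `tablet_map::sibling_tablets`, assuming plain integer tablet ids and a power-of-two tablet count greater than 1:

```cpp
#include <cstdint>
#include <utility>

// Clearing the lowest bit of a tablet id yields the "even" tablet of the
// sibling pair; its successor is the "odd" sibling that follows it.
std::pair<uint64_t, uint64_t> sibling_tablets(uint64_t tid) {
    uint64_t first = tid & ~uint64_t(1);
    return {first, first + 1};
}
```

For a table with 4 tablets this yields exactly the pairs (0, 1) and (2, 3) mentioned above.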
+
+Merge has low priority, so the co-location migrations are emitted when there's no more important
+work to do (e.g. node draining or regular balancing). Regular balancing will not undo the
+co-location work done so far, since co-located replicas are migrated together (treated as if
+already merged).
+
+Once the balancer realizes that replicas of all sibling tablets are co-located, a decision is
+emitted to finalize the merge. A pair of sibling tablets is considered co-located if their replica
+sets are equal, i.e. (s1 + s2) == s1. The finalization is serialized with migration, as shrinking
+the tablet count would interfere with the migration process, which requires tablet id stability.
+
+When the coordinator leaves the migration track, and there are tables waiting for merge to be
+finalized, the state machine transitions into the `tablet_resize_finalization` state. At this
+moment, there will be no migration running in the system. A global token metadata barrier is
+executed to make sure that no process holds stale topology when resizing the tablet map. That's
+important since requests must find a replica state consistent with the one in group0.
+The handler of the `tablet_resize_finalization` state checks whether the decision for a table is
+still to merge, and if so, the tablet map has its size reduced by a factor of 2. When replicas of
+sibling tablets are co-located, their replica sets can be merged into one, since (s1 + s2) == s1.
+Once the new map is committed to group0, replicas react by resizing their internal structures to
+match the new tablet count, and by merging together the compaction groups (sstables + memtable)
+that belonged to sibling tablets.
+
 # Sharding with tablets
 
 Each table can have different shard assignment for a given token computed from the placement of tablet replicas,
diff --git a/gms/feature_service.hh b/gms/feature_service.hh
index 5f1d4e6951..21d52fe2ad 100644
--- a/gms/feature_service.hh
+++ b/gms/feature_service.hh
@@ -145,6 +145,7 @@ public:
     gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
     gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
+    gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
 
     // A feature just for use in tests. It must not be advertised unless
     // the "features_enable_test_feature" injection is enabled.
diff --git a/locator/tablets.cc b/locator/tablets.cc
index 48bf85b3f4..ff42aa1922 100644
--- a/locator/tablets.cc
+++ b/locator/tablets.cc
@@ -31,6 +31,14 @@ namespace locator {
 
 seastar::logger tablet_logger("tablets");
 
+std::optional<std::pair<tablet_id, tablet_id>> tablet_map::sibling_tablets(tablet_id t) const {
+    if (tablet_count() == 1) {
+        return std::nullopt;
+    }
+    auto first_sibling = tablet_id(t.value() & ~0x1);
+    return std::make_pair(first_sibling, *next_tablet(first_sibling));
+}
+
 static write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) {
@@ -152,6 +160,32 @@ bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info&
     return false;
 }
 
+tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info)
+    : replicas(std::move(replicas))
+    , repair_time(repair_time)
+    , repair_task_info(std::move(repair_task_info))
+{}
+
+tablet_info::tablet_info(tablet_replica_set replicas)
+    : tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{})
+{}
+
+std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b) {
+    if (a.repair_task_info.is_valid() || b.repair_task_info.is_valid()) {
+        return {};
+    }
+
+    auto sorted = [] (tablet_replica_set rs) {
+        std::ranges::sort(rs, std::less());
+        return rs;
+    };
+    if (sorted(a.replicas) !=
sorted(b.replicas)) { + return {}; + } + + auto repair_time = std::max(a.repair_time, b.repair_time); + return tablet_info(std::move(a.replicas), repair_time, a.repair_task_info); +} std::optional get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) { auto leaving = substract_sets(tinfo.replicas, trinfo.next); @@ -407,6 +441,24 @@ future<> tablet_map::for_each_tablet(seastar::noncopyable_function(tabl } } +future<> tablet_map::for_each_sibling_tablets(seastar::noncopyable_function(tablet_desc, std::optional)> func) const { + auto make_desc = [this] (tablet_id tid) { + return tablet_desc{tid, &get_tablet_info(tid), get_tablet_transition_info(tid)}; + }; + if (_tablets.size() == 1) { + co_return co_await func(make_desc(first_tablet()), std::nullopt); + } + for (std::optional tid = first_tablet(); tid; tid = next_tablet(*tid)) { + auto tid1 = tid; + auto tid2 = tid = next_tablet(*tid); + if (!tid2) { + // Cannot happen with power-of-two invariant. + throw std::logic_error(format("Cannot retrieve sibling tablet with tablet count {}", tablet_count())); + } + co_await func(make_desc(*tid1), make_desc(*tid2)); + } +} + void tablet_map::clear_transitions() { _transitions.clear(); } @@ -541,6 +593,10 @@ bool tablet_map::needs_split() const { return std::holds_alternative(_resize_decision.way); } +bool tablet_map::needs_merge() const { + return std::holds_alternative(_resize_decision.way); +} + const locator::resize_decision& tablet_map::resize_decision() const { return _resize_decision; } @@ -581,6 +637,10 @@ resize_decision::seq_number_t resize_decision::next_sequence_number() const { return (sequence_number == std::numeric_limits::max()) ? 
0 : sequence_number + 1; } +bool resize_decision::initial_decision() const { + return sequence_number == 0; +} + table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept { size_in_bytes = size_in_bytes + s.size_in_bytes; split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number); diff --git a/locator/tablets.hh b/locator/tablets.hh index f1c13493f2..f039fb2385 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -47,7 +47,7 @@ struct tablet_id { explicit tablet_id(size_t id) : id(id) {} size_t value() const { return id; } explicit operator size_t() const { return id; } - bool operator<=>(const tablet_id&) const = default; + auto operator<=>(const tablet_id&) const = default; }; /// Identifies tablet (not be confused with tablet replica) in the scope of the whole cluster. @@ -55,14 +55,14 @@ struct global_tablet_id { table_id table; tablet_id tablet; - bool operator<=>(const global_tablet_id&) const = default; + auto operator<=>(const global_tablet_id&) const = default; }; struct tablet_replica { host_id host; shard_id shard; - bool operator==(const tablet_replica&) const = default; + auto operator<=>(const tablet_replica&) const = default; }; using tablet_replica_set = utils::small_vector; @@ -171,9 +171,19 @@ struct tablet_info { db_clock::time_point repair_time; locator::tablet_task_info repair_task_info; + tablet_info() = default; + tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info); + tablet_info(tablet_replica_set); + bool operator==(const tablet_info&) const = default; }; +// Merges tablet_info b into a, but with following constraints: +// - they cannot have active repair task, since each task has a different id +// - their replicas must be all co-located. +// If tablet infos are mergeable, merged info is returned. Otherwise, nullopt. +std::optional merge_tablet_info(tablet_info a, tablet_info b); + /// Represents states of the tablet migration state machine. 
/// /// The stage serves two major purposes: @@ -312,6 +322,8 @@ struct resize_decision { bool operator==(const resize_decision&) const; sstring type_name() const; seq_number_t next_sequence_number() const; + // Returns true if this is the initial decision, before split or merge was emitted. + bool initial_decision() const; }; struct table_load_stats { @@ -346,6 +358,12 @@ struct repair_scheduler_config { using load_stats_ptr = lw_shared_ptr; +struct tablet_desc { + tablet_id tid; + const tablet_info* info; // cannot be null. + const tablet_transition_info* transition; // null if there's no transition. +}; + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). @@ -440,6 +458,11 @@ public: return tablet_id(size_t(t) + 1); } + // Returns the pair of sibling tablets for a given tablet id. + // For example, if id 1 is provided, a pair of 0 and 1 is returned. + // Returns disengaged optional when sibling pair cannot be found. + std::optional> sibling_tablets(tablet_id t) const; + /// Returns true iff tablet has a given replica. /// If tablet is in transition, considers both previous and next replica set. bool has_replica(tablet_id, tablet_replica) const; @@ -451,6 +474,10 @@ public: /// Calls a given function for each tablet in the map in token ownership order. future<> for_each_tablet(seastar::noncopyable_function(tablet_id, const tablet_info&)> func) const; + /// Calls a given function for each sibling tablet in the map in token ownership order. + /// If tablet count == 1, then there will be only one call and 2nd tablet_desc is disengaged. 
+ future<> for_each_sibling_tablets(seastar::noncopyable_function(tablet_desc, std::optional)> func) const; + const auto& transitions() const { return _transitions; } @@ -480,6 +507,7 @@ public: bool operator==(const tablet_map&) const = default; bool needs_split() const; + bool needs_merge() const; /// Returns the token_range in which the given token will belong to after a tablet split dht::token_range get_token_range_after_split(const token& t) const noexcept; diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 5970fac7d9..7d71ba70a0 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -91,6 +91,9 @@ public: compaction_group(table& t, size_t gid, dht::token_range token_range); ~compaction_group(); + void update_id(size_t id) { + _group_id = id; + } void update_id_and_range(size_t id, dht::token_range token_range) { _group_id = id; _token_range = std::move(token_range); @@ -155,6 +158,9 @@ public: // invalidated and statistics are updated. future<> update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc); + // Merges all sstables from another group into this one. + future<> merge_sstables_from(compaction_group& group); + const lw_shared_ptr& main_sstables() const noexcept; void set_main_sstables(lw_shared_ptr new_main_sstables); @@ -163,6 +169,7 @@ public: // Makes a sstable set, which includes all sstables managed by this group lw_shared_ptr make_sstable_set() const; + std::vector all_sstables() const; const std::vector& compacted_undeleted_sstables() const noexcept; // Triggers regular compaction. @@ -203,6 +210,9 @@ using const_compaction_group_ptr = lw_shared_ptr; // shard will have as many groups as there are tablet replicas owned by that shard. class storage_group { compaction_group_ptr _main_cg; + // Holds compaction groups that now belongs to same tablet after merge. Compaction groups here will + // eventually have all their data moved into main group. 
+ std::vector _merging_groups; std::vector _split_ready_groups; seastar::gate _async_gate; private: @@ -231,6 +241,13 @@ public: utils::small_vector compaction_groups() noexcept; utils::small_vector compaction_groups() const noexcept; + utils::small_vector split_unready_groups() const; + bool split_unready_groups_are_empty() const; + + void add_merging_group(compaction_group_ptr); + const std::vector& merging_groups() const; + future<> remove_empty_merging_groups(); + // Puts the storage group in split mode, in which it internally segregates data // into two sstable sets and two memtable sets corresponding to the two adjacent // tablets post-split. @@ -271,6 +288,8 @@ using storage_group_map = absl::flat_hash_map stop() = 0; public: virtual ~storage_group_manager(); diff --git a/replica/table.cc b/replica/table.cc index 4beffab0d5..4f8322cc30 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -636,7 +637,8 @@ const storage_group_map& storage_group_manager::storage_groups() const { } future<> storage_group_manager::stop_storage_groups() noexcept { - return parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); }); + co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); }); + co_await stop(); } void storage_group_manager::clear_storage_groups() { @@ -687,6 +689,10 @@ public: _storage_groups = std::move(r); } + future<> stop() override { + return make_ready_future<>(); + } + future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override { return make_ready_future(); } compaction_group& compaction_group_for_token(dht::token token) const noexcept override { @@ -738,6 +744,8 @@ class tablet_storage_group_manager final : public storage_group_manager { // current split, and not a previously revoked 
(stale) decision. // The minimum value, which is a negative number, is not used by coordinator for first decision. locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); + future<> _merge_completion_fiber; + condition_variable _merge_completion_event; private: const schema_ptr& schema() const { return _t.schema(); @@ -758,6 +766,17 @@ private: // that were previously split. future<> handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); + // Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into + // the new tablet id (X >> 1). In practice, that means storage groups for X and X+1 + // are merged into a new storage group with id (X >> 1). + future<> handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); + + // When merge completes, compaction groups of sibling tablets are added to same storage + // group, but they're not merged yet into one, since the merge completion handler happens + // inside the erm updater which must complete ASAP. Therefore, those groups will be merged + // into a single one (main) in background. 
+ future<> merge_completion_fiber(); + storage_group& storage_group_for_id(size_t i) const { return storage_group_manager::storage_group_for_id(schema(), i); } @@ -796,6 +815,7 @@ public: : _t(t) , _my_host_id(erm.get_token_metadata().get_my_id()) , _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id())) + , _merge_completion_fiber(merge_completion_fiber()) { storage_group_map ret; @@ -813,6 +833,11 @@ public: _storage_groups = std::move(ret); } + future<> stop() override { + _merge_completion_event.signal(); + return std::exchange(_merge_completion_fiber, make_ready_future<>()); + } + future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override; compaction_group& compaction_group_for_token(dht::token token) const noexcept override; @@ -875,6 +900,9 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran void storage_group::for_each_compaction_group(std::function action) const noexcept { action(_main_cg); + for (auto& cg : _merging_groups) { + action(cg); + } for (auto& cg : _split_ready_groups) { action(cg); } @@ -896,6 +924,17 @@ utils::small_vector storage_group::compaction_gro return cgs; } +utils::small_vector storage_group::split_unready_groups() const { + utils::small_vector cgs; + cgs.push_back(_main_cg); + std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs)); + return cgs; +} + +bool storage_group::split_unready_groups_are_empty() const { + return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty)); +} + bool storage_group::set_split_mode() { if (!splitting_mode()) { auto create_cg = [this] () -> compaction_group_ptr { @@ -908,8 +947,23 @@ bool storage_group::set_split_mode() { _split_ready_groups = std::move(split_ready_groups); } - // The storage group is considered "split ready" if its main compaction group is empty. 
- return _main_cg->empty(); + // The storage group is considered "split ready" if all split unready groups (main + merging) are empty. + return split_unready_groups_are_empty(); +} + +void storage_group::add_merging_group(compaction_group_ptr cg) { + _merging_groups.push_back(std::move(cg)); +} + +const std::vector& storage_group::merging_groups() const { + return _merging_groups; +} + +future<> storage_group::remove_empty_merging_groups() { + for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) { + co_await group->stop("tablet merge"); + } + std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty)); } future<> storage_group::split(sstables::compaction_type_options::split opt) { @@ -918,24 +972,34 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) { } co_await utils::get_local_injector().inject("delay_split_compaction", 5s); - if (_main_cg->empty()) { + if (split_unready_groups_are_empty()) { co_return; } - auto holder = _main_cg->async_gate().hold(); - co_await _main_cg->flush(); - // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. - co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); - co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); + for (auto cg : split_unready_groups()) { + if (cg->async_gate().is_closed()) { + continue; + } + auto holder = cg->async_gate().hold(); + co_await cg->flush(); + // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. 
+ co_await cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); + co_await cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); + } } lw_shared_ptr storage_group::make_sstable_set() const { - if (!splitting_mode()) { + if (_split_ready_groups.empty() && _merging_groups.empty()) { return _main_cg->make_sstable_set(); } const auto& schema = _main_cg->_t.schema(); std::vector> underlying; - underlying.reserve(1 + _split_ready_groups.size()); + underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size()); underlying.emplace_back(_main_cg->make_sstable_set()); + for (const auto& cg : _merging_groups) { + if (!cg->empty()) { + underlying.emplace_back(cg->make_sstable_set()); + } + } for (const auto& cg : _split_ready_groups) { underlying.emplace_back(cg->make_sstable_set()); } @@ -1141,7 +1205,9 @@ future<> table::parallel_foreach_compaction_group(std::function(compact void table::for_each_compaction_group(std::function action) { _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) { sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) { - action(*cg); + if (auto holder = try_hold_gate(cg->async_gate())) { + action(*cg); + } }); }); } @@ -1149,7 +1215,9 @@ void table::for_each_compaction_group(std::function act void table::for_each_compaction_group(std::function action) const { _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) { sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) { - action(*cg); + if (auto holder = try_hold_gate(cg->async_gate())) { + action(*cg); + } }); }); } @@ -1803,6 +1871,35 @@ compaction_group::delete_unused_sstables(sstables::compaction_completion_desc de return delete_sstables_atomically(std::move(sstables_to_remove)); } +std::vector compaction_group::all_sstables() const { + std::vector all; + auto main_sstables = _main_sstables->all(); + auto 
maintenance_sstables = _maintenance_sstables->all(); + all.reserve(main_sstables->size() + maintenance_sstables->size()); + std::ranges::copy(*main_sstables, std::back_inserter(all)); + std::ranges::copy(*maintenance_sstables, std::back_inserter(all)); + return all; +} + +future<> +compaction_group::merge_sstables_from(compaction_group& group) { + auto& cs = _t.get_compaction_strategy(); + auto permit = co_await seastar::get_units(_t._sstable_set_mutation_sem, 1); + table::sstable_list_builder builder(std::move(permit)); + + auto sstables_to_merge = group.all_sstables(); + // re-build new list for this group with sstables of the group being merged. + auto res = co_await builder.build_new_list(*main_sstables(), cs.make_sstable_set(_t.schema()), sstables_to_merge, {}); + // execute: + std::invoke([&] noexcept { + set_main_sstables(std::move(res.new_sstable_set)); + group.clear_sstables(); + // FIXME: backlog adjustment is not exception safe. + backlog_tracker_adjust_charges({}, sstables_to_merge); + }); + _t.rebuild_statistics(); +} + future<> compaction_group::update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc) { // Build a new list of _sstables: We remove from the existing list the @@ -2393,7 +2490,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca // Stop the released main compaction groups asynchronously future<> stop_fut = make_ready_future<>(); for (auto& [id, sg] : _storage_groups) { - if (!sg->main_compaction_group()->empty()) { + if (!sg->split_unready_groups_are_empty()) { on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ "therefore groups cannot be remapped with the new tablet count.", id, table_id)); @@ -2428,6 +2525,79 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca return stop_fut; } +future<> tablet_storage_group_manager::merge_completion_fiber() { + co_await 
coroutine::switch_to(_t.get_config().streaming_scheduling_group); + + while (!_t.async_gate().is_closed()) { + try { + co_await for_each_storage_group_gently([] (storage_group& sg) -> future<> { + auto main_group = sg.main_compaction_group(); + for (auto& group : sg.merging_groups()) { + // Synchronize with ongoing writes that might be blocked waiting for memory. + // Also, disabling compaction provides stability on the sstable set. + co_await group->stop("tablet merge"); + // Flushes memtable, so all the data can be moved. + co_await group->flush(); + co_await main_group->merge_sstables_from(*group); + } + co_await sg.remove_empty_merging_groups(); + }); + } catch (...) { + tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name()); + } + utils::get_local_injector().inject("replica_merge_completion_wait", [] () { + tlogger.info("Merge completion fiber finished, about to sleep"); + }); + co_await _merge_completion_event.wait(); + tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name()); + } +} + +future<> tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) { + auto table_id = schema()->id(); + size_t old_tablet_count = old_tmap.tablet_count(); + size_t new_tablet_count = new_tmap.tablet_count(); + storage_group_map new_storage_groups; + + unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count); + unsigned merge_size = 1 << log2_reduce_factor; + + if (merge_size != 2) { + throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}", + old_tablet_count, new_tablet_count, table_id)); + } + + for (auto& [id, sg] : _storage_groups) { + // Pick first (even) tablet of each sibling pair. 
+ if (id % merge_size != 0) { + continue; + } + auto new_tid = id >> log2_reduce_factor; + + auto new_cg = make_lw_shared(_t, new_tid, new_tmap.get_token_range(locator::tablet_id(new_tid))); + auto new_sg = make_lw_shared(std::move(new_cg)); + + for (unsigned i = 0; i < merge_size; i++) { + auto group_id = id + i; + + auto it = _storage_groups.find(group_id); + if (it == _storage_groups.end()) { + throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id)); + } + auto& sg = it->second; + sg->for_each_compaction_group([&new_sg, new_tid] (const compaction_group_ptr& cg) { + cg->update_id(new_tid); + new_sg->add_merging_group(cg); + }); + } + + new_storage_groups[new_tid] = std::move(new_sg); + } + _storage_groups = std::move(new_storage_groups); + _merge_completion_event.signal(); + return make_ready_future<>(); +} + future<> tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) { auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id()); auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map); @@ -2439,6 +2609,11 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); co_return; + } else if (new_tablet_count < old_tablet_count) { + tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets", + schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); + co_await handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map); + co_return; } // Allocate storage group if tablet is migrating in. 
@@ -2846,9 +3021,7 @@ size_t compaction_group::memtable_count() const noexcept { } size_t storage_group::memtable_count() const noexcept { - auto memtable_count = [] (const compaction_group_ptr& cg) { return cg ? cg->memtable_count() : 0; }; - return memtable_count(_main_cg) + - std::ranges::fold_left(_split_ready_groups | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{}); + return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{}); } future<> table::flush(std::optional pos) { @@ -3801,6 +3974,9 @@ future<> storage_group::stop(sstring reason) noexcept { co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) { return cg_ptr->stop(reason); }); + co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) { + return cg_ptr->stop(reason); + }); co_await std::move(closed_gate_fut); } diff --git a/scripts/tablet-mon.py b/scripts/tablet-mon.py index ec8be748dd..4f7eadd2dd 100755 --- a/scripts/tablet-mon.py +++ b/scripts/tablet-mon.py @@ -435,8 +435,17 @@ def update_from_cql(initial=False): changed = True tablets_by_shard = set() + tablet_id_by_table = {} + + def tablet_id_for_table(table_id): + if table_id not in tablet_id_by_table: + tablet_id_by_table[table_id] = 0 + ret = tablet_id_by_table[table_id] + tablet_id_by_table[table_id] += 1 + return ret + for tablet in session.execute(tablets_query): - id = (tablet.table_id, tablet.last_token) + id = (tablet.table_id, tablet.last_token, tablet_id_for_table(tablet.table_id)) replicas = set(tablet.replicas) new_replicas = set(tablet.new_replicas) if tablet.new_replicas else replicas @@ -540,6 +549,7 @@ window_width = min(window_width, 3000) window_height = min(window_height, 2000) window = pygame.display.set_mode((window_width, window_height), pygame.RESIZABLE) pygame.display.set_caption('Tablets') 
+number_font = pygame.font.SysFont(None, 20) def draw_tablet(tablet, x, y): tablet.x = x @@ -567,6 +577,11 @@ def draw_tablet(tablet, x, y): border_top_left_radius=tablet_radius, border_top_right_radius=tablet_radius) + number_text = str(tablet.id[2]) + number_image = number_font.render(number_text, True, BLACK) + window.blit(number_image, (x + tablet_frame_size + (w - number_image.get_width()) / 2, + y + tablet_frame_size + (h-1 - number_image.get_height()) / 2)) + def draw_node_frame(x, y, x2, y2, color): pygame.draw.rect(window, color, (x, y, x2 - x, y2 - y), node_frame_thickness, border_radius=tablet_radius + tablet_frame_size + node_frame_mid) diff --git a/service/storage_service.cc b/service/storage_service.cc index 9c647da19b..04eb1eb08b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -743,7 +743,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) { [[fallthrough]]; case topology::transition_state::tablet_migration: [[fallthrough]]; - case topology::transition_state::tablet_split_finalization: + case topology::transition_state::tablet_resize_finalization: [[fallthrough]]; case topology::transition_state::commit_cdc_generation: [[fallthrough]]; @@ -5416,7 +5416,11 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep sleep = true; } if (sleep) { - co_await split_retry.retry(_group0_as); + try { + co_await split_retry.retry(_group0_as); + } catch (...) 
{ + slogger.warn("Sleep in split monitor failed with {}", std::current_exception()); + } } } } diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index b5a3f443ac..18d35a184b 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -15,10 +15,12 @@ #include "utils/assert.hh" #include "utils/error_injection.hh" #include "utils/stall_free.hh" +#include "utils/overloaded_functor.hh" #include "db/config.hh" #include "locator/load_sketch.hh" #include "replica/database.hh" #include "gms/feature_service.hh" +#include #include #include #include @@ -150,8 +152,53 @@ struct migration_badness { bool operator<=>(const migration_badness& other) const = default; }; +struct colocated_tablets { + global_tablet_id left_tablet; + global_tablet_id right_tablet; + + auto operator<=>(const colocated_tablets&) const = default; +}; + +// Represents either a single tablet replica or co-located replicas of sibling +// tablets. The migration tablet set is logically treated by balancer as a single +// candidate. When candidate represents co-located replicas, it means that +// the balancer will work to preserve the co-location by migrating those replicas +// to same destination. 
+struct migration_tablet_set {
+    std::variant<global_tablet_id, colocated_tablets> tablet_s;
+
+    table_id table() const {
+        return std::visit(
+            overloaded_functor{
+                [] (global_tablet_id t) { return t.table; },
+                [] (colocated_tablets t) { return t.left_tablet.table; },
+            },
+            tablet_s);
+    }
+
+    using tablet_small_vector = utils::small_vector<global_tablet_id, 2>;
+
+    tablet_small_vector tablets() const {
+        return std::visit(
+            overloaded_functor{
+                [] (global_tablet_id t) {
+                    return tablet_small_vector{t}; },
+                [] (colocated_tablets t) {
+                    return tablet_small_vector{t.left_tablet, t.right_tablet};
+                },
+            },
+            tablet_s);
+    }
+
+    bool colocated() const {
+        return std::holds_alternative<colocated_tablets>(tablet_s);
+    }
+
+    auto operator<=>(const migration_tablet_set&) const = default;
+};
+
 struct migration_candidate {
-    global_tablet_id tablet;
+    migration_tablet_set tablets;
     tablet_replica src;
     tablet_replica dst;
     migration_badness badness;
@@ -167,11 +214,22 @@ struct fmt::formatter : fmt::formatter
+template<>
+struct fmt::formatter<service::migration_tablet_set> : fmt::formatter<string_view> {
+    template <typename FormatContext>
+    auto format(const service::migration_tablet_set& tablet_set, FormatContext& ctx) const {
+        if (tablet_set.colocated()) {
+            return fmt::format_to(ctx.out(), "{{colocated: {}}}", tablet_set.tablets());
+        }
+        return fmt::format_to(ctx.out(), "{}", tablet_set.tablets().front());
+    }
+};
+
 template<>
 struct fmt::formatter<service::migration_candidate> : fmt::formatter<string_view> {
     template <typename FormatContext>
     auto format(const service::migration_candidate& candidate, FormatContext& ctx) const {
-        fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablet, candidate.src,
+        fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablets, candidate.src,
                 candidate.dst, candidate.badness);
         if (candidate.badness.is_bad()) {
             fmt::format_to(ctx.out(), " (bad!)");
@@ -181,6 +239,27 @@ struct fmt::formatter : fmt::formatter
+template <>
+struct hash<colocated_tablets> {
+    size_t operator()(const colocated_tablets& id) const {
+        return utils::hash_combine(std::hash<global_tablet_id>()(id.left_tablet),
+                                   std::hash<global_tablet_id>()(id.right_tablet));
+    }
+};
+
+template <>
+struct hash<migration_tablet_set> {
size_t operator()(const migration_tablet_set& tablet_set) const { + return std::hash()(tablet_set.tablet_s); + } +}; + +} + namespace service { /// The algorithm aims to equalize tablet count on each shard. @@ -270,6 +349,8 @@ class load_balancer { // It's an average per-shard load in terms of tablet count. using load_type = double; + using table_candidates_map = std::unordered_map>; + struct shard_load { size_t tablet_count = 0; @@ -284,9 +365,9 @@ class load_balancer { // Tablets which still have a replica on this shard which are candidates for migrating away from this shard. // Grouped by table. Used when _use_table_aware_balancing == true. // The set of candidates per table may be empty. - std::unordered_map> candidates; + table_candidates_map candidates; // For all tables. Used when _use_table_aware_balancing == false. - std::unordered_set candidates_all_tables; + std::unordered_set candidates_all_tables; future<> clear_gently() { co_await utils::clear_gently(candidates); @@ -313,7 +394,7 @@ class load_balancer { struct skipped_candidate { tablet_replica replica; - global_tablet_id tablet; + migration_tablet_set tablets; std::unordered_set viable_targets; }; @@ -423,8 +504,16 @@ class load_balancer { std::vector tables_being_resized; static bool table_needs_merge(const table_size_desc& d) { - // FIXME: ignore merge request if tablet_count == initial_tablets. - return d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); + // The initial_tablet_count is respected while the table is in "growing mode". + // We say that a table leaves this mode if it required a split above the initial + // tablet count. After that, we can rely purely on the average size to say that + // a table is shrinking and requires merge. + // FIXME: this is not perfect and we may want to leave the mode too if we detect + // average size is decreasing significantly, before any split happened. 
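The growing-mode gate and the split/merge thresholds described above (split at 100% of the target tablet size, merge below 50%, merge suppressed until the table has left growing mode) can be sketched as a minimal model. This is illustrative Python, not the ScyllaDB API; function names and the 5G default are taken from the surrounding text:

```python
TARGET_TABLET_SIZE = 5 * 2**30  # 5G default target tablet size

def needs_split(avg_tablet_size, target=TARGET_TABLET_SIZE):
    # Split above 100% of the target size, so the post-split average
    # returns to roughly the target.
    return avg_tablet_size > target

def needs_merge(avg_tablet_size, tablet_count, left_growing_mode,
                target=TARGET_TABLET_SIZE):
    # Merge only below 50% of the target, and only once the table has
    # left "growing mode"; the 50%-100% gap is the hysteresis band.
    return left_growing_mode and tablet_count > 1 and avg_tablet_size < target // 2
```

The dead band means a table that just split (average back near the target) sits far from the merge threshold, so decisions do not flip-flop.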
+ bool left_growing_mode = !d.resize_decision.initial_decision(); + lblogger.debug("table_needs_merge: tablet_count={}, avg_tablet_size={}, left_growing_mode={} (seq number: {})", + d.tablet_count, d.avg_tablet_size, left_growing_mode, d.resize_decision.sequence_number); + return left_growing_mode && d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); } static bool table_needs_split(const table_size_desc& d) { return d.avg_tablet_size > d.target_max_tablet_size; @@ -541,6 +630,12 @@ private: return trinfo ? trinfo->next : ti.replicas; } + tablet_replica_set sorted_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { + auto set = get_replicas_for_tablet_load(ti, trinfo); + std::ranges::sort(set, std::less()); + return set; + } + // Whether to count the tablet as putting streaming load on the system. // Tablets which are streaming or are yet-to-stream are counted. bool is_streaming(const tablet_transition_info* trinfo) { @@ -574,6 +669,26 @@ private: on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); } + using migration_vector = migration_plan::migrations_vector; + static migration_vector + get_migration_info(const migration_tablet_set& tablet_set, tablet_transition_kind kind, tablet_replica src, tablet_replica dst) { + migration_vector infos; + for (auto tablet : tablet_set.tablets()) { + infos.push_back(tablet_migration_info{kind, tablet, src, dst}); + } + return infos; + } + + using migration_streaming_info_vector = utils::small_vector; + static migration_streaming_info_vector + get_migration_streaming_infos(const locator::topology& topology, const tablet_map& tmap, const migration_vector& infos) { + migration_streaming_info_vector streaming_infos; + for (auto& info : infos) { + auto& ti = tmap.get_tablet_info(info.tablet.tablet); + streaming_infos.push_back(get_migration_streaming_info(topology, ti, info)); + } + return streaming_infos; + } public: 
    load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats,
            uint64_t target_tablet_size, std::unordered_set skiplist)
        : _target_tablet_size(target_tablet_size)
@@ -598,7 +713,8 @@ public:
         // Make plans for repair jobs
         plan.set_repair_plan(co_await make_repair_plan(plan));
 
-        plan.set_resize_plan(co_await make_resize_plan());
+        // Merge table-wide resize decisions; this may emit new decisions, or revoke or finalize ongoing ones.
+        plan.merge_resize_plan(co_await make_resize_plan(plan));
 
         lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s)",
                 plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count());
@@ -766,7 +882,183 @@ public:
         co_return ret;
     }
 
-    future<table_resize_plan> make_resize_plan() {
+    // Returns true if a table has replicas of all its sibling tablets co-located.
+    // This is used for determining whether merge can be finalized, since co-location
+    // is a strict requirement for sibling tablets to be merged.
+    future<bool> all_sibling_tablet_replicas_colocated(table_id table, const tablet_map& tmap) {
+        bool all_colocated = true;
+        co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional<tablet_desc> t2_opt) -> future<> {
+            // FIXME: introduce variant of for_each_sibling_tablets() that accepts stop_iteration.
+            if (!all_colocated) {
+                return make_ready_future<>();
+            }
+
+            if (!t2_opt) {
+                on_internal_error(lblogger, format("Unable to find sibling tablet during co-location check for table {}", table));
+            }
+            auto t2 = *t2_opt;
+
+            // Sibling tablets cannot be considered co-located if their tablet info is temporarily unmergeable.
+            // This can happen when, for example, one of them has an active repair task.
+ all_colocated &= bool(merge_tablet_info(*t1.info, *t2.info)); + return make_ready_future<>(); + }); + if (all_colocated) { + lblogger.info("All sibling tablets are co-located for table {}", table); + } + co_return all_colocated; + } + + future make_merge_colocation_plan(node_load_map& nodes) { + migration_plan plan; + table_resize_plan resize_plan; + + auto can_proceed_with_colocation = [this] (table_id tid, const locator::tablet_map& tmap) { + // FIXME: tables with views aren't supported yet. See: https://github.com/scylladb/scylladb/issues/17265. + return tmap.needs_merge() && _db.column_family_exists(tid) && _db.find_column_family(tid).views().empty(); + }; + + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = *tmap_; + if (!can_proceed_with_colocation(table, tmap)) { + continue; + } + + // Also filter out replicas that don't belong to the DC being worked on. + auto get_replicas = [this, &nodes] (const tablet_desc& t) { + auto ret = sorted_replicas_for_tablet_load(*t.info, t.transition); + const auto [first, last] = std::ranges::remove_if(ret, [&] (tablet_replica r) { return !nodes.contains(r.host); }); + ret.erase(first, last); + return ret; + }; + + auto migrating = [] (const tablet_desc& t) { + return bool(t.transition); + }; + + auto first_non_matching_replicas = [] (tablet_replica_set r1, tablet_replica_set r2) -> std::optional> { + assert(r1.size() == r2.size()); + // Subtract intersecting (co-located) elements from the replicas set of sibling tablets. + // Think for example that tablet 0 and 1 have replicas [n2, n4] and [n1, n2] respectively. + // After subtraction, replica of tablet 1 in n1 will be a candidate for co-location with + // replica of tablet 0 in n4. 
+ std::unordered_set intersection; + std::ranges::set_intersection(r1, r2, std::inserter(intersection, intersection.begin())); + const auto [r1_first, r1_last] = std::ranges::remove_if(r1, [&] (tablet_replica r) { return intersection.contains(r); }); + r1.erase(r1_first, r1_last); + const auto [r2_first, r2_last] = std::ranges::remove_if(r2, [&] (tablet_replica r) { return intersection.contains(r); }); + r2.erase(r2_first, r2_last); + // Favor replicas of different tablets that belong to same node. For example: + // tablet 0 replicas: [n2:s1, n3:s0] + // tablet 1 replicas: [n1:s0, n2:s0] + // Replica in n1:s0 cannot follow sibling replica in n2:s1. Otherwise, RF invariant is broken. + // Instead, tablet 1 in n2:s0 will be co-located with tablet 0 in n2:s1. + std::unordered_map r1_map; + std::ranges::transform(r1, std::inserter(r1_map, r1_map.begin()), [] (tablet_replica r) { + return std::make_pair(r.host, r); + }); + for (unsigned i = 0; i < r2.size(); i++) { + auto r1_it = r1_map.find(r2[i].host); + if (r1_it != r1_map.end()) { + return std::make_pair(r1_it->second, r2[i]); + } + } + // Since sets had intersection subtracted, the remaining replicas are certainly not co-located. + if (r1.size() > 0) { + return std::make_pair(r1[0], r2[0]); + } + return std::nullopt; + }; + + auto create_migration_info = [] (global_tablet_id gid, tablet_replica src, tablet_replica dst) { + auto kind = (src.host != dst.host) ? tablet_transition_kind::migration : tablet_transition_kind::intranode_migration; + return tablet_migration_info{kind, gid, src, dst}; + }; + + co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional t2_opt) -> future<> { + // Be optimistic about migrating tablets, as if they succeeded. + // Merge finalization will have to recheck that all sibling tablets are co-located. 
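The replica-pairing step above (subtract already co-located replicas, then prefer a pair on the same host so only an intra-node move is needed) can be sketched as follows. This is a simplified Python stand-in for `first_non_matching_replicas`, with replicas modeled as `(host, shard)` tuples:

```python
def first_non_matching_replicas(r1, r2):
    """Pick one replica of each sibling tablet to co-locate.

    r1/r2 are equal-length lists of (host, shard) replicas of two sibling
    tablets. Returns (dst, src): migrate src (from the second tablet) to
    live next to dst (from the first), or None if fully co-located.
    """
    # Subtract intersecting (already co-located) replicas first.
    common = set(r1) & set(r2)
    r1 = [r for r in r1 if r not in common]
    r2 = [r for r in r2 if r not in common]
    # Favor a pair on the same node, so only an intra-node migration is
    # needed and the RF invariant cannot be broken.
    by_host = {host: (host, shard) for host, shard in r1}
    for host, shard in r2:
        if host in by_host:
            return by_host[host], (host, shard)
    # Remaining replicas are certainly not co-located; pick any pair.
    if r1:
        return r1[0], r2[0]
    return None
```

With the comment's example, tablet 0 on `[n2:s1, n3:s0]` and tablet 1 on `[n1:s0, n2:s0]`, the function pairs the two `n2` replicas, yielding an intra-node move.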
+ + if (!t2_opt) { + on_internal_error(lblogger, format("Unable to find sibling tablet during co-location, with tablet count {}, for table {}", + tmap.tablet_count(), table)); + } + auto t2 = *t2_opt; + + auto r1 = get_replicas(t1); + auto r2 = get_replicas(t2); + if (r1 == r2) { + return make_ready_future<>(); + } + auto t1_id = global_tablet_id{table, t1.tid}; + auto t2_id = global_tablet_id{table, t2.tid}; + + if (migrating(t1) || migrating(t2)) { + return make_ready_future<>(); + } + // During RF change, tablets may have incrementally replicas allocated / deallocated to them. + // Let's temporarily delay their co-location until their replica sets have the same size. + if (r1.size() != r2.size()) { + lblogger.warn("Replica sets of tablets to be co-located differ in size: ({}: {}), ({}, {})", + t1_id, r1, t2_id, r2); + return make_ready_future<>(); + } + + // Returns true if moving candidate into dst will violate replication constraint. + const auto r2_hosts = r2 + | std::views::transform(std::mem_fn(&locator::tablet_replica::host)) + | std::ranges::to>(); + auto check_constraints = [r2_hosts = std::move(r2_hosts)] (tablet_replica src, tablet_replica dst) { + // handles intra-node migration. + if (src.host == dst.host && src.shard != dst.shard) { + return false; + } + return r2_hosts.contains(dst.host); + }; + + lblogger.debug("Replica sets of tablets being co-located: ({}: {}), ({}, {})", t1_id, r1, t2_id, r2); + + auto ret = first_non_matching_replicas(r1, r2); + if (!ret) { + // this shouldn't happen in practice, since the above call should always produce a pair of + // replicas to co-locate, since we only got here if the sibling tablets aren't fully co-located. + on_internal_error(lblogger, format("Unable to find replicas to co-locate for sibling tablets ({}: {}), and ({}, {})", + t1_id, r1, t2_id, r2)); + } + + // Emits migration for replica of t2 to co-habit same shard as replica of t1. 
+ auto src = ret->second; + auto dst = ret->first; + + // If migration will violate replication constraint, skip to next pair of replicas of sibling tablets. + auto skip = check_constraints(src, dst); + if (skip) { + lblogger.debug("Replication constraint check failed, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", + t2_id, src, t1_id, dst); + return make_ready_future<>(); + } + + auto mig = create_migration_info(t2_id, src, dst); + auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), *t2.info, mig); + if (!can_accept_load(nodes, mig_streaming_info)) { + // FIXME: we can try another pair of non-colocated replicas of same sibling tablets. + lblogger.debug("Load limit reached, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", + t2_id, src, t1_id, dst); + return make_ready_future<>(); + } + apply_load(nodes, mig_streaming_info); + + lblogger.info("Created migration for replica ({}, {}) to co-habit same shard as ({}, {})", t2_id, src, t1_id, dst); + plan.add(std::move(mig)); + return make_ready_future<>(); + }); + } + plan.merge_resize_plan(std::move(resize_plan)); + + co_return std::move(plan); + } + + future make_resize_plan(const migration_plan& plan) { table_resize_plan resize_plan; if (!_tm->tablets().balancing_enabled()) { @@ -863,13 +1155,21 @@ public: continue; } - // If all replicas have completed split work for the current sequence number, it means that - // load balancer can emit finalize decision, for split to be completed. - if (table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { + auto finalize_decision = [&] { _stats.for_cluster().resizes_finalized++; resize_plan.finalize_resize.insert(table); + }; + + // If all replicas have completed split work for the current sequence number, it means that + // load balancer can emit finalize decision, for split to be completed. 
+            if (tmap.needs_split() && table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) {
+                finalize_decision();
                 lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}",
                         table, table_stats->split_ready_seq_number);
+            // If all sibling tablets are co-located across all DCs, then merge can be finalized.
+            } else if (tmap.needs_merge() && co_await all_sibling_tablet_replicas_colocated(table, tmap) && !bypass_merge_completion()) {
+                finalize_decision();
+                lblogger.info("Finalizing resize decision for table {} as all replicas are co-located", table);
             }
         }
@@ -889,6 +1189,12 @@ public:
         }
     }
 
+    void apply_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
+        for (auto& info : infos) {
+            apply_load(nodes, info);
+        }
+    }
+
     bool can_accept_load(node_load_map& nodes, const tablet_migration_streaming_info& info) {
         for (auto r : info.read_from) {
             if (!nodes.contains(r.host)) {
@@ -913,10 +1219,26 @@ public:
         return true;
     }
 
+    // Precondition: all migration streaming infos have the same source and destination.
+    // FIXME: remove the precondition; it's not easy without copying node_load_map.
+    bool can_accept_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
+        // Since all migration infos have the same source and destination, the load check
+        // can be done by scaling a single info by the number of migrations.
+        auto info = infos[0];
+        info.stream_weight = infos.size();
+        return can_accept_load(nodes, info);
+    }
+
     bool in_shuffle_mode() const {
         return utils::get_local_injector().enter("tablet_allocator_shuffle");
     }
 
+    // If the cluster cannot agree on the tablet merge feature, merge will not be finalized,
+    // since not all nodes in the cluster can handle the finalization step.
+ bool bypass_merge_completion() const { + return !_db.features().tablet_merge || utils::get_local_injector().enter("tablet_merge_completion_bypass"); + } + size_t rand_int() const { static thread_local std::default_random_engine re{std::random_device{}()}; static thread_local std::uniform_int_distribution dist; @@ -927,7 +1249,7 @@ public: return rand_int() % shard_count; } - table_id pick_table(const std::unordered_map>& candidates) { + table_id pick_table(const table_candidates_map& candidates) { if (!_use_table_aware_balancing) { on_internal_error(lblogger, "pick_table() called when table-aware balancing is disabled"); } @@ -945,7 +1267,7 @@ public: on_internal_error(lblogger, "No candidate table"); } - global_tablet_id peek_candidate(shard_load& shard_info) { + migration_tablet_set peek_candidate(shard_load& shard_info) { if (_use_table_aware_balancing) { auto table = pick_table(shard_info.candidates); return *shard_info.candidates[table].begin(); @@ -1065,22 +1387,52 @@ public: co_return *best_candidate; } - void erase_candidate(shard_load& shard_info, global_tablet_id tablet) { + void erase_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { - shard_info.candidates[tablet.table].erase(tablet); - if (shard_info.candidates[tablet.table].empty()) { - shard_info.candidates.erase(tablet.table); + auto table = tablets.table(); + shard_info.candidates[table].erase(tablets); + if (shard_info.candidates[table].empty()) { + shard_info.candidates.erase(table); } } else { - shard_info.candidates_all_tables.erase(tablet); + shard_info.candidates_all_tables.erase(tablets); } } - void add_candidate(shard_load& shard_info, global_tablet_id tablet) { + void maybe_erase_colocated_candidate(shard_load& shard_info, const tablet_map& tmap, global_tablet_id tablet) { + if (!tmap.needs_merge()) { + return; + } + auto siblings = tmap.sibling_tablets(tablet.tablet); + if (!siblings) { + on_internal_error(lblogger, format("Unable to 
find sibling tablet of {} during merge", tablet)); + } + auto left_sibling = global_tablet_id{tablet.table, siblings->first}; + auto right_sibling = global_tablet_id{tablet.table, siblings->second}; + erase_candidate(shard_info, migration_tablet_set{colocated_tablets{left_sibling, right_sibling}}); + } + + void erase_candidates(node_load_map& nodes, const tablet_map& tmap, const migration_tablet_set& tablets) { + // FIXME: indentation. + for (auto tablet : tablets.tablets()) { + auto& src_tinfo = tmap.get_tablet_info(tablet.tablet); + for (auto&& r : src_tinfo.replicas) { + if (nodes.contains(r.host)) { + lblogger.trace("Erasing tablet {} from {}", tablet, r); + // Not necessarily all replicas of sibling tablets are co-located, and so we need to + // remove them from candidate list using global_tablet_id. + erase_candidate(nodes[r.host].shards[r.shard], migration_tablet_set{tablet}); + maybe_erase_colocated_candidate(nodes[r.host].shards[r.shard], tmap, tablet); + } + } + } + } + + void add_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { - shard_info.candidates[tablet.table].insert(tablet); + shard_info.candidates[tablets.table()].insert(tablets); } else { - shard_info.candidates_all_tables.insert(tablet); + shard_info.candidates_all_tables.insert(tablets); } } @@ -1094,7 +1446,7 @@ public: // The assumption is that the algorithm moves tablets from more loaded nodes to less loaded nodes, // so convergence is reached where the node we picked as source has lower load, or will have lower // load post-movement, than the node we picked as the destination. - bool check_convergence(node_load& src_info, node_load& dst_info) { + bool check_convergence(node_load& src_info, node_load& dst_info, unsigned delta = 1) { // Allow migrating only from candidate nodes which have higher load than the target. 
if (src_info.avg_load <= dst_info.avg_load) { lblogger.trace("Load inversion: src={} (avg_load={}), dst={} (avg_load={})", @@ -1103,8 +1455,8 @@ public: } // Prevent load inversion post-movement which can lead to oscillations. - if (src_info.get_avg_load(src_info.tablet_count - 1) < - dst_info.get_avg_load(dst_info.tablet_count + 1)) { + if (src_info.get_avg_load(src_info.tablet_count - delta) < + dst_info.get_avg_load(dst_info.tablet_count + delta)) { lblogger.trace("Load inversion post-movement: src={} (avg_load={}), dst={} (avg_load={})", src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load); return false; @@ -1113,6 +1465,75 @@ public: return true; } + bool check_convergence(node_load& src_info, node_load& dst_info, const migration_tablet_set& tablet_set) { + return check_convergence(src_info, dst_info, tablet_set.tablets().size()); + } + + // Checks whether moving a tablet from shard A to B (intra-node) would go against convergence. + // Returns false if the tablet should not be moved, and true if it may be moved. + bool check_convergence(const shard_load& src_info, const shard_load& dst_info, unsigned delta = 1) { + return src_info.tablet_count > dst_info.tablet_count + delta; + } + + bool check_convergence(const shard_load& src_info, const shard_load& dst_info, const migration_tablet_set& tablet_set) { + return check_convergence(src_info, dst_info, tablet_set.tablets().size()); + } + + // Adjusts the load of the source and destination shards in the host where intra-node migration happens. 
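The delta-aware convergence check above can be modeled as follows. This is illustrative Python, not the balancer's API; `delta` is 2 when a co-located sibling pair migrates as one candidate:

```python
def check_convergence(src_count, dst_count, src_shards, dst_shards, delta=1):
    # Only migrate from a more loaded node to a less loaded one.
    src_avg = src_count / src_shards
    dst_avg = dst_count / dst_shards
    if src_avg <= dst_avg:
        return False
    # Prevent load inversion post-movement, which can cause oscillations:
    # moving `delta` tablets must not leave the source below the destination.
    return (src_count - delta) / src_shards >= (dst_count + delta) / dst_shards
```

For example, moving one tablet from a 10-tablet node to an 8-tablet node converges (9 vs 9), but moving a sibling pair from the same node would invert the load (8 vs 10) and is rejected, which is why convergence is rechecked when the chosen candidate is a co-located pair.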
+ void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, global_tablet_id tablet) { + auto& src_info = node_load.shards[src]; + auto& dst_info = node_load.shards[dst]; + dst_info.tablet_count++; + src_info.tablet_count--; + dst_info.tablet_count_per_table[tablet.table]++; + src_info.tablet_count_per_table[tablet.table]--; + } + + // Adjusts the load of the source and destination (host:shard) that were picked for the migration. + void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, global_tablet_id source_tablet) { + { + auto& target_info = nodes[dst.host]; + target_info.shards[dst.shard].tablet_count++; + target_info.shards[dst.shard].tablet_count_per_table[source_tablet.table]++; + target_info.tablet_count_per_table[source_tablet.table]++; + target_info.tablet_count += 1; + target_info.update(); + } + + auto& src_node_info = nodes[src.host]; + auto& src_shard_info = src_node_info.shards[src.shard]; + src_shard_info.tablet_count -= 1; + src_shard_info.tablet_count_per_table[source_tablet.table]--; + src_node_info.tablet_count_per_table[source_tablet.table]--; + + src_node_info.tablet_count -= 1; + src_node_info.update(); + } + + void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, const migration_tablet_set& tablet_set) { + for (auto tablet : tablet_set.tablets()) { + update_node_load_on_migration(node_load, host, src, dst, tablet); + } + } + + void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) { + for (auto tablet : tablet_set.tablets()) { + update_node_load_on_migration(nodes, src, dst, tablet); + } + } + + static void unload(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { + for (auto _ : tablet_set.tablets()) { + sketch.unload(host, shard); + } + } + + static void pick(locator::load_sketch& 
sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { + for (auto _ : tablet_set.tablets()) { + sketch.pick(host, shard); + } + } + future make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) { migration_plan plan; const tablet_metadata& tmeta = _tm->tablets(); @@ -1174,7 +1595,7 @@ public: // Convergence check // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit. - if (!shuffle && (src == dst || src_info.tablet_count <= dst_info.tablet_count + 1)) { + if (!shuffle && (src == dst || !check_convergence(src_info, dst_info))) { lblogger.debug("Node {} is balanced", host); break; } @@ -1188,15 +1609,20 @@ public: } auto candidate = co_await peek_candidate(nodes, src_info, tablet_replica{host, src}, tablet_replica{host, dst}); - auto tablet = candidate.tablet; + auto tablets = candidate.tablets; + + // Recheck convergence to avoid oscillations if co-located tablets are being migrated together. + if (!shuffle && (src == dst || !check_convergence(src_info, dst_info, tablets))) { + lblogger.debug("Node {} is balanced", host); + break; + } // Emit migration. 
-            auto mig = tablet_migration_info {tablet_transition_kind::intranode_migration, tablet,
-                                              tablet_replica{host, src}, tablet_replica{host, dst}};
-            auto& tmap = tmeta.get_tablet_map(tablet.table);
-            auto& src_tinfo = tmap.get_tablet_info(tablet.tablet);
-            auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), src_tinfo, mig);
+            auto mig = get_migration_info(tablets, tablet_transition_kind::intranode_migration,
+                                          tablet_replica{host, src}, tablet_replica{host, dst});
+            auto& tmap = tmeta.get_tablet_map(tablets.table());
+            auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig);
 
             if (!can_accept_load(nodes, mig_streaming_info)) {
                 _stats.for_dc(node_load.dc()).migrations_skipped++;
@@ -1210,16 +1636,9 @@ public:
             _stats.for_dc(node_load.dc()).intranode_migrations_produced++;
             plan.add(std::move(mig));
 
-            for (auto&& r : src_tinfo.replicas) {
-                if (nodes.contains(r.host)) {
-                    erase_candidate(nodes[r.host].shards[r.shard], tablet);
-                }
-            }
+            erase_candidates(nodes, tmap, tablets);
 
-            dst_info.tablet_count++;
-            src_info.tablet_count--;
-            dst_info.tablet_count_per_table[tablet.table]++;
-            src_info.tablet_count_per_table[tablet.table]--;
+            update_node_load_on_migration(node_load, host, src, dst, tablets);
             sketch.pick(host, dst);
             sketch.unload(host, src);
         }
@@ -1325,6 +1744,65 @@ public:
         return std::nullopt;
     }
 
+    // Verifies that moving a given tablet from src_info.id to dst_info.id would not violate
+    // replication constraints (no increase in replica co-location on nodes, racks).
+    // Returns std::nullopt if nothing is violated and the movement is allowed.
+    //
+    // The constraints might not be the same for two sibling tablets that have co-located
+    // replicas.
+    // Example:
+    //   nodes = {A, B, C, D}
+    //   tablet1 = {A, B, C}
+    //   tablet2 = {A, B, D}
+    //   viable target for {tablet1, B} is D.
+    //   viable target for {tablet2, B} is C.
+ // + // When co-located replicas share a viable target, then a migration can be emitted to + // preserve co-location. + // To allow decommission when co-located replicas don't share a viable target, a skip + // info will be returned for each tablet, even though that means breaking this + // co-location. Decommission is higher in priority. + using skip_info_vector = std::vector>; + std::optional + check_constraints(node_load_map& nodes, + const locator::tablet_map& tmap, + node_load& src_info, + node_load& dst_info, + migration_tablet_set tablet_set, + bool need_viable_targets) { + std::unordered_map viable_targets_count; + std::unordered_map skip_info_per_tablet; + std::unordered_set shared_viable_targets; + const size_t tablet_count = tablet_set.tablets().size(); + + for (auto tablet : tablet_set.tablets()) { + auto skip = check_constraints(nodes, tmap, src_info, dst_info, tablet, need_viable_targets); + if (!skip) { + continue; + } + for (const auto& target : skip->viable_targets) { + // A viable target is considered shared if all candidates share that same viable target. + if (++viable_targets_count[target] == tablet_count) { + shared_viable_targets.insert(target); + } + } + skip_info_per_tablet.emplace(std::make_pair(tablet, std::move(*skip))); + } + if (skip_info_per_tablet.empty()) { + return std::nullopt; + } + if (!shared_viable_targets.empty()) { + return skip_info_vector{std::make_pair(skip_info{std::move(shared_viable_targets)}, std::move(tablet_set))}; + } + + skip_info_vector skip_infos; + skip_infos.reserve(skip_info_per_tablet.size()); + for (auto&& [tablet, info] : skip_info_per_tablet) { + skip_infos.push_back(std::make_pair(std::move(info), migration_tablet_set{std::move(tablet)})); + } + return skip_infos; + } + // Picks best tablet replica to move and its new destination. // The destination host is picked among nodes_by_load_dst, with dst being the preferred destination. 
// @@ -1369,9 +1847,9 @@ public: auto get_candidate = [this, drain_skipped, &nodes, &src_node_info] (tablet_replica src, tablet_replica dst) -> future { if (drain_skipped) { - auto source_tablet = src_node_info.skipped_candidates.back().tablet; - auto badness = evaluate_candidate(nodes, source_tablet.table, src, dst); - co_return migration_candidate{source_tablet, src, dst, badness}; + auto source_tablets = src_node_info.skipped_candidates.back().tablets; + auto badness = evaluate_candidate(nodes, source_tablets.table(), src, dst); + co_return migration_candidate{source_tablets, src, dst, badness}; } else { auto&& src_shard_info = src_node_info.shards[src.shard]; co_return co_await peek_candidate(nodes, src_shard_info, src, dst); @@ -1382,7 +1860,7 @@ public: // Given src as the source replica, evaluate all destinations. // Updates min_candidate with the best candidate, if better is found. - auto evaluate_targets = [&] (global_tablet_id tablet, tablet_replica src, migration_badness src_badness) -> future<> { + auto evaluate_targets = [&] (migration_tablet_set tablets, tablet_replica src, migration_badness src_badness) -> future<> { migration_badness min_dst_badness; std::optional min_dst_host; std::vector best_hosts; @@ -1393,11 +1871,11 @@ public: auto& new_target_info = nodes[new_target]; // Skip movements which may harm convergence. 
- if (!src_node_info.drained && !check_convergence(src_node_info, new_target_info)) { + if (!src_node_info.drained && !check_convergence(src_node_info, new_target_info, tablets)) { continue; } - auto badness = evaluate_dst_badness(nodes, tablet.table, tablet_replica{new_target, 0}); + auto badness = evaluate_dst_badness(nodes, tablets.table(), tablet_replica{new_target, 0}); if (!min_dst_host || badness.dst_node_badness < min_dst_badness.dst_node_badness) { min_dst_badness = badness; min_dst_host = new_target; @@ -1421,7 +1899,8 @@ public: for (shard_id new_dst_shard = 0; new_dst_shard < nodes[host].shard_count; new_dst_shard++) { co_await coroutine::maybe_yield(); auto new_dst = tablet_replica{host, new_dst_shard}; - auto badness = evaluate_dst_badness(nodes, tablet.table, new_dst); + + auto badness = evaluate_dst_badness(nodes, tablets.table(), new_dst); if (!min_dst || badness < min_dst_badness) { min_dst_badness = badness; min_dst = new_dst; @@ -1437,7 +1916,7 @@ public: } auto candidate = migration_candidate{ - tablet, src, *min_dst, + tablets, src, *min_dst, migration_badness{src_badness.shard_badness(), src_badness.node_badness(), min_dst_badness.shard_badness(), @@ -1456,9 +1935,9 @@ public: // Consider better alternatives. if (drain_skipped) { - auto source_tablet = src_node_info.skipped_candidates.back().tablet; - auto badness = evaluate_src_badness(nodes, source_tablet.table, src); - co_await evaluate_targets(source_tablet, src, badness); + auto tablets = src_node_info.skipped_candidates.back().tablets; + auto badness = evaluate_src_badness(nodes, tablets.table(), src); + co_await evaluate_targets(tablets, src, badness); } else { // Find a better candidate. // Consider different tables. For each table, first find the best source shard. 
@@ -1504,7 +1983,7 @@ public: if (drain_skipped) { src_node_info.skipped_candidates.pop_back(); } else { - erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablet); + erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablets); } // Restore invariants. @@ -1658,7 +2137,7 @@ public: push_back_shard_candidate.cancel(); auto& candidate = src_node_info.skipped_candidates.back(); src = candidate.replica; - lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablet, src, candidate.viable_targets); + lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablets, src, candidate.viable_targets); // When draining, need to narrow down targets to viable targets before choosing the best target. nodes_by_load_dst.clear(); @@ -1707,6 +2186,7 @@ public: // Check convergence conditions. // When draining nodes, disable convergence checks so that all tablets are migrated away. + bool can_check_convergence = !shuffle && nodes_to_drain.empty(); if (!shuffle && nodes_to_drain.empty()) { // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than @@ -1749,28 +2229,41 @@ public: // May choose a different source shard than src.shard or different destination host/shard than dst. auto candidate = co_await pick_candidate(nodes, src_node_info, target_info, src, dst, nodes_by_load_dst, drain_skipped); - auto source_tablet = candidate.tablet; + auto source_tablets = candidate.tablets; src = candidate.src; dst = candidate.dst; - auto& tmap = tmeta.get_tablet_map(source_tablet.table); + auto& tmap = tmeta.get_tablet_map(source_tablets.table()); + // If best candidate is co-located sibling tablets, then convergence is re-checked to avoid oscillations. 
+ if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) { + lblogger.debug("No more candidates. Load would be inverted."); + _stats.for_dc(dc).stop_load_inversion++; + break; + } // Check replication strategy constraints. // When drain_skipped is true, we already picked movement to a viable target. if (!drain_skipped) { - auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablet, src_node_info.drained); - if (skip) { - if (src_node_info.drained && skip->viable_targets.empty()) { - auto replicas = tmap.get_tablet_info(source_tablet.tablet).replicas; + auto process_skip_info = [&] (migration_tablet_set tablets, skip_info skip) { + if (src_node_info.drained && skip.viable_targets.empty()) { + auto tablet = tablets.tablets().front(); + auto replicas = tmap.get_tablet_info(tablet.tablet).replicas; throw std::runtime_error(fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. 
(nodes {}, replicas {})", - source_tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); + tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); + } + lblogger.debug("Adding replica {} of candidate {} to skipped list with the viable targets {}", src, candidate, skip.viable_targets); + src_node_info.skipped_candidates.emplace_back(src, tablets, std::move(skip.viable_targets)); + }; + + auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablets, src_node_info.drained); + if (skip) { + for (auto&& [skip_info, tablets] : *skip) { + process_skip_info(tablets, skip_info); } - src_node_info.skipped_candidates.emplace_back(src, source_tablet, std::move(skip->viable_targets)); continue; } } - if (candidate.badness.is_bad()) { _stats.for_dc(_dc).bad_migrations++; } @@ -1782,11 +2275,10 @@ public: tablet_transition_kind kind = (src_node_info.state() == locator::node::state::being_removed || src_node_info.state() == locator::node::state::left) ? tablet_transition_kind::rebuild : tablet_transition_kind::migration; - auto mig = tablet_migration_info {kind, source_tablet, src, dst}; - auto& src_tinfo = tmap.get_tablet_info(source_tablet.tablet); - auto mig_streaming_info = get_migration_streaming_info(topo, src_tinfo, mig); + auto mig = get_migration_info(source_tablets, kind, src, dst); + auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig); - _load_sketch->pick(dst.host, dst.shard); + pick(*_load_sketch, dst.host, dst.shard, source_tablets); if (can_accept_load(nodes, mig_streaming_info)) { apply_load(nodes, mig_streaming_info); @@ -1809,35 +2301,16 @@ public: } } - for (auto&& r : src_tinfo.replicas) { - if (nodes.contains(r.host)) { - erase_candidate(nodes[r.host].shards[r.shard], source_tablet); - } - } + erase_candidates(nodes, tmap, source_tablets); - { - auto& target_info = nodes[dst.host]; - target_info.shards[dst.shard].tablet_count++; - 
target_info.shards[dst.shard].tablet_count_per_table[source_tablet.table]++; - target_info.tablet_count_per_table[source_tablet.table]++; - target_info.tablet_count += 1; - target_info.update(); - } - - auto& src_shard_info = src_node_info.shards[src.shard]; - src_shard_info.tablet_count -= 1; - src_shard_info.tablet_count_per_table[source_tablet.table]--; - src_node_info.tablet_count_per_table[source_tablet.table]--; - - src_node_info.tablet_count -= 1; - src_node_info.update(); + update_node_load_on_migration(nodes, src, dst, source_tablets); if (src_node_info.tablet_count == 0) { push_back_node_candidate.cancel(); nodes_by_load.pop_back(); } if (lblogger.is_enabled(seastar::log_level::debug)) { - co_await log_table_load(nodes, source_tablet.table); + co_await log_table_load(nodes, source_tablets.table()); } } @@ -1865,6 +2338,57 @@ public: co_return std::move(plan); } + class sibling_tablets_replicas_processor { + const tablet_desc _t1; + const std::optional _t2; + tablet_replica_set _t1_replicas; + tablet_replica_set _t2_replicas; + tablet_replica_set::iterator _current_t1; + tablet_replica_set::iterator _current_t2; + public: + sibling_tablets_replicas_processor(const tablet_desc t1, const std::optional t2, + tablet_replica_set t1_replicas, tablet_replica_set t2_replicas) + : _t1(std::move(t1)) + , _t2(std::move(t2)) + , _t1_replicas(std::move(t1_replicas)) + , _t2_replicas(std::move(t2_replicas)) + , _current_t1(_t1_replicas.begin()) + , _current_t2(_t2_replicas.begin()) { + } + + using tablet_ids = utils::small_vector; + + // Produces the next replica from sets of sibling tablets. If a given replica has + // the sibling tablets co-located in it, the ids of both tablets will be returned + // for that replica. + // Given replica sets of sibling tablets: + // t1 {A, B, C}, + // t2 {A, C, D}, + // it will yield + // {A, {t1, t2}}, {B, {t1}}, {C, {t1, t2}}, {D, {t2}} + // Invariant: if return value is engaged, size of tablet_ids will be 1 or 2. 
+        std::optional<std::pair<tablet_replica, tablet_ids>> next_replica() {
+            if (_current_t1 == _t1_replicas.end() && _current_t2 == _t2_replicas.end()) {
+                return std::nullopt;
+            }
+            if (_current_t1 == _t1_replicas.end()) {
+                return std::make_pair(*_current_t2++, tablet_ids{_t2->tid});
+            }
+            if (_current_t2 == _t2_replicas.end()) {
+                return std::make_pair(*_current_t1++, tablet_ids{_t1.tid});
+            }
+            // Detect co-located replicas of sibling tablets.
+            if (*_current_t1 == *_current_t2) {
+                _current_t1++;
+                return std::make_pair(*_current_t2++, tablet_ids{_t1.tid, _t2->tid});
+            }
+            if (*_current_t1 < *_current_t2) {
+                return std::make_pair(*_current_t1++, tablet_ids{_t1.tid});
+            }
+            return std::make_pair(*_current_t2++, tablet_ids{_t2->tid});
+        }
+    };
+
     future<migration_plan> make_plan(dc_name dc) {
         migration_plan plan;
@@ -2042,14 +2566,40 @@ public:
         for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
             auto& tmap = *tmap_;
             uint64_t total_load = 0;
-            co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) -> future<> {
-                auto trinfo = tmap.get_tablet_transition_info(tid);
-                if (is_streaming(trinfo)) {
-                    apply_load(nodes, get_migration_streaming_info(topo, ti, *trinfo));
+            auto get_replicas = [this] (std::optional<tablet_desc> t) -> tablet_replica_set {
+                return t ? sorted_replicas_for_tablet_load(*t->info, t->transition) : tablet_replica_set{};
+            };
+            auto migrating = [] (std::optional<tablet_desc> t) {
+                return t && bool(t->transition);
+            };
+            auto maybe_apply_load = [&] (std::optional<tablet_desc> t) {
+                if (t && is_streaming(t->transition)) {
+                    apply_load(nodes, get_migration_streaming_info(topo, *t->info, *t->transition));
                 }
+            };
-            for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
+            // If a table is undergoing merge, co-located replicas of sibling tablets will be treated as a single migration candidate,
+            // even though each tablet replica will be migrated independently. The next invocation of the load balancer is able to exclude both
+            // siblings if either hasn't finished migration yet.
 That's to prevent the load balancer from incorrectly considering that
+            // they're not co-located if only one of them completed migration.
+            co_await tmap.for_each_sibling_tablets([&, table = table] (tablet_desc t1, std::optional<tablet_desc> t2) -> future<> {
+                maybe_apply_load(t1);
+                maybe_apply_load(t2);
+
+                auto t1_replicas = get_replicas(t1);
+                // If t2 is disengaged, when tablet_count == 1, t2_replicas is empty and so will have no effect
+                // when adding t1 replicas as candidates.
+                auto t2_replicas = get_replicas(t2);
+
+                sibling_tablets_replicas_processor processor(t1, t2, std::move(t1_replicas), std::move(t2_replicas));
+
+                auto get_table_desc = [&] (tablet_id tid) {
+                    return tid == t1.tid ? t1 : t2;
+                };
+
+                while (auto next = processor.next_replica()) {
+                    auto& [replica, tids] = *next;
                     if (!nodes.contains(replica.host)) {
                         continue;
                     }
@@ -2058,12 +2608,23 @@ public:
                     if (shard_load_info.tablet_count == 0) {
                         node_load_info.shards_by_load.push_back(replica.shard);
                     }
-                    shard_load_info.tablet_count += 1;
-                    shard_load_info.tablet_count_per_table[table]++;
-                    node_load_info.tablet_count_per_table[table]++;
-                    total_load++;
-                    if (!trinfo) { // migrating tablets are not candidates
-                        add_candidate(shard_load_info, global_tablet_id {table, tid});
+                    shard_load_info.tablet_count += tids.size();
+                    shard_load_info.tablet_count_per_table[table] += tids.size();
+                    node_load_info.tablet_count_per_table[table] += tids.size();
+                    total_load += tids.size();
+                    if (tmap.needs_merge() && tids.size() == 2) {
+                        // Exclude both sibling tablets if either hasn't finished migration yet. That's to prevent the balancer from
+                        // undoing the colocation.
+                        if (!migrating(t1) && !migrating(t2)) {
+                            auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}};
+                            add_candidate(shard_load_info, migration_tablet_set{std::move(candidate)});
+                        }
+                    } else {
+                        for (auto tid : tids) {
+                            if (!migrating(get_table_desc(tid))) { // migrating tablets are not candidates
+                                add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tid}});
+                            }
+                        }
                     }
                 }
@@ -2084,6 +2645,12 @@ public:
             plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain));
         }
+        if (_tm->tablets().balancing_enabled() && plan.empty()) {
+            auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
+            lblogger.info("Prepared {} migrations for co-locating sibling tablets in DC {}", dc_merge_plan.tablet_migration_count(), dc);
+            plan.merge(std::move(dc_merge_plan));
+        }
+
         co_await utils::clear_gently(nodes);
         co_return std::move(plan);
     }
@@ -2170,7 +2737,7 @@ public:
     load_balancer_stats_manager& stats() {
         return _load_balancer_stats;
     }
-
+private:
     // The splitting of tablets today is completely based on the power-of-two constraint.
     // A tablet of id X is split into 2 new tablets, which new ids are (x << 1) and
    // (x << 1) + 1.
@@ -2198,6 +2765,56 @@ public:
        co_return std::move(new_tablets);
    }
+    // The merging of tablets is completely based on the power-of-two constraint.
+    // Tablets of ids X and X+1 are merged into the new tablet id (X >> 1).
+    future<tablet_map> merge_tablets(token_metadata_ptr tm, table_id table) {
+        auto& tablets = tm->tablets().get_tablet_map(table);
+
+        tablet_map new_tablets(tablets.tablet_count() / 2);
+
+        for (tablet_id tid : new_tablets.tablet_ids()) {
+            co_await coroutine::maybe_yield();
+
+            tablet_id old_left_tid = tablet_id(tid.value() << 1);
+            tablet_id old_right_tid = tablet_id(old_left_tid.value() + 1);
+
+            auto& left_tablet_info = tablets.get_tablet_info(old_left_tid);
+            auto& right_tablet_info = tablets.get_tablet_info(old_right_tid);
+
+            auto sorted = [] (tablet_replica_set set) {
+                std::ranges::sort(set, std::less<>());
+                return set;
+            };
+            auto left_tablet_replicas = sorted(left_tablet_info.replicas);
+            auto right_tablet_replicas = sorted(right_tablet_info.replicas);
+            if (left_tablet_replicas != right_tablet_replicas) {
+                throw std::runtime_error(format("Sibling tablets {} (r: {}) and {} (r: {}) are not colocated.",
+                    old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
+            }
+            auto merged_tablet_info = locator::merge_tablet_info(left_tablet_info, right_tablet_info);
+            if (!merged_tablet_info) {
+                throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).",
+                    old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
+            }
+
+            new_tablets.set_tablet(tid, *merged_tablet_info);
+        }
+
+        lblogger.info("Merge tablets for table {}, decreasing tablet count from {} to {}",
+            table, tablets.tablet_count(), new_tablets.tablet_count());
+        co_return std::move(new_tablets);
+    }
+public:
+    future<tablet_map> resize_tablets(token_metadata_ptr tm, table_id table) {
+        auto& tmap = tm->tablets().get_tablet_map(table);
+        if (tmap.needs_split()) {
+            return split_tablets(std::move(tm), table);
+        } else if (tmap.needs_merge()) {
+            return merge_tablets(std::move(tm), table);
+        }
+        throw std::logic_error(format("Table {} cannot be resized", table));
+    }
+    // FIXME: Handle materialized views.
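The power-of-two id arithmetic that split and merge rely on can be sketched standalone. This is an illustrative model only, with tablet ids as plain integers rather than the ScyllaDB `tablet_id` type; the helper names are hypothetical:

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Split: tablet X becomes tablets (X << 1) and (X << 1) + 1.
std::pair<uint64_t, uint64_t> split_ids(uint64_t x) {
    return {x << 1, (x << 1) + 1};
}

// Merge: sibling tablets X and X + 1 (X even) become tablet X >> 1.
uint64_t merge_id(uint64_t x) {
    return x >> 1;
}

// Siblings differ only in the lowest id bit: (0, 1), (2, 3), (4, 5), ...
uint64_t sibling_of(uint64_t x) {
    return x ^ 1;
}
```

Merging is the exact inverse of splitting: `merge_id` maps both tablets produced by `split_ids` back to the original id, which is why merge can only ever pair a tablet with its `x ^ 1` sibling.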
}; @@ -2230,8 +2847,8 @@ void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balan impl().set_use_tablet_aware_balancing(use_tablet_aware_balancing); } -future tablet_allocator::split_tablets(locator::token_metadata_ptr tm, table_id table) { - return impl().split_tablets(std::move(tm), table); +future tablet_allocator::resize_tablets(locator::token_metadata_ptr tm, table_id table) { + return impl().resize_tablets(std::move(tm), table); } tablet_allocator_impl& tablet_allocator::impl() { diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 98ccff24b1..2036b9ea3d 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -106,6 +106,7 @@ struct table_resize_plan { resize[id] = std::move(other_resize); } } + finalize_resize.merge(std::move(other.finalize_resize)); } }; @@ -153,6 +154,12 @@ public: _migrations.emplace_back(std::move(info)); } + void add(migrations_vector migrations) { + for (auto&& mig : migrations) { + add(std::move(mig)); + } + } + void merge(migration_plan&& other) { std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations)); _has_nodes_to_drain |= other._has_nodes_to_drain; @@ -166,8 +173,8 @@ public: const table_resize_plan& resize_plan() const { return _resize_plan; } - void set_resize_plan(table_resize_plan resize_plan) { - _resize_plan = std::move(resize_plan); + void merge_resize_plan(table_resize_plan resize_plan) { + _resize_plan.merge(std::move(resize_plan)); } const tablet_repair_plan& repair_plan() const { return _repair_plan; } @@ -228,7 +235,7 @@ public: void set_use_table_aware_balancing(bool); - future split_tablets(locator::token_metadata_ptr, table_id); + future resize_tablets(locator::token_metadata_ptr, table_id); /// Should be called when the node is no longer a leader. 
void on_leadership_lost(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index cd6d7da5ff..47a537b19f 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1537,7 +1537,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration"); } - future<> handle_tablet_split_finalization(group0_guard g) { + future<> handle_tablet_resize_finalization(group0_guard g) { // Executes a global barrier to guarantee that any process (e.g. repair) holding stale version // of token metadata will complete before we update topology. auto guard = co_await global_tablet_token_metadata_barrier(std::move(g)); @@ -1550,7 +1550,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { for (auto& table_id : plan.resize_plan().finalize_resize) { auto s = _db.find_schema(table_id); - auto new_tablet_map = co_await _tablet_allocator.split_tablets(tm, table_id); + auto new_tablet_map = co_await _tablet_allocator.resize_tablets(tm, table_id); updates.emplace_back(co_await replica::tablet_map_to_mutation( new_tablet_map, table_id, @@ -2168,8 +2168,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case topology::transition_state::tablet_migration: co_await handle_tablet_migration(std::move(guard), false); break; - case topology::transition_state::tablet_split_finalization: - co_await handle_tablet_split_finalization(std::move(guard)); + case topology::transition_state::tablet_resize_finalization: + co_await handle_tablet_resize_finalization(std::move(guard)); break; case topology::transition_state::left_token_ring: { auto node = get_node_to_work_on(std::move(guard)); @@ -2616,8 +2616,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Returns true if the state machine was transitioned into tablet migration path. 
future maybe_start_tablet_migration(group0_guard); - // Returns true if the state machine was transitioned into tablet split finalization path. - future maybe_start_tablet_split_finalization(group0_guard, const table_resize_plan& plan); + // Returns true if the state machine was transitioned into tablet resize finalization path. + future maybe_start_tablet_resize_finalization(group0_guard, const table_resize_plan& plan); future refresh_tablet_load_stats(); future<> start_tablet_load_stats_refresher(); @@ -2717,10 +2717,10 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_await generate_migration_updates(updates, guard, plan); - // We only want to consider transitioning into tablet split finalization path, if there's no other work + // We only want to consider transitioning into tablet resize finalization path, if there's no other work // to be done (e.g. start migration or/and emit split decision). if (updates.empty()) { - co_return co_await maybe_start_tablet_split_finalization(std::move(guard), plan.resize_plan()); + co_return co_await maybe_start_tablet_resize_finalization(std::move(guard), plan.resize_plan()); } updates.emplace_back( @@ -2733,7 +2733,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_return true; } -future topology_coordinator::maybe_start_tablet_split_finalization(group0_guard guard, const table_resize_plan& plan) { +future topology_coordinator::maybe_start_tablet_resize_finalization(group0_guard guard, const table_resize_plan& plan) { if (plan.finalize_resize.empty()) { co_return false; } @@ -2745,11 +2745,11 @@ future topology_coordinator::maybe_start_tablet_split_finalization(group0_ updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) - .set_transition_state(topology::transition_state::tablet_split_finalization) + .set_transition_state(topology::transition_state::tablet_resize_finalization) .set_version(_topo_sm._topology.version + 1) .build()); - co_await 
update_topology_state(std::move(guard), std::move(updates), "Started tablet split finalization"); + co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet resize finalization"); co_return true; } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index be783b52b8..045e57fe47 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -147,18 +147,27 @@ static std::unordered_map transition_state_ {topology::transition_state::write_both_read_old, "write both read old"}, {topology::transition_state::write_both_read_new, "write both read new"}, {topology::transition_state::tablet_migration, "tablet migration"}, - {topology::transition_state::tablet_split_finalization, "tablet split finalization"}, + {topology::transition_state::tablet_resize_finalization, "tablet resize finalization"}, {topology::transition_state::tablet_draining, "tablet draining"}, {topology::transition_state::left_token_ring, "left token ring"}, {topology::transition_state::rollback_to_normal, "rollback to normal"}, }; +// Allows old deprecated names to be recognized and point to the correct transition. 
+static std::unordered_map deprecated_name_to_transition_state = { + {"tablet split finalization", topology::transition_state::tablet_resize_finalization}, +}; + topology::transition_state transition_state_from_string(const sstring& s) { for (auto&& e : transition_state_to_name_map) { if (e.second == s) { return e.first; } } + auto it = deprecated_name_to_transition_state.find(s); + if (it != deprecated_name_to_transition_state.end()) { + return it->second; + } on_internal_error(tsmlogger, format("cannot map name {} to transition_state", s)); } diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 49d30fb393..20c0bd6610 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -110,7 +110,7 @@ struct topology { write_both_read_old, write_both_read_new, tablet_migration, - tablet_split_finalization, + tablet_resize_finalization, left_token_ring, rollback_to_normal, }; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 64f133b66b..a32cb41dda 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -33,6 +33,7 @@ #include "utils/error_injection.hh" #include "utils/to_string.hh" #include "service/topology_coordinator.hh" +#include "service/topology_state_machine.hh" #include @@ -1354,11 +1355,23 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { tmap.set_resize_decision(resize_decision); }); } +} + +static +future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) { for (auto table_id : plan.resize_plan().finalize_resize) { - const auto& old_tmap = tm.tablets().get_tablet_map(table_id); - testlog.info("Setting new tablet map of size {}", old_tmap.tablet_count() * 2); - tablet_map tmap(old_tmap.tablet_count() * 2); - tm.tablets().set_tablet_map(table_id, std::move(tmap)); + auto tm = stm.get(); + const auto& old_tmap = tm->tablets().get_tablet_map(table_id); + + auto new_tmap = 
co_await talloc.resize_tablets(tm, table_id); + auto new_resize_decision = locator::resize_decision{}; + new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); + new_tmap.set_resize_decision(std::move(new_resize_decision)); + + co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) { + tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); + return make_ready_future<>(); + }); } } @@ -1368,6 +1381,7 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) { for (auto&& mig : plan.migrations()) { tm.tablets().mutate_tablet_map(mig.tablet.table, [&] (tablet_map& tmap) { auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); + testlog.trace("Replacing tablet {} replica from {} to {}", mig.tablet.tablet, mig.src, mig.dst); tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); tmap.set_tablet(mig.tablet.tablet, tinfo); }); @@ -1399,6 +1413,9 @@ size_t get_tablet_count(const tablet_metadata& tm) { return count; } +static +void check_tablet_invariants(const tablet_metadata& tmeta); + static void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, locator::load_stats_ptr load_stats = {}, std::unordered_set skiplist = {}) { // Sanity limit to avoid infinite loops. 
@@ -1414,6 +1431,7 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, loc apply_plan(tm, plan); return make_ready_future<>(); }).get(); + handle_resize_finalize(talloc, stm, plan).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } @@ -2420,13 +2438,37 @@ void check_tablet_invariants(const tablet_metadata& tmeta) { std::unordered_set hosts; // Uniqueness of hosts for (const auto& replica: tinfo.replicas) { - BOOST_REQUIRE(hosts.insert(replica.host).second); + auto ret = hosts.insert(replica.host).second; + if (!ret) { + testlog.error("Failed tablet invariant check for tablet {}: {}", tid, tinfo.replicas); + } + BOOST_REQUIRE(ret); } return make_ready_future<>(); }).get(); } } +static +std::vector +allocate_replicas_in_racks(const std::vector& racks, int rf, + const std::unordered_map>& hosts_by_rack) { + // Choose replicas randomly while loading racks evenly. + std::vector replica_hosts; + for (int i = 0; i < rf; ++i) { + auto rack = racks[i % racks.size()]; + auto& rack_hosts = hosts_by_rack.at(rack.rack); + while (true) { + auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; + if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { + replica_hosts.push_back(candidate_host); + break; + } + } + } + return replica_hosts; +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { do_with_cql_env_thread([] (auto& e) { const int n_hosts = 6; @@ -2480,18 +2522,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { tablet_map tmap(1 << log2_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. 
- std::vector replica_hosts; - for (int i = 0; i < rf; ++i) { - auto rack = racks[i % racks.size()]; - auto& rack_hosts = hosts_by_rack[rack.rack]; - while (true) { - auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; - if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { - replica_hosts.push_back(candidate_host); - break; - } - } - } + std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); @@ -2606,6 +2637,205 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) { }, std::move(cfg)).get(); } +using rack_vector = std::vector; +using hosts_by_rack_map = std::unordered_map>; + +// runs in seastar thread. +static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, + const unsigned shard_count, const unsigned initial_tablets, + std::function set_tablets) { + rack_vector racks; + for (int i = 0; i < n_racks; i++) { + racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)}); + } + + testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); + + std::vector hosts; + for (int i = 0; i < n_hosts; ++i) { + hosts.push_back(host_id(next_uuid())); + } + + auto table1 = add_table(e).get(); + + hosts_by_rack_map hosts_by_rack; + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = inet_address("192.168.0.1"), + .this_host_id = hosts[0], + .local_dc_rack = racks[std::min(1, n_racks - 1)] + } + }); + + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + tablet_metadata tmeta; + + int i = 0; + for (auto h : hosts) { + auto ip = 
inet_address(format("192.168.0.{}", ++i)); + tm.update_host_id(h, ip); + auto rack = racks[i % racks.size()]; + hosts_by_rack[rack.rack].push_back(h); + tm.update_topology(h, rack, node::state::normal, shard_count); + co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h); + testlog.debug("adding host {}, ip {}, rack {}, token {}", h, ip, rack.rack, token(tests::d2t(1. / hosts.size()))); + } + + tablet_map tmap(initial_tablets); + locator::resize_decision decision; + // leaves growing mode, allowing for merge decision. + decision.sequence_number = decision.next_sequence_number(); + tmap.set_resize_decision(std::move(decision)); + set_tablets(tm, tmap, racks, hosts_by_rack); + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + }).get(); + + auto tablet_count = [&] { + return stm.get()->tablets().get_tablet_map(table1).tablet_count(); + }; + auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { + rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + }; + + const uint64_t target_tablet_size = service::default_target_tablet_size; + auto merge_threshold = [&] () -> uint64_t { + return (target_tablet_size * 0.5f) * tablet_count(); + }; + + while (tablet_count() > 1) { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = merge_threshold() - 1 }}, + } + }; + + auto old_tablet_count = tablet_count(); + check_tablet_invariants(stm.get()->tablets()); + do_rebalance_tablets(std::move(load_stats)); + check_tablet_invariants(stm.get()->tablets()); + BOOST_REQUIRE_LT(tablet_count(), old_tablet_count); + } +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) { + do_with_cql_env_thread([] (auto& e) { + auto seed = tests::random::get_int(); + std::mt19937 random_engine{seed}; + + testlog.info("test_load_balancing_merge_colocation - seed {}", seed); + + for (auto i = 0; i 
< 10; i++) { + const int rf = tests::random::get_int(3, 3); + const int n_racks = rf; + const int n_hosts = tests::random::get_int(n_racks * rf, n_racks * rf * 2); + const unsigned shard_count = tests::random::get_int(2, 12); + const unsigned total_shard_count = n_hosts * shard_count; + const unsigned initial_tablets = std::bit_ceil(tests::random::get_int(total_shard_count, total_shard_count * 10)); + + auto set_tablets = [rf, shard_count] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { + for (auto tid : tmap.tablet_ids()) { + testlog.debug("allocating replica in racks with rf {}", rf); + std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); + tablet_replica_set replicas; + replicas.reserve(replica_hosts.size()); + for (auto h : replica_hosts) { + replicas.push_back(tablet_replica {h, tests::random::get_int(0, shard_count - 1)}); + } + testlog.debug("allocating replicas for tablet {}: {}", tid, replicas); + tmap.set_tablet(tid, tablet_info {std::move(replicas)}); + } + }; + + do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); + } + }).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) { + do_with_cql_env_thread([] (auto& e) { + const int rf = 2; + const int n_racks = 1; + const int n_hosts = 2; + const unsigned shard_count = 2; + const unsigned initial_tablets = 2; + + auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { + auto& hosts = hosts_by_rack.at(racks.front().rack); + auto host1 = hosts[0]; + auto host2 = hosts[1]; + tmap.set_tablet(tablet_id(0), tablet_info { + tablet_replica_set { + tablet_replica {host1, shard_id(0)}, + tablet_replica {host2, shard_id(0)}, + } + }); + tmap.set_tablet(tablet_id(1), tablet_info { + tablet_replica_set { + tablet_replica {host2, shard_id(0)}, + tablet_replica {host1, 
shard_id(0)},
+                }
+            });
+        };
+
+        do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
+    }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decommission) {
+    do_with_cql_env_thread([] (auto& e) {
+        const int rf = 3;
+        const int n_racks = 1;
+        const int n_hosts = 4;
+        const unsigned shard_count = 2;
+        const unsigned initial_tablets = 2;
+
+        auto set_tablets = [&] (token_metadata& tm, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
+            auto& rack = racks.front();
+            auto& hosts = hosts_by_rack.at(rack.rack);
+            BOOST_REQUIRE(hosts.size() == 4);
+            auto a = hosts[0];
+            auto b = hosts[1];
+            auto c = hosts[2];
+            auto d = hosts[3];
+
+            // nodes = {A, B, C, D}
+            // tablet1 = {A, B, C}
+            // tablet2 = {A, B, D}
+            // viable target for {tablet1, B} is D.
+            // viable target for {tablet2, B} is C.
+            //
+            // Decommission should succeed by migrating away even co-located replicas of sibling tablets that don't share viable targets.
+            // That should produce:
+            // tablet1 = {A, D, C}
+            // tablet2 = {A, C, D}
+
+            auto decision = tmap.resize_decision();
+            decision.way = locator::resize_decision::merge{};
+            tmap.set_resize_decision(std::move(decision));
+            tm.update_topology(b, rack, node::state::being_decommissioned, shard_count);
+
+            tmap.set_tablet(tablet_id(0), tablet_info {
+                tablet_replica_set {
+                    tablet_replica {a, shard_id(0)},
+                    tablet_replica {b, shard_id(0)},
+                    tablet_replica {c, shard_id(0)},
+                }
+            });
+            tmap.set_tablet(tablet_id(1), tablet_info {
+                tablet_replica_set {
+                    tablet_replica {a, shard_id(0)},
+                    tablet_replica {b, shard_id(0)},
+                    tablet_replica {d, shard_id(0)},
+                }
+            });
+        };
+
+        do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
+    }).get();
+}
+
 SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
     do_with_cql_env_thread([] (auto& e) {
         inet_address ip1("192.168.0.1");
@@ -2614,7 +2844,7 @@
         auto host1 = host_id(next_uuid());
         auto host2 = host_id(next_uuid());
 
-        auto table1 = table_id(next_uuid());
+        auto table1 = add_table(e).get();
 
         unsigned shard_count = 2;
@@ -2627,11 +2857,13 @@
         }
     });
 
-    stm.mutate_token_metadata([&] (token_metadata& tm) {
+    stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
         tm.update_host_id(host1, ip1);
         tm.update_host_id(host2, ip2);
         tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
         tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
+        co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1);
+        co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2);
 
         tablet_map tmap(2);
         for (auto tid : tmap.tablet_ids()) {
@@ -2645,7 +2877,6 @@
         tablet_metadata tmeta;
         tmeta.set_tablet_map(table1, std::move(tmap));
         tm.set_tablets(std::move(tmeta));
-        return make_ready_future<>();
     }).get();
 
     auto tablet_count = [&] {
@@ -2668,19 +2899,6 @@
         const auto initial_ready_seq_number = std::numeric_limits::min();
 
-        // there are 2 tablets, each with avg size hitting merge threshold, so merge request is emitted
-        {
-            locator::load_stats load_stats = {
-                .tables = {
-                    { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }},
-                }
-            };
-
-            do_rebalance_tablets(std::move(load_stats));
-            BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets);
-            BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::merge>(resize_decision().way));
-        }
-
         // avg size moved above target size, so merge is cancelled
         {
             locator::load_stats load_stats = {
@@ -2722,6 +2940,19 @@
             BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets * 2);
             BOOST_REQUIRE(std::holds_alternative(resize_decision().way));
         }
+
+        // Check that balancer detects table size dropped to 0 and reduces tablet count down to 1 through merges.
+        {
+            locator::load_stats load_stats = {
+                .tables = {
+                    { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }},
+                }
+            };
+
+            do_rebalance_tablets(std::move(load_stats));
+            BOOST_REQUIRE_EQUAL(tablet_count(), 1);
+        }
+
     }).get();
 }
@@ -3330,3 +3561,10 @@ SEASTAR_TEST_CASE(test_explicit_tablets_disable) {
     co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0);
     co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128);
 }
+
+SEASTAR_TEST_CASE(test_recognition_of_deprecated_name_for_resize_transition) {
+    using transition_state = service::topology::transition_state;
+    BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet split finalization"), transition_state::tablet_resize_finalization);
+    BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet resize finalization"), transition_state::tablet_resize_finalization);
+    return make_ready_future<>();
+}
diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py
index 6677fb6d88..db79e64a81 100644
--- a/test/topology_experimental_raft/test_tablets.py
+++ b/test/topology_experimental_raft/test_tablets.py
@@ -37,6 +37,10 @@ async def inject_error_on(manager, error_name, servers):
     errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
     await asyncio.gather(*errs)
 
+async def disable_injection_on(manager, error_name, servers):
+    errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
+    await asyncio.gather(*errs)
+
 async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo], ranges: str = ''):
     node = server.ip_addr
     await manager.servers_see_each_other(servers)
diff --git a/test/topology_experimental_raft/test_tablets_merge.py b/test/topology_experimental_raft/test_tablets_merge.py
new file mode 100644
index 0000000000..40b8906646
--- /dev/null
+++ b/test/topology_experimental_raft/test_tablets_merge.py
@@ -0,0 +1,303 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+from cassandra.query import SimpleStatement, ConsistencyLevel
+
+from test.pylib.internal_types import ServerInfo
+from test.pylib.manager_client import ManagerClient
+from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
+from test.topology.conftest import skip_mode
+
+import pytest
+import asyncio
+import logging
+import time
+import random
+
+logger = logging.getLogger(__name__)
+
+async def inject_error_one_shot_on(manager, error_name, servers):
+    errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
+    await asyncio.gather(*errs)
+
+
+async def inject_error_on(manager, error_name, servers):
+    errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
+    await asyncio.gather(*errs)
+
+async def disable_injection_on(manager, error_name, servers):
+    errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
+    await asyncio.gather(*errs)
+
+
+async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str):
+    host = manager.cql.cluster.metadata.get_host(server.ip_addr)
+
+    # read_barrier is needed to ensure that local tablet metadata on the queried node
+    # reflects the finalized tablet movement.
+    await read_barrier(manager.api, server.ip_addr)
+
+    table_id = await manager.get_table_id(keyspace_name, table_name)
+    rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where "
+                                       f"table_id = {table_id}", host=host)
+    return rows[0].tablet_count
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_tablet_merge_simple(manager: ManagerClient):
+    logger.info("Bootstrapping cluster")
+    cmdline = [
+        '--logger-log-level', 'storage_service=debug',
+        '--logger-log-level', 'table=debug',
+        '--logger-log-level', 'load_balancer=debug',
+        '--target-tablet-size-in-bytes', '30000',
+    ]
+    servers = [await manager.server_add(config={
+        'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
+    }, cmdline=cmdline)]
+
+    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+
+    cql = manager.get_cql()
+    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
+    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
+
+    # Initial average table size of 400k (1 tablet), so triggers some splits.
+    total_keys = 200
+    keys = range(total_keys)
+    insert = cql.prepare("INSERT INTO test.test(pk, c) VALUES(?, ?)")
+    for pk in keys:
+        value = random.randbytes(2000)
+        cql.execute(insert, [pk, value])
+
+    async def check():
+        logger.info("Checking table")
+        cql = manager.get_cql()
+        rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;")
+        assert len(rows) == len(keys)
+
+    await check()
+
+    await manager.api.flush_keyspace(servers[0].ip_addr, "test")
+
+    tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+    assert tablet_count == 1
+
+    logger.info("Adding new server")
+    servers.append(await manager.server_add(cmdline=cmdline))
+
+    # Increases the chance of tablet migration running concurrently with split
+    await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
+    await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
+
+    s1_log = await manager.server_open_log(servers[0].server_id)
+    s1_mark = await s1_log.mark()
+
+    # Now there's a split and migration need, so they'll potentially run concurrently.
+    await manager.api.enable_tablet_balancing(servers[0].ip_addr)
+
+    await check()
+    await asyncio.sleep(2)  # Give the load balancer some time to do work
+
+    await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
+
+    await check()
+
+    tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+    assert tablet_count > 1
+
+    # Allow shuffling of tablet replicas to make co-location work harder
+    async def shuffle():
+        await inject_error_on(manager, "tablet_allocator_shuffle", servers)
+        await asyncio.sleep(2)
+        await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
+
+    await shuffle()
+
+    # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
+    # balancer won't break co-location.
+    await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
+
+    # Shrinks table significantly, forcing merge.
+    delete_keys = range(total_keys - 1)
+    await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys])
+    keys = range(total_keys - 1, total_keys)
+
+    # To avoid a race between major compaction and migration
+    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+
+    for server in servers:
+        await manager.api.flush_keyspace(server.ip_addr, "test")
+        await manager.api.keyspace_compaction(server.ip_addr, "test")
+    await manager.api.enable_tablet_balancing(servers[0].ip_addr)
+
+    await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
+    # Waits for balancer to co-locate sibling tablets
+    await s1_log.wait_for("All sibling tablets are co-located")
+    # Do some shuffling to make sure balancer works with co-located tablets
+    await shuffle()
+
+    old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+
+    await inject_error_on(manager, "replica_merge_completion_wait", servers)
+    await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
+
+    await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
+    await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
+
+    tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+    assert tablet_count < old_tablet_count
+    await check()
+
+    for server in servers:
+        await manager.api.flush_keyspace(server.ip_addr, "test")
+        await manager.api.keyspace_compaction(server.ip_addr, "test")
+    await check()
+
+# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
+    logger.info("Bootstrapping cluster")
+    cmdline = [
+        '--logger-log-level', 'storage_service=info',
+        '--logger-log-level', 'table=info',
+        '--logger-log-level', 'raft_topology=info',
+        '--logger-log-level', 'group0_raft_sm=info',
+        '--logger-log-level', 'load_balancer=info',
+        '--target-tablet-size-in-bytes', '30000',
+    ]
+    config = {
+        'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
+    }
+    servers = [await manager.server_add(config=config, cmdline=cmdline),
+               await manager.server_add(config=config, cmdline=cmdline),
+               await manager.server_add(config=config, cmdline=cmdline)]
+
+    cql = manager.get_cql()
+    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
+    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
+
+    async def perform_topology_ops():
+        logger.info("Topology ops in background")
+        server_id_to_decommission = servers[-1].server_id
+        logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
+        await manager.decommission_node(server_id_to_decommission)
+        servers.pop()
+        logger.info("Adding new server")
+        servers.append(await manager.server_add(cmdline=cmdline))
+        logger.info("Completed topology ops")
+
+    for cycle in range(2):
+        logger.info("Running split-merge cycle #{}".format(cycle))
+
+        await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+
+        logger.info("Inserting data")
+        # Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
+        total_keys = 200
+        keys = range(total_keys)
+        insert = cql.prepare("INSERT INTO test.test(pk, c) VALUES(?, ?)")
+        for pk in keys:
+            value = random.randbytes(2000)
+            cql.execute(insert, [pk, value])
+
+        async def check():
+            logger.info("Checking table")
+            cql = manager.get_cql()
+            rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;")
+            assert len(rows) == len(keys)
+
+        await check()
+
+        logger.info("Flushing keyspace")
+        for server in servers:
+            await manager.api.flush_keyspace(server.ip_addr, "test")
+
+        tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+
+        # Increases the chance of tablet migration running concurrently with split
+        await inject_error_on(manager, "tablet_allocator_shuffle", servers)
+        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
+
+        s1_log = await manager.server_open_log(servers[0].server_id)
+        s1_mark = await s1_log.mark()
+
+        logger.info("Enabling balancing")
+        # Now there's a split and migration need, so they'll potentially run concurrently.
+        await manager.api.enable_tablet_balancing(servers[0].ip_addr)
+
+        topology_ops_task = asyncio.create_task(perform_topology_ops())
+
+        await check()
+
+        logger.info("Waiting for split")
+        await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
+        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
+
+        logger.info("Waiting for topology ops")
+        await topology_ops_task
+
+        await check()
+
+        old_tablet_count = tablet_count
+        tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+        assert tablet_count > old_tablet_count
+        logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
+
+        # Allow shuffling of tablet replicas to make co-location work harder
+        await inject_error_on(manager, "tablet_allocator_shuffle", servers)
+        # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
+        # balancer won't break co-location.
+        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
+
+        logger.info("Deleting data")
+        # Delete almost all keys, enough to trigger a few merges.
+        delete_keys = range(total_keys - 1)
+        await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys])
+        keys = range(total_keys - 1, total_keys)
+
+        await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
+
+        # To avoid a race between major compaction and migration
+        await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+
+        logger.info("Flushing keyspace and performing major")
+        for server in servers:
+            await manager.api.flush_keyspace(server.ip_addr, "test")
+            await manager.api.keyspace_compaction(server.ip_addr, "test")
+        await manager.api.enable_tablet_balancing(servers[0].ip_addr)
+
+        logger.info("Waiting for merge decision")
+        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
+        # Waits for balancer to co-locate sibling tablets
+        await s1_log.wait_for("All sibling tablets are co-located")
+        # Do some shuffling to make sure balancer works with co-located tablets
+        await inject_error_on(manager, "tablet_allocator_shuffle", servers)
+
+        old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+
+        topology_ops_task = asyncio.create_task(perform_topology_ops())
+
+        await inject_error_on(manager, "replica_merge_completion_wait", servers)
+        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
+        await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
+
+        await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
+        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
+
+        logger.info("Waiting for topology ops")
+        await topology_ops_task
+
+        tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
+        assert tablet_count < old_tablet_count
+        logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
+        await check()
+
+        logger.info("Flushing keyspace and performing major")
+        for server in servers:
+            await manager.api.flush_keyspace(server.ip_addr, "test")
+            await manager.api.keyspace_compaction(server.ip_addr, "test")
+        await check()