From 3f9e317b2378ad66c0c9e0f8e95f73a65f388c17 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 14:28:49 -0300 Subject: [PATCH 01/31] scripts/tablet-mon: Allow visualization of tablet id That will help visualizing co-location of sibling tablets for a table that is undergoing merge. Signed-off-by: Raphael S. Carvalho --- scripts/tablet-mon.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/scripts/tablet-mon.py b/scripts/tablet-mon.py index ec8be748dd..4f7eadd2dd 100755 --- a/scripts/tablet-mon.py +++ b/scripts/tablet-mon.py @@ -435,8 +435,17 @@ def update_from_cql(initial=False): changed = True tablets_by_shard = set() + tablet_id_by_table = {} + + def tablet_id_for_table(table_id): + if table_id not in tablet_id_by_table: + tablet_id_by_table[table_id] = 0 + ret = tablet_id_by_table[table_id] + tablet_id_by_table[table_id] += 1 + return ret + for tablet in session.execute(tablets_query): - id = (tablet.table_id, tablet.last_token) + id = (tablet.table_id, tablet.last_token, tablet_id_for_table(tablet.table_id)) replicas = set(tablet.replicas) new_replicas = set(tablet.new_replicas) if tablet.new_replicas else replicas @@ -540,6 +549,7 @@ window_width = min(window_width, 3000) window_height = min(window_height, 2000) window = pygame.display.set_mode((window_width, window_height), pygame.RESIZABLE) pygame.display.set_caption('Tablets') +number_font = pygame.font.SysFont(None, 20) def draw_tablet(tablet, x, y): tablet.x = x @@ -567,6 +577,11 @@ def draw_tablet(tablet, x, y): border_top_left_radius=tablet_radius, border_top_right_radius=tablet_radius) + number_text = str(tablet.id[2]) + number_image = number_font.render(number_text, True, BLACK) + window.blit(number_image, (x + tablet_frame_size + (w - number_image.get_width()) / 2, + y + tablet_frame_size + (h-1 - number_image.get_height()) / 2)) + def draw_node_frame(x, y, x2, y2, color): pygame.draw.rect(window, color, (x, y, x2 - x, y2 - y), 
node_frame_thickness, border_radius=tablet_radius + tablet_frame_size + node_frame_mid) From 4a0c3ca5763e8bc850f9e9d3b1183d242e4418a5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 14:37:41 -0300 Subject: [PATCH 02/31] service: Extract erase of tablet replicas from candidate list Intra and inter migration can reuse it. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index b5a3f443ac..5e86a743d0 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1076,6 +1076,15 @@ public: } } + void erase_candidates(node_load_map& nodes, const tablet_map& tmap, global_tablet_id tablet) { + auto& src_tinfo = tmap.get_tablet_info(tablet.tablet); + for (auto&& r : src_tinfo.replicas) { + if (nodes.contains(r.host)) { + erase_candidate(nodes[r.host].shards[r.shard], tablet); + } + } + } + void add_candidate(shard_load& shard_info, global_tablet_id tablet) { if (_use_table_aware_balancing) { shard_info.candidates[tablet.table].insert(tablet); @@ -1210,11 +1219,7 @@ public: _stats.for_dc(node_load.dc()).intranode_migrations_produced++; plan.add(std::move(mig)); - for (auto&& r : src_tinfo.replicas) { - if (nodes.contains(r.host)) { - erase_candidate(nodes[r.host].shards[r.shard], tablet); - } - } + erase_candidates(nodes, tmap, tablet); dst_info.tablet_count++; src_info.tablet_count--; @@ -1809,11 +1814,7 @@ public: } } - for (auto&& r : src_tinfo.replicas) { - if (nodes.contains(r.host)) { - erase_candidate(nodes[r.host].shards[r.shard], source_tablet); - } - } + erase_candidates(nodes, tmap, source_tablet); { auto& target_info = nodes[dst.host]; From e2edcf2c881b43b21745b0c19b7f41137e0487c1 Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Tue, 10 Sep 2024 15:25:42 -0300 Subject: [PATCH 03/31] service: Extract converge check for intra-node migration This extraction will make it easier later when co-located tablets are introduced in load balancer. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 5e86a743d0..d1d3c3a29d 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1122,6 +1122,12 @@ public: return true; } + // Checks whether moving a tablet from shard A to B (intra-node) would go against convergence. + // Returns false if the tablet should not be moved, and true if it may be moved. + bool check_convergence(const shard_load& src_info, const shard_load& dst_info) { + return src_info.tablet_count > dst_info.tablet_count + 1; + } + future make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) { migration_plan plan; const tablet_metadata& tmeta = _tm->tablets(); @@ -1183,7 +1189,7 @@ public: // Convergence check // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit. - if (!shuffle && (src == dst || src_info.tablet_count <= dst_info.tablet_count + 1)) { + if (!shuffle && (src == dst || !check_convergence(src_info, dst_info))) { lblogger.debug("Node {} is balanced", host); break; } From 4e20a5eeb1be65fe5481fbd95d932260ca1a472d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 16:42:44 -0300 Subject: [PATCH 04/31] service: Extract update of node load on migrations Signed-off-by: Raphael S. 
Carvalho --- service/tablet_allocator.cc | 53 +++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index d1d3c3a29d..1c330b459b 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1128,6 +1128,37 @@ public: return src_info.tablet_count > dst_info.tablet_count + 1; } + // Adjusts the load of the source and destination shards in the host where intra-node migration happens. + void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, global_tablet_id tablet) { + auto& src_info = node_load.shards[src]; + auto& dst_info = node_load.shards[dst]; + dst_info.tablet_count++; + src_info.tablet_count--; + dst_info.tablet_count_per_table[tablet.table]++; + src_info.tablet_count_per_table[tablet.table]--; + } + + // Adjusts the load of the source and destination (host:shard) that were picked for the migration. + void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, global_tablet_id source_tablet) { + { + auto& target_info = nodes[dst.host]; + target_info.shards[dst.shard].tablet_count++; + target_info.shards[dst.shard].tablet_count_per_table[source_tablet.table]++; + target_info.tablet_count_per_table[source_tablet.table]++; + target_info.tablet_count += 1; + target_info.update(); + } + + auto& src_node_info = nodes[src.host]; + auto& src_shard_info = src_node_info.shards[src.shard]; + src_shard_info.tablet_count -= 1; + src_shard_info.tablet_count_per_table[source_tablet.table]--; + src_node_info.tablet_count_per_table[source_tablet.table]--; + + src_node_info.tablet_count -= 1; + src_node_info.update(); + } + future make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) { migration_plan plan; const tablet_metadata& tmeta = _tm->tablets(); @@ -1227,10 +1258,7 @@ public: erase_candidates(nodes, tmap, tablet); - dst_info.tablet_count++; - 
src_info.tablet_count--; - dst_info.tablet_count_per_table[tablet.table]++; - src_info.tablet_count_per_table[tablet.table]--; + update_node_load_on_migration(node_load, host, src, dst, tablet); sketch.pick(host, dst); sketch.unload(host, src); } @@ -1822,22 +1850,7 @@ public: erase_candidates(nodes, tmap, source_tablet); - { - auto& target_info = nodes[dst.host]; - target_info.shards[dst.shard].tablet_count++; - target_info.shards[dst.shard].tablet_count_per_table[source_tablet.table]++; - target_info.tablet_count_per_table[source_tablet.table]++; - target_info.tablet_count += 1; - target_info.update(); - } - - auto& src_shard_info = src_node_info.shards[src.shard]; - src_shard_info.tablet_count -= 1; - src_shard_info.tablet_count_per_table[source_tablet.table]--; - src_node_info.tablet_count_per_table[source_tablet.table]--; - - src_node_info.tablet_count -= 1; - src_node_info.update(); + update_node_load_on_migration(nodes, src, dst, source_tablet); if (src_node_info.tablet_count == 0) { push_back_node_candidate.cancel(); nodes_by_load.pop_back(); From 61f694acf5a18d3960b7bcbaf72358a70564bfc7 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:33:58 -0300 Subject: [PATCH 05/31] locator/tablets: Fix return type of three-way comparison operators Signed-off-by: Raphael S. Carvalho --- locator/tablets.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/locator/tablets.hh b/locator/tablets.hh index f1c13493f2..bf93285536 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -47,7 +47,7 @@ struct tablet_id { explicit tablet_id(size_t id) : id(id) {} size_t value() const { return id; } explicit operator size_t() const { return id; } - bool operator<=>(const tablet_id&) const = default; + auto operator<=>(const tablet_id&) const = default; }; /// Identifies tablet (not be confused with tablet replica) in the scope of the whole cluster. 
@@ -55,7 +55,7 @@ struct global_tablet_id { table_id table; tablet_id tablet; - bool operator<=>(const global_tablet_id&) const = default; + auto operator<=>(const global_tablet_id&) const = default; }; struct tablet_replica { From 93990eb16276198fc9332307fe03a7a6aaad27e9 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:43:02 -0300 Subject: [PATCH 06/31] locator/tablets: Introduce resize_decision::initial_decision() Know whether resize (e.g. split) decision was needed above initial tablet count will be helpful for guiding the merge decision, since we don't want a merge to happen while table is still growing, but hasn't left the merge threshold yet. Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 4 ++++ locator/tablets.hh | 2 ++ 2 files changed, 6 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index f787d557c2..cdbe31c9bc 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -580,6 +580,10 @@ resize_decision::seq_number_t resize_decision::next_sequence_number() const { return (sequence_number == std::numeric_limits::max()) ? 0 : sequence_number + 1; } +bool resize_decision::initial_decision() const { + return sequence_number == 0; +} + table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept { size_in_bytes = size_in_bytes + s.size_in_bytes; split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number); diff --git a/locator/tablets.hh b/locator/tablets.hh index bf93285536..23324c9dca 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -312,6 +312,8 @@ struct resize_decision { bool operator==(const resize_decision&) const; sstring type_name() const; seq_number_t next_sequence_number() const; + // Returns true if this is the initial decision, before split or merge was emitted. + bool initial_decision() const; }; struct table_load_stats { From 47c8237de083f25f4932f015110c762dff07098a Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Tue, 10 Sep 2024 20:44:13 -0300 Subject: [PATCH 07/31] locator/tablets: Introduce tablet_map::needs_merge() Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 4 ++++ locator/tablets.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index cdbe31c9bc..a4c79081dd 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -540,6 +540,10 @@ bool tablet_map::needs_split() const { return std::holds_alternative(_resize_decision.way); } +bool tablet_map::needs_merge() const { + return std::holds_alternative(_resize_decision.way); +} + const locator::resize_decision& tablet_map::resize_decision() const { return _resize_decision; } diff --git a/locator/tablets.hh b/locator/tablets.hh index 23324c9dca..34b326ddf3 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -482,6 +482,7 @@ public: bool operator==(const tablet_map&) const = default; bool needs_split() const; + bool needs_merge() const; /// Returns the token_range in which the given token will belong to after a tablet split dht::token_range get_token_range_after_split(const token& t) const noexcept; From 3082ff992cfa2b1ebc7fb77abba5df5c7c14bf11 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:46:23 -0300 Subject: [PATCH 08/31] locator/tablets: Introduce tablet_map::for_each_sibling_tablets() Adding interface to iterate through sibling tablets for a given table, one pair at a time. Initially I thought of having for_each_sibling_tablet do nothing for single tablet tables. But later I bumped into complications when wiring it into load balancer for building candidate list, since single-tablet tables have to be special cased. Signed-off-by: Raphael S. 
Carvalho --- locator/tablets.cc | 18 ++++++++++++++++++ locator/tablets.hh | 10 ++++++++++ 2 files changed, 28 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index a4c79081dd..c1dd471ff5 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -406,6 +406,24 @@ future<> tablet_map::for_each_tablet(seastar::noncopyable_function(tabl } } +future<> tablet_map::for_each_sibling_tablets(seastar::noncopyable_function(tablet_desc, std::optional)> func) const { + auto make_desc = [this] (tablet_id tid) { + return tablet_desc{tid, &get_tablet_info(tid), get_tablet_transition_info(tid)}; + }; + if (_tablets.size() == 1) { + co_return co_await func(make_desc(first_tablet()), std::nullopt); + } + for (std::optional tid = first_tablet(); tid; tid = next_tablet(*tid)) { + auto tid1 = tid; + auto tid2 = tid = next_tablet(*tid); + if (!tid2) { + // Cannot happen with power-of-two invariant. + throw std::logic_error(format("Cannot retrieve sibling tablet with tablet count {}", tablet_count())); + } + co_await func(make_desc(*tid1), make_desc(*tid2)); + } +} + void tablet_map::clear_transitions() { _transitions.clear(); } diff --git a/locator/tablets.hh b/locator/tablets.hh index 34b326ddf3..728de957e0 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -348,6 +348,12 @@ struct repair_scheduler_config { using load_stats_ptr = lw_shared_ptr; +struct tablet_desc { + tablet_id tid; + const tablet_info* info; // cannot be null. + const tablet_transition_info* transition; // null if there's no transition. +}; + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). @@ -453,6 +459,10 @@ public: /// Calls a given function for each tablet in the map in token ownership order. future<> for_each_tablet(seastar::noncopyable_function(tablet_id, const tablet_info&)> func) const; + /// Calls a given function for each sibling tablet in the map in token ownership order. 
+ /// If tablet count == 1, then there will be only one call and 2nd tablet_desc is disengaged. + future<> for_each_sibling_tablets(seastar::noncopyable_function(tablet_desc, std::optional)> func) const; + const auto& transitions() const { return _transitions; } From 96d4f2230eee7ca0a55b37c175d74d9b906be2cd Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:50:35 -0300 Subject: [PATCH 09/31] service: Introduce migration_plan::add(migrations_vector) Allow addition of multiple tablet_migration_info into the plan. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.hh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 98ccff24b1..a7a862f6d2 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -153,6 +153,12 @@ public: _migrations.emplace_back(std::move(info)); } + void add(migrations_vector migrations) { + for (auto&& mig : migrations) { + add(std::move(mig)); + } + } + void merge(migration_plan&& other) { std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations)); _has_nodes_to_drain |= other._has_nodes_to_drain; From a5db92b9e68f3e0c77f38d2add39e11b6dbddb2e Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:59:10 -0300 Subject: [PATCH 10/31] service/tablet_allocator: Introduce migration_tablet_set This new type will allow the load balancer to treat co-located tablets as a single candidate (will treat them as if they were already merged), allowing co-located replicas to be migrated together (in the same migration plan). The type is a variant of global_tablet_id and colocated_tablets (which holds the global_tablet_id of the sibling tablets). It will be eventually wired after some more preparation. It will allow for minimal amount of changes in the balancer code. Signed-off-by: Raphael S. 
Carvalho --- service/tablet_allocator.cc | 78 +++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 1c330b459b..604f4a9e82 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -15,6 +15,7 @@ #include "utils/assert.hh" #include "utils/error_injection.hh" #include "utils/stall_free.hh" +#include "utils/overloaded_functor.hh" #include "db/config.hh" #include "locator/load_sketch.hh" #include "replica/database.hh" @@ -150,6 +151,51 @@ struct migration_badness { bool operator<=>(const migration_badness& other) const = default; }; +struct colocated_tablets { + global_tablet_id left_tablet; + global_tablet_id right_tablet; + + auto operator<=>(const colocated_tablets&) const = default; +}; + +// Represents either a single tablet replica or co-located replicas of sibling +// tablets. The migration tablet set is logically treated by balancer as a single +// candidate. When candidate represents co-located replicas, it means that +// the balancer will work to preserve the co-location by migrating those replicas +// to same destination. 
+struct migration_tablet_set { + std::variant tablet_s; + + table_id table() const { + return std::visit( + overloaded_functor{ + [](global_tablet_id t) { return t.table; }, + [](colocated_tablets t) { return t.left_tablet.table; }, + }, + tablet_s); + } + + using tablet_small_vector = utils::small_vector; + + tablet_small_vector tablets() const { + return std::visit( + overloaded_functor{ + [](global_tablet_id t) { + return tablet_small_vector{t}; }, + [](colocated_tablets t) { + return tablet_small_vector{t.left_tablet, t.right_tablet}; + }, + }, + tablet_s); + } + + bool colocated() const { + return std::holds_alternative(tablet_s); + } + + auto operator<=>(const migration_tablet_set&) const = default; +}; + struct migration_candidate { global_tablet_id tablet; tablet_replica src; @@ -167,6 +213,17 @@ struct fmt::formatter : fmt::formatter +struct fmt::formatter : fmt::formatter { + template + auto format(const service::migration_tablet_set& tablet_set, FormatContext& ctx) const { + if (tablet_set.colocated()) { + return fmt::format_to(ctx.out(), "{{colocated: {}}}", tablet_set.tablets()); + } + return fmt::format_to(ctx.out(), "{}", tablet_set.tablets().front()); + } +}; + template<> struct fmt::formatter : fmt::formatter { template @@ -181,6 +238,27 @@ struct fmt::formatter : fmt::formatter +struct hash { + size_t operator()(const colocated_tablets& id) const { + return utils::hash_combine(std::hash()(id.left_tablet), + std::hash()(id.right_tablet)); + } +}; + +template <> +struct hash { + size_t operator()(const migration_tablet_set& tablet_set) const { + return std::hash()(tablet_set.tablet_s); + } +}; + +} + namespace service { /// The algorithm aims to equalize tablet count on each shard. From ed06b4b1e782e88098027908b1619c1973c4396a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 22:07:17 -0300 Subject: [PATCH 11/31] service: Add migration helpers for migration_tablet_set Signed-off-by: Raphael S. 
Carvalho --- service/tablet_allocator.cc | 60 +++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 604f4a9e82..75860037ae 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -652,6 +652,26 @@ private: on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); } + using migration_vector = migration_plan::migrations_vector; + static migration_vector + get_migration_info(const migration_tablet_set& tablet_set, tablet_transition_kind kind, tablet_replica src, tablet_replica dst) { + migration_vector infos; + for (auto tablet : tablet_set.tablets()) { + infos.push_back(tablet_migration_info{kind, tablet, src, dst}); + } + return infos; + } + + using migration_streaming_info_vector = utils::small_vector; + static migration_streaming_info_vector + get_migration_streaming_infos(const locator::topology& topology, const tablet_map& tmap, const migration_vector& infos) { + migration_streaming_info_vector streaming_infos; + for (auto& info : infos) { + auto& ti = tmap.get_tablet_info(info.tablet.tablet); + streaming_infos.push_back(get_migration_streaming_info(topology, ti, info)); + } + return streaming_infos; + } public: load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size, std::unordered_set skiplist) : _target_tablet_size(target_tablet_size) @@ -967,6 +987,12 @@ public: } } + void apply_load(node_load_map& nodes, const migration_streaming_info_vector& infos) { + for (auto& info : infos) { + apply_load(nodes, info); + } + } + bool can_accept_load(node_load_map& nodes, const tablet_migration_streaming_info& info) { for (auto r : info.read_from) { if (!nodes.contains(r.host)) { @@ -991,6 +1017,16 @@ public: return true; } + // Precondition: all migration streaming info have same source and destination. 
+ // FIXME: remove precondition but it's not easy without copying noad_load_map. + bool can_accept_load(node_load_map& nodes, const migration_streaming_info_vector& infos) { + // Since all migration info have the same source and destination, the load check can be easily done + // by informing the number of migrations. + auto info = infos[0]; + info.stream_weight = infos.size(); + return can_accept_load(nodes, info); + } + bool in_shuffle_mode() const { return utils::get_local_injector().enter("tablet_allocator_shuffle"); } @@ -1237,6 +1273,30 @@ public: src_node_info.update(); } + void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, const migration_tablet_set& tablet_set) { + for (auto tablet : tablet_set.tablets()) { + update_node_load_on_migration(node_load, host, src, dst, tablet); + } + } + + void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) { + for (auto tablet : tablet_set.tablets()) { + update_node_load_on_migration(nodes, src, dst, tablet); + } + } + + static void unload(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { + for (auto _ : tablet_set.tablets()) { + sketch.unload(host, shard); + } + } + + static void pick(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { + for (auto _ : tablet_set.tablets()) { + sketch.pick(host, shard); + } + } + future make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) { migration_plan plan; const tablet_metadata& tmeta = _tm->tablets(); From 308741c9cb7a2cf9dd354604bddfad384f5c892d Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Tue, 10 Sep 2024 22:11:53 -0300 Subject: [PATCH 12/31] service: Add convergence check variant for migration_tablet_set The load inversion convergence check should be able to know when two tablets are being migrated instead of one, to avoid oscillations. This will be wired when migration_tablet_set is wired. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 75860037ae..60bad0cba5 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1217,7 +1217,7 @@ public: // The assumption is that the algorithm moves tablets from more loaded nodes to less loaded nodes, // so convergence is reached where the node we picked as source has lower load, or will have lower // load post-movement, than the node we picked as the destination. - bool check_convergence(node_load& src_info, node_load& dst_info) { + bool check_convergence(node_load& src_info, node_load& dst_info, unsigned delta = 1) { // Allow migrating only from candidate nodes which have higher load than the target. if (src_info.avg_load <= dst_info.avg_load) { lblogger.trace("Load inversion: src={} (avg_load={}), dst={} (avg_load={})", @@ -1226,8 +1226,8 @@ public: } // Prevent load inversion post-movement which can lead to oscillations. 
- if (src_info.get_avg_load(src_info.tablet_count - 1) < - dst_info.get_avg_load(dst_info.tablet_count + 1)) { + if (src_info.get_avg_load(src_info.tablet_count - delta) < + dst_info.get_avg_load(dst_info.tablet_count + delta)) { lblogger.trace("Load inversion post-movement: src={} (avg_load={}), dst={} (avg_load={})", src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load); return false; @@ -1236,10 +1236,18 @@ public: return true; } + bool check_convergence(node_load& src_info, node_load& dst_info, const migration_tablet_set& tablet_set) { + return check_convergence(src_info, dst_info, tablet_set.tablets().size()); + } + // Checks whether moving a tablet from shard A to B (intra-node) would go against convergence. // Returns false if the tablet should not be moved, and true if it may be moved. - bool check_convergence(const shard_load& src_info, const shard_load& dst_info) { - return src_info.tablet_count > dst_info.tablet_count + 1; + bool check_convergence(const shard_load& src_info, const shard_load& dst_info, unsigned delta = 1) { + return src_info.tablet_count > dst_info.tablet_count + delta; + } + + bool check_convergence(const shard_load& src_info, const shard_load& dst_info, const migration_tablet_set& tablet_set) { + return check_convergence(src_info, dst_info, tablet_set.tablets().size()); } // Adjusts the load of the source and destination shards in the host where intra-node migration happens. From 2791923a2137b449c7d1ffade5d98ab2e93af5ac Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 22:13:40 -0300 Subject: [PATCH 13/31] service: Add replication constraint check variant for migration_tablet_set We have a check that moving a tablet from A to B won't violate replication constraints. The contraints might not be the same for two sibling tablets that have co-located replicas. Example: nodes = {A, B, C, D} tablet1 = {A, B, C} tablet2 = {A, B, D} viable target for {tablet1, B} is D. viable target for {tablet2, B} is C. 
When co-located replicas share a viable target, then a migration can be emitted to preserve co-location. To allow decommission when co-located replicas don't share a viable target, a skip info will be returned for each tablet, even though that means breaking this co-location. Decommission is higher in priority. Also, doing some preparation for integration of migration_tablet_set. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 75 ++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 60bad0cba5..431cadd6d1 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1510,6 +1510,65 @@ public: return std::nullopt; } + // Verifies if moving a given tablet from src_info.id to dst_info.id would not violate + // replication constraints (no increase in replica co-location on nodes, racks). + // Returns std::nullopt if it does not and the movement is allowed. + // + // The contraints might not be the same for two sibling tablets that have co-located + // replicas. + // Example: + // nodes = {A, B, C, D} + // tablet1 = {A, B, C} + // tablet2 = {A, B, D} + // viable target for {tablet1, B} is D. + // viable target for {tablet2, B} is C. + // + // When co-located replicas share a viable target, then a migration can be emitted to + // preserve co-location. + // To allow decommission when co-located replicas don't share a viable target, a skip + // info will be returned for each tablet, even though that means breaking this + // co-location. Decommission is higher in priority. 
+ using skip_info_vector = std::vector>; + std::optional + check_constraints(node_load_map& nodes, + const locator::tablet_map& tmap, + node_load& src_info, + node_load& dst_info, + migration_tablet_set tablet_set, + bool need_viable_targets) { + std::unordered_map viable_targets_count; + std::unordered_map skip_info_per_tablet; + std::unordered_set shared_viable_targets; + const size_t tablet_count = tablet_set.tablets().size(); + + for (auto tablet : tablet_set.tablets()) { + auto skip = check_constraints(nodes, tmap, src_info, dst_info, tablet, need_viable_targets); + if (!skip) { + continue; + } + for (const auto& target : skip->viable_targets) { + // A viable target is considered shared if all candidates share that same viable target. + if (++viable_targets_count[target] == tablet_count) { + shared_viable_targets.insert(target); + } + } + skip_info_per_tablet.emplace(std::make_pair(tablet, std::move(*skip))); + } + if (skip_info_per_tablet.empty()) { + return std::nullopt; + } + if (!shared_viable_targets.empty()) { + return skip_info_vector{std::make_pair(skip_info{std::move(shared_viable_targets)}, std::move(tablet_set))}; + } + + skip_info_vector skip_infos; + skip_infos.reserve(skip_info_per_tablet.size()); + for (auto&& [tablet, info] : skip_info_per_tablet) { + skip_infos.push_back(std::make_pair(std::move(info), migration_tablet_set{std::move(tablet)})); + } + return skip_infos; + } + // Picks best tablet replica to move and its new destination. // The destination host is picked among nodes_by_load_dst, with dst being the preferred destination. // @@ -1944,14 +2003,18 @@ public: // When drain_skipped is true, we already picked movement to a viable target. 
if (!drain_skipped) { + auto process_skip_info = [&] (global_tablet_id tablet, skip_info skip) { + if (src_node_info.drained && skip.viable_targets.empty()) { + auto replicas = tmap.get_tablet_info(tablet.tablet).replicas; + throw std::runtime_error(fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. (nodes {}, replicas {})", + tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); + } + src_node_info.skipped_candidates.emplace_back(src, tablet, std::move(skip.viable_targets)); + }; + auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablet, src_node_info.drained); if (skip) { - if (src_node_info.drained && skip->viable_targets.empty()) { - auto replicas = tmap.get_tablet_info(source_tablet.tablet).replicas; - throw std::runtime_error(fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. (nodes {}, replicas {})", - source_tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); - } - src_node_info.skipped_candidates.emplace_back(src, source_tablet, std::move(skip->viable_targets)); + process_skip_info(source_tablet, std::move(*skip)); continue; } } From ba633b1da2845dcf1a504dfc44cd8182cf6c3292 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 29 Nov 2024 15:57:11 -0300 Subject: [PATCH 14/31] service: Introduce alias to per-table candidate map type Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 431cadd6d1..ca5c761081 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -348,6 +348,8 @@ class load_balancer { // It's an average per-shard load in terms of tablet count. 
using load_type = double; + using table_candidates_map = std::unordered_map>; + struct shard_load { size_t tablet_count = 0; @@ -362,7 +364,7 @@ class load_balancer { // Tablets which still have a replica on this shard which are candidates for migrating away from this shard. // Grouped by table. Used when _use_table_aware_balancing == true. // The set of candidates per table may be empty. - std::unordered_map> candidates; + table_candidates_map candidates; // For all tables. Used when _use_table_aware_balancing == false. std::unordered_set candidates_all_tables; @@ -1041,7 +1043,7 @@ public: return rand_int() % shard_count; } - table_id pick_table(const std::unordered_map>& candidates) { + table_id pick_table(const table_candidates_map& candidates) { if (!_use_table_aware_balancing) { on_internal_error(lblogger, "pick_table() called when table-aware balancing is disabled"); } From fd6bf7b357e76b31ab8ade3364e9175ffabd3818 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 10 Sep 2024 20:36:34 -0300 Subject: [PATCH 15/31] locator/tablets: Extend tablet_replica equality comparator to three-way Will be needed later for sorting tablet replicas. Signed-off-by: Raphael S. Carvalho --- locator/tablets.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locator/tablets.hh b/locator/tablets.hh index 728de957e0..5221fc10cd 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -62,7 +62,7 @@ struct tablet_replica { host_id host; shard_id shard; - bool operator==(const tablet_replica&) const = default; + auto operator<=>(const tablet_replica&) const = default; }; using tablet_replica_set = utils::small_vector; From fd33e6dfad21204bc410f8a497239543b55461a7 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 09:14:42 -0300 Subject: [PATCH 16/31] service: Introduce sorted_replicas_for_tablet_load() Signed-off-by: Raphael S. 
Carvalho --- service/tablet_allocator.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index ca5c761081..af5963ae7e 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -621,6 +621,12 @@ private: return trinfo ? trinfo->next : ti.replicas; } + tablet_replica_set sorted_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { + auto set = get_replicas_for_tablet_load(ti, trinfo); + std::ranges::sort(set, std::less()); + return set; + } + // Whether to count the tablet as putting streaming load on the system. // Tablets which are streaming or are yet-to-stream are counted. bool is_streaming(const tablet_transition_info* trinfo) { From a5cc6fb297e3ce6cc9ab112705249060259680a8 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 14 Oct 2024 16:14:26 -0300 Subject: [PATCH 17/31] locator: Add tablet_map::sibling_tablets() Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 8 ++++++++ locator/tablets.hh | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index c1dd471ff5..cfc84fe94b 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -30,6 +30,14 @@ namespace locator { seastar::logger tablet_logger("tablets"); +std::optional> tablet_map::sibling_tablets(tablet_id t) const { + if (tablet_count() == 1) { + return std::nullopt; + } + auto first_sibling = tablet_id(t.value() & ~0x1); + return std::make_pair(first_sibling, *next_tablet(first_sibling)); +} + static write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) { diff --git a/locator/tablets.hh b/locator/tablets.hh index 5221fc10cd..2463d3bbc7 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -448,6 +448,11 @@ public: return tablet_id(size_t(t) + 1); } + // Returns the pair of sibling tablets for a given tablet id. + // For example, if id 1 is provided, a pair of 0 and 1 is returned. 
+ // Returns disengaged optional when sibling pair cannot be found. + std::optional> sibling_tablets(tablet_id t) const; + /// Returns true iff tablet has a given replica. /// If tablet is in transition, considers both previous and next replica set. bool has_replica(tablet_id, tablet_replica) const; From 5d3b9dba47c10814f2b61e0777d62cf580333673 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 09:26:55 -0300 Subject: [PATCH 18/31] service: Wire migration_tablet_set into the load balancer If table is undergoing merge, co-located replicas of sibling tablets will be treated by balancer as if they were a single migration candidate. The reason for that is that the balancer must not undo the co-location work done previously on behalf of merge decision. Sibling tablets will be put in the same migration plan, but note that each tablet is still migrated independently in the state machine. The balancer will exclude both co-located tablets from the candidate list if either hasn't finished migration yet. It achieves that by pretending migration of sibling tablets succeeded, allowing it to note that tablets are co-located even though either can still be migrating. The load inversion convergence check also happens after picking a candidate now, since the balancer must be aware that co-located tablets are being migrated together and we want to avoid oscillations. Signed-off-by: Raphael S.
Carvalho --- service/tablet_allocator.cc | 247 +++++++++++++++++++++++++++--------- 1 file changed, 185 insertions(+), 62 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index af5963ae7e..da30f98d91 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -197,7 +197,7 @@ struct migration_tablet_set { }; struct migration_candidate { - global_tablet_id tablet; + migration_tablet_set tablets; tablet_replica src; tablet_replica dst; migration_badness badness; @@ -228,7 +228,7 @@ template<> struct fmt::formatter : fmt::formatter { template auto format(const service::migration_candidate& candidate, FormatContext& ctx) const { - fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablet, candidate.src, + fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablets, candidate.src, candidate.dst, candidate.badness); if (candidate.badness.is_bad()) { fmt::format_to(ctx.out(), " (bad!)"); @@ -348,7 +348,7 @@ class load_balancer { // It's an average per-shard load in terms of tablet count. using load_type = double; - using table_candidates_map = std::unordered_map>; + using table_candidates_map = std::unordered_map>; struct shard_load { size_t tablet_count = 0; @@ -366,7 +366,7 @@ class load_balancer { // The set of candidates per table may be empty. table_candidates_map candidates; // For all tables. Used when _use_table_aware_balancing == false. 
- std::unordered_set candidates_all_tables; + std::unordered_set candidates_all_tables; future<> clear_gently() { co_await utils::clear_gently(candidates); @@ -393,7 +393,7 @@ class load_balancer { struct skipped_candidate { tablet_replica replica; - global_tablet_id tablet; + migration_tablet_set tablets; std::unordered_set viable_targets; }; @@ -1067,7 +1067,7 @@ public: on_internal_error(lblogger, "No candidate table"); } - global_tablet_id peek_candidate(shard_load& shard_info) { + migration_tablet_set peek_candidate(shard_load& shard_info) { if (_use_table_aware_balancing) { auto table = pick_table(shard_info.candidates); return *shard_info.candidates[table].begin(); @@ -1187,31 +1187,52 @@ public: co_return *best_candidate; } - void erase_candidate(shard_load& shard_info, global_tablet_id tablet) { + void erase_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { - shard_info.candidates[tablet.table].erase(tablet); - if (shard_info.candidates[tablet.table].empty()) { - shard_info.candidates.erase(tablet.table); + auto table = tablets.table(); + shard_info.candidates[table].erase(tablets); + if (shard_info.candidates[table].empty()) { + shard_info.candidates.erase(table); } } else { - shard_info.candidates_all_tables.erase(tablet); + shard_info.candidates_all_tables.erase(tablets); } } - void erase_candidates(node_load_map& nodes, const tablet_map& tmap, global_tablet_id tablet) { + void maybe_erase_colocated_candidate(shard_load& shard_info, const tablet_map& tmap, global_tablet_id tablet) { + if (!tmap.needs_merge()) { + return; + } + auto siblings = tmap.sibling_tablets(tablet.tablet); + if (!siblings) { + on_internal_error(lblogger, format("Unable to find sibling tablet of {} during merge", tablet)); + } + auto left_sibling = global_tablet_id{tablet.table, siblings->first}; + auto right_sibling = global_tablet_id{tablet.table, siblings->second}; + erase_candidate(shard_info, 
migration_tablet_set{colocated_tablets{left_sibling, right_sibling}}); + } + + void erase_candidates(node_load_map& nodes, const tablet_map& tmap, const migration_tablet_set& tablets) { + // FIXME: indentation. + for (auto tablet : tablets.tablets()) { auto& src_tinfo = tmap.get_tablet_info(tablet.tablet); for (auto&& r : src_tinfo.replicas) { if (nodes.contains(r.host)) { - erase_candidate(nodes[r.host].shards[r.shard], tablet); + lblogger.trace("Erasing tablet {} from {}", tablet, r); + // Not necessarily all replicas of sibling tablets are co-located, and so we need to + // remove them from candidate list using global_tablet_id. + erase_candidate(nodes[r.host].shards[r.shard], migration_tablet_set{tablet}); + maybe_erase_colocated_candidate(nodes[r.host].shards[r.shard], tmap, tablet); } } + } } - void add_candidate(shard_load& shard_info, global_tablet_id tablet) { + void add_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { - shard_info.candidates[tablet.table].insert(tablet); + shard_info.candidates[tablets.table()].insert(tablets); } else { - shard_info.candidates_all_tables.insert(tablet); + shard_info.candidates_all_tables.insert(tablets); } } @@ -1388,15 +1409,20 @@ public: } auto candidate = co_await peek_candidate(nodes, src_info, tablet_replica{host, src}, tablet_replica{host, dst}); - auto tablet = candidate.tablet; + auto tablets = candidate.tablets; + + // Recheck convergence to avoid oscillations if co-located tablets are being migrated together. + if (!shuffle && (src == dst || !check_convergence(src_info, dst_info, tablets))) { + lblogger.debug("Node {} is balanced", host); + break; + } // Emit migration. 
- auto mig = tablet_migration_info {tablet_transition_kind::intranode_migration, tablet, - tablet_replica{host, src}, tablet_replica{host, dst}}; - auto& tmap = tmeta.get_tablet_map(tablet.table); - auto& src_tinfo = tmap.get_tablet_info(tablet.tablet); - auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), src_tinfo, mig); + auto mig = get_migration_info(tablets, tablet_transition_kind::intranode_migration, + tablet_replica{host, src}, tablet_replica{host, dst}); + auto& tmap = tmeta.get_tablet_map(tablets.table()); + auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig); if (!can_accept_load(nodes, mig_streaming_info)) { _stats.for_dc(node_load.dc()).migrations_skipped++; @@ -1410,9 +1436,9 @@ public: _stats.for_dc(node_load.dc()).intranode_migrations_produced++; plan.add(std::move(mig)); - erase_candidates(nodes, tmap, tablet); + erase_candidates(nodes, tmap, tablets); - update_node_load_on_migration(node_load, host, src, dst, tablet); + update_node_load_on_migration(node_load, host, src, dst, tablets); sketch.pick(host, dst); sketch.unload(host, src); } @@ -1621,9 +1647,9 @@ public: auto get_candidate = [this, drain_skipped, &nodes, &src_node_info] (tablet_replica src, tablet_replica dst) -> future { if (drain_skipped) { - auto source_tablet = src_node_info.skipped_candidates.back().tablet; - auto badness = evaluate_candidate(nodes, source_tablet.table, src, dst); - co_return migration_candidate{source_tablet, src, dst, badness}; + auto source_tablets = src_node_info.skipped_candidates.back().tablets; + auto badness = evaluate_candidate(nodes, source_tablets.table(), src, dst); + co_return migration_candidate{source_tablets, src, dst, badness}; } else { auto&& src_shard_info = src_node_info.shards[src.shard]; co_return co_await peek_candidate(nodes, src_shard_info, src, dst); @@ -1634,7 +1660,7 @@ public: // Given src as the source replica, evaluate all destinations. 
// Updates min_candidate with the best candidate, if better is found. - auto evaluate_targets = [&] (global_tablet_id tablet, tablet_replica src, migration_badness src_badness) -> future<> { + auto evaluate_targets = [&] (migration_tablet_set tablets, tablet_replica src, migration_badness src_badness) -> future<> { migration_badness min_dst_badness; std::optional min_dst_host; std::vector best_hosts; @@ -1645,11 +1671,11 @@ public: auto& new_target_info = nodes[new_target]; // Skip movements which may harm convergence. - if (!src_node_info.drained && !check_convergence(src_node_info, new_target_info)) { + if (!src_node_info.drained && !check_convergence(src_node_info, new_target_info, tablets)) { continue; } - auto badness = evaluate_dst_badness(nodes, tablet.table, tablet_replica{new_target, 0}); + auto badness = evaluate_dst_badness(nodes, tablets.table(), tablet_replica{new_target, 0}); if (!min_dst_host || badness.dst_node_badness < min_dst_badness.dst_node_badness) { min_dst_badness = badness; min_dst_host = new_target; @@ -1673,7 +1699,8 @@ public: for (shard_id new_dst_shard = 0; new_dst_shard < nodes[host].shard_count; new_dst_shard++) { co_await coroutine::maybe_yield(); auto new_dst = tablet_replica{host, new_dst_shard}; - auto badness = evaluate_dst_badness(nodes, tablet.table, new_dst); + + auto badness = evaluate_dst_badness(nodes, tablets.table(), new_dst); if (!min_dst || badness < min_dst_badness) { min_dst_badness = badness; min_dst = new_dst; @@ -1689,7 +1716,7 @@ public: } auto candidate = migration_candidate{ - tablet, src, *min_dst, + tablets, src, *min_dst, migration_badness{src_badness.shard_badness(), src_badness.node_badness(), min_dst_badness.shard_badness(), @@ -1708,9 +1735,9 @@ public: // Consider better alternatives. 
if (drain_skipped) { - auto source_tablet = src_node_info.skipped_candidates.back().tablet; - auto badness = evaluate_src_badness(nodes, source_tablet.table, src); - co_await evaluate_targets(source_tablet, src, badness); + auto tablets = src_node_info.skipped_candidates.back().tablets; + auto badness = evaluate_src_badness(nodes, tablets.table(), src); + co_await evaluate_targets(tablets, src, badness); } else { // Find a better candidate. // Consider different tables. For each table, first find the best source shard. @@ -1756,7 +1783,7 @@ public: if (drain_skipped) { src_node_info.skipped_candidates.pop_back(); } else { - erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablet); + erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablets); } // Restore invariants. @@ -1910,7 +1937,7 @@ public: push_back_shard_candidate.cancel(); auto& candidate = src_node_info.skipped_candidates.back(); src = candidate.replica; - lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablet, src, candidate.viable_targets); + lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablets, src, candidate.viable_targets); // When draining, need to narrow down targets to viable targets before choosing the best target. nodes_by_load_dst.clear(); @@ -1959,6 +1986,7 @@ public: // Check convergence conditions. // When draining nodes, disable convergence checks so that all tablets are migrated away. + bool can_check_convergence = !shuffle && nodes_to_drain.empty(); if (!shuffle && nodes_to_drain.empty()) { // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than @@ -2001,32 +2029,40 @@ public: // May choose a different source shard than src.shard or different destination host/shard than dst. 
auto candidate = co_await pick_candidate(nodes, src_node_info, target_info, src, dst, nodes_by_load_dst, drain_skipped); - auto source_tablet = candidate.tablet; + auto source_tablets = candidate.tablets; src = candidate.src; dst = candidate.dst; - auto& tmap = tmeta.get_tablet_map(source_tablet.table); + auto& tmap = tmeta.get_tablet_map(source_tablets.table()); + // If best candidate is co-located sibling tablets, then convergence is re-checked to avoid oscillations. + if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) { + lblogger.debug("No more candidates. Load would be inverted."); + _stats.for_dc(dc).stop_load_inversion++; + break; + } // Check replication strategy constraints. // When drain_skipped is true, we already picked movement to a viable target. if (!drain_skipped) { - auto process_skip_info = [&] (global_tablet_id tablet, skip_info skip) { + auto process_skip_info = [&] (migration_tablet_set tablets, skip_info skip) { if (src_node_info.drained && skip.viable_targets.empty()) { + auto tablet = tablets.tablets().front(); auto replicas = tmap.get_tablet_info(tablet.tablet).replicas; throw std::runtime_error(fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. 
(nodes {}, replicas {})", tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); } - src_node_info.skipped_candidates.emplace_back(src, tablet, std::move(skip.viable_targets)); + src_node_info.skipped_candidates.emplace_back(src, tablets, std::move(skip.viable_targets)); }; - auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablet, src_node_info.drained); + auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablets, src_node_info.drained); if (skip) { - process_skip_info(source_tablet, std::move(*skip)); + for (auto&& [skip_info, tablets] : *skip) { + process_skip_info(tablets, skip_info); + } continue; } } - if (candidate.badness.is_bad()) { _stats.for_dc(_dc).bad_migrations++; } @@ -2038,11 +2074,10 @@ public: tablet_transition_kind kind = (src_node_info.state() == locator::node::state::being_removed || src_node_info.state() == locator::node::state::left) ? tablet_transition_kind::rebuild : tablet_transition_kind::migration; - auto mig = tablet_migration_info {kind, source_tablet, src, dst}; - auto& src_tinfo = tmap.get_tablet_info(source_tablet.tablet); - auto mig_streaming_info = get_migration_streaming_info(topo, src_tinfo, mig); + auto mig = get_migration_info(source_tablets, kind, src, dst); + auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig); - _load_sketch->pick(dst.host, dst.shard); + pick(*_load_sketch, dst.host, dst.shard, source_tablets); if (can_accept_load(nodes, mig_streaming_info)) { apply_load(nodes, mig_streaming_info); @@ -2065,16 +2100,16 @@ public: } } - erase_candidates(nodes, tmap, source_tablet); + erase_candidates(nodes, tmap, source_tablets); - update_node_load_on_migration(nodes, src, dst, source_tablet); + update_node_load_on_migration(nodes, src, dst, source_tablets); if (src_node_info.tablet_count == 0) { push_back_node_candidate.cancel(); nodes_by_load.pop_back(); } if (lblogger.is_enabled(seastar::log_level::debug)) { - co_await 
log_table_load(nodes, source_tablet.table); + co_await log_table_load(nodes, source_tablets.table()); } } @@ -2102,6 +2137,57 @@ public: co_return std::move(plan); } + class sibling_tablets_replicas_processor { + const tablet_desc _t1; + const std::optional _t2; + tablet_replica_set _t1_replicas; + tablet_replica_set _t2_replicas; + tablet_replica_set::iterator _current_t1; + tablet_replica_set::iterator _current_t2; + public: + sibling_tablets_replicas_processor(const tablet_desc t1, const std::optional t2, + tablet_replica_set t1_replicas, tablet_replica_set t2_replicas) + : _t1(std::move(t1)) + , _t2(std::move(t2)) + , _t1_replicas(std::move(t1_replicas)) + , _t2_replicas(std::move(t2_replicas)) + , _current_t1(_t1_replicas.begin()) + , _current_t2(_t2_replicas.begin()) { + } + + using tablet_ids = utils::small_vector; + + // Produces the next replica from sets of sibling tablets. If a given replica has + // the sibling tablets co-located in it, the ids of both tablets will be returned + // for that replica. + // Given replica sets of sibling tablets: + // t1 {A, B, C}, + // t2 {A, C, D}, + // it will yield + // {A, {t1, t2}}, {B, {t1}}, {C, {t1, t2}}, {D, {t2}} + // Invariant: if return value is engaged, size of tablet_ids will be 1 or 2. + std::optional> next_replica() { + if (_current_t1 == _t1_replicas.end() && _current_t2 == _t2_replicas.end()) { + return std::nullopt; + } + if (_current_t1 == _t1_replicas.end()) { + return std::make_pair(*_current_t2++, tablet_ids{_t2->tid}); + } + if (_current_t2 == _t2_replicas.end()) { + return std::make_pair(*_current_t1++, tablet_ids{_t1.tid}); + } + // Detect co-located replicas of sibling tablets. 
+ if (*_current_t1 == *_current_t2) { + _current_t1++; + return std::make_pair(*_current_t2++, tablet_ids{_t1.tid, _t2->tid}); + } + if (*_current_t1 < *_current_t2) { + return std::make_pair(*_current_t1++, tablet_ids{_t1.tid}); + } + return std::make_pair(*_current_t2++, tablet_ids{_t2->tid}); + } + }; + future make_plan(dc_name dc) { migration_plan plan; @@ -2279,14 +2365,40 @@ public: for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { auto& tmap = *tmap_; uint64_t total_load = 0; - co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) -> future<> { - auto trinfo = tmap.get_tablet_transition_info(tid); - if (is_streaming(trinfo)) { - apply_load(nodes, get_migration_streaming_info(topo, ti, *trinfo)); + auto get_replicas = [this] (std::optional t) -> tablet_replica_set { + return t ? sorted_replicas_for_tablet_load(*t->info, t->transition) : tablet_replica_set{}; + }; + auto migrating = [] (std::optional t) { + return t && bool(t->transition); + }; + auto maybe_apply_load = [&] (std::optional t) { + if (t && is_streaming(t->transition)) { + apply_load(nodes, get_migration_streaming_info(topo, *t->info, *t->transition)); } + }; - for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { + // If a table is undergoing merge, co-located replicas of sibling tablets will be treated as a single migration candidate, + // even though each tablet replica will be migrated independently. Next invocation of load balancer is able to exclude both + // sibling if either haven't finished migration yet. That's to prevent load balancer from incorrectly considering that + // they're not co-located if only one of them completed migration. 
+ co_await tmap.for_each_sibling_tablets([&, table = table] (tablet_desc t1, std::optional t2) -> future<> { + maybe_apply_load(t1); + maybe_apply_load(t2); + + auto t1_replicas = get_replicas(t1); + // If t2 is disengaged, when tablet_count == 1, t2_replicas is empty and so will have no effect + // when adding t1 replicas as candidates. + auto t2_replicas = get_replicas(t2); + + sibling_tablets_replicas_processor processor(t1, t2, std::move(t1_replicas), std::move(t2_replicas)); + + auto get_table_desc = [&] (tablet_id tid) { + return tid == t1.tid ? t1 : t2; + }; + + while (auto next = processor.next_replica()) { + auto& [replica, tids] = *next; if (!nodes.contains(replica.host)) { continue; } @@ -2295,12 +2407,23 @@ public: if (shard_load_info.tablet_count == 0) { node_load_info.shards_by_load.push_back(replica.shard); } - shard_load_info.tablet_count += 1; - shard_load_info.tablet_count_per_table[table]++; - node_load_info.tablet_count_per_table[table]++; - total_load++; - if (!trinfo) { // migrating tablets are not candidates - add_candidate(shard_load_info, global_tablet_id {table, tid}); + shard_load_info.tablet_count += tids.size(); + shard_load_info.tablet_count_per_table[table] += tids.size(); + node_load_info.tablet_count_per_table[table] += tids.size(); + total_load += tids.size(); + if (tmap.needs_merge() && tids.size() == 2) { + // Exclude both sibling tablets if either haven't finished migration yet. That's to prevent balancer from + // un-doing the colocation. 
+ if (!migrating(t1) && !migrating(t2)) { + auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}}; + add_candidate(shard_load_info, migration_tablet_set{std::move(candidate)}); + } + } else { + for (auto tid : tids) { + if (!migrating(get_table_desc(tid))) { // migrating tablets are not candidates + add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tid}}); + } + } } } From 75a6fe6a75cb5b8779153367077d8b434775c893 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 11:59:04 -0300 Subject: [PATCH 19/31] service: Respect initial_tablet_count if table is in growing mode The initial_tablet_count is respected while the table is in "growing mode". The table implicitly enters this mode when created, since we expect the table to be populated thereafter. We say that a table leaves this mode if it required a split above the initial tablet count. After that, we can rely purely on the average size to say that a table is shrinking and requires merge. This is not perfect and we may want to leave the mode too if we detect the table is shrinking (or even not growing for some significant amount of time), before any split happened. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index da30f98d91..988ba4fb7d 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -503,8 +503,16 @@ class load_balancer { std::vector tables_being_resized; static bool table_needs_merge(const table_size_desc& d) { - // FIXME: ignore merge request if tablet_count == initial_tablets. - return d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); + // The initial_tablet_count is respected while the table is in "growing mode". 
+ // We say that a table leaves this mode if it required a split above the initial + // tablet count. After that, we can rely purely on the average size to say that + // a table is shrinking and requires merge. + // FIXME: this is not perfect and we may want to leave the mode too if we detect + // average size is decreasing significantly, before any split happened. + bool left_growing_mode = !d.resize_decision.initial_decision(); + lblogger.debug("table_needs_merge: tablet_count={}, avg_tablet_size={}, left_growing_mode={} (seq number: {})", + d.tablet_count, d.avg_tablet_size, left_growing_mode, d.resize_decision.sequence_number); + return left_growing_mode && d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); } static bool table_needs_split(const table_size_desc& d) { return d.avg_tablet_size > d.target_max_tablet_size; From e00798f1b1e4d8cfbd035751dc74b8ca62f05c36 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 12:27:33 -0300 Subject: [PATCH 20/31] service: Rename topology::transition_state::tablet_split_finalization This transition state will be reused by merge completion, so let's rename it to tablet_resize_finalization. The completion handling path will also be reused, so let's rename functions involved similarly. The old name "tablet split finalization" is deprecated but still recognized and points to the correct transition. Otherwise, the reverse lookup would fail when populating the topology system table whose last state was split finalization. NOTE: I thought of adding a new tablet_merge_finalization, but it would complicate things since more than one table could be ready for either split or merge, so you need a generic transition state for handling resize completion. Signed-off-by: Raphael S.
Carvalho --- docs/dev/topology-over-raft.md | 2 +- service/storage_service.cc | 2 +- service/tablet_allocator.cc | 14 +++++++++++--- service/tablet_allocator.hh | 2 +- service/topology_coordinator.cc | 22 +++++++++++----------- service/topology_state_machine.cc | 11 ++++++++++- service/topology_state_machine.hh | 2 +- test/boost/tablets_test.cc | 8 ++++++++ 8 files changed, 44 insertions(+), 19 deletions(-) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 7d2b304dec..79ec3cedef 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -369,7 +369,7 @@ emits a decision to finalize the split request. The finalization is serialized w doubling tablet count would interfere with the migration process. When the state machine leaves the migration track, and there are tablets waiting for tablet split to -be finalized, the topology will transition into `tablet_split_finalization` state. At this moment, there will +be finalized, the topology will transition into `tablet_resize_finalization` state. At this moment, there will be no migration running in the system. A global token metadata barrier is executed to make sure that no process e.g. repair will be holding stale metadata when finalizing split. After that, the new tablet map, which is a result of splitting each preexisting tablet into two, is committed to group0. 
diff --git a/service/storage_service.cc b/service/storage_service.cc index d3ce753ae6..93b5baf4ae 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -756,7 +756,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) { [[fallthrough]]; case topology::transition_state::tablet_migration: [[fallthrough]]; - case topology::transition_state::tablet_split_finalization: + case topology::transition_state::tablet_resize_finalization: [[fallthrough]]; case topology::transition_state::commit_cdc_generation: [[fallthrough]]; diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 988ba4fb7d..b93f236c21 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -2538,7 +2538,7 @@ public: load_balancer_stats_manager& stats() { return _load_balancer_stats; } - +private: // The splitting of tablets today is completely based on the power-of-two constraint. // A tablet of id X is split into 2 new tablets, which new ids are (x << 1) and // (x << 1) + 1. @@ -2565,6 +2565,14 @@ public: table, tablets.tablet_count(), new_tablets.tablet_count()); co_return std::move(new_tablets); } +public: + future resize_tablets(token_metadata_ptr tm, table_id table) { + auto& tmap = tm->tablets().get_tablet_map(table); + if (tmap.needs_split()) { + return split_tablets(std::move(tm), table); + } + throw std::logic_error(format("Table {} cannot be resized", table)); + } // FIXME: Handle materialized views. 
}; @@ -2598,8 +2606,8 @@ void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balan impl().set_use_tablet_aware_balancing(use_tablet_aware_balancing); } -future tablet_allocator::split_tablets(locator::token_metadata_ptr tm, table_id table) { - return impl().split_tablets(std::move(tm), table); +future tablet_allocator::resize_tablets(locator::token_metadata_ptr tm, table_id table) { + return impl().resize_tablets(std::move(tm), table); } tablet_allocator_impl& tablet_allocator::impl() { diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index a7a862f6d2..9305607074 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -234,7 +234,7 @@ public: void set_use_table_aware_balancing(bool); - future split_tablets(locator::token_metadata_ptr, table_id); + future resize_tablets(locator::token_metadata_ptr, table_id); /// Should be called when the node is no longer a leader. void on_leadership_lost(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 1112b50947..9856f14484 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1464,7 +1464,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration"); } - future<> handle_tablet_split_finalization(group0_guard g) { + future<> handle_tablet_resize_finalization(group0_guard g) { // Executes a global barrier to guarantee that any process (e.g. repair) holding stale version // of token metadata will complete before we update topology. 
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g)); @@ -1477,7 +1477,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { for (auto& table_id : plan.resize_plan().finalize_resize) { auto s = _db.find_schema(table_id); - auto new_tablet_map = co_await _tablet_allocator.split_tablets(tm, table_id); + auto new_tablet_map = co_await _tablet_allocator.resize_tablets(tm, table_id); updates.emplace_back(co_await replica::tablet_map_to_mutation( new_tablet_map, table_id, @@ -2095,8 +2095,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case topology::transition_state::tablet_migration: co_await handle_tablet_migration(std::move(guard), false); break; - case topology::transition_state::tablet_split_finalization: - co_await handle_tablet_split_finalization(std::move(guard)); + case topology::transition_state::tablet_resize_finalization: + co_await handle_tablet_resize_finalization(std::move(guard)); break; case topology::transition_state::left_token_ring: { auto node = get_node_to_work_on(std::move(guard)); @@ -2544,8 +2544,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Returns true if the state machine was transitioned into tablet migration path. future maybe_start_tablet_migration(group0_guard); - // Returns true if the state machine was transitioned into tablet split finalization path. - future maybe_start_tablet_split_finalization(group0_guard, const table_resize_plan& plan); + // Returns true if the state machine was transitioned into tablet resize finalization path. 
+ future maybe_start_tablet_resize_finalization(group0_guard, const table_resize_plan& plan); future refresh_tablet_load_stats(); future<> start_tablet_load_stats_refresher(); @@ -2638,10 +2638,10 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_await generate_migration_updates(updates, guard, plan); - // We only want to consider transitioning into tablet split finalization path, if there's no other work + // We only want to consider transitioning into tablet resize finalization path, if there's no other work // to be done (e.g. start migration or/and emit split decision). if (updates.empty()) { - co_return co_await maybe_start_tablet_split_finalization(std::move(guard), plan.resize_plan()); + co_return co_await maybe_start_tablet_resize_finalization(std::move(guard), plan.resize_plan()); } updates.emplace_back( @@ -2654,7 +2654,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_return true; } -future topology_coordinator::maybe_start_tablet_split_finalization(group0_guard guard, const table_resize_plan& plan) { +future topology_coordinator::maybe_start_tablet_resize_finalization(group0_guard guard, const table_resize_plan& plan) { if (plan.finalize_resize.empty()) { co_return false; } @@ -2666,11 +2666,11 @@ future topology_coordinator::maybe_start_tablet_split_finalization(group0_ updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) - .set_transition_state(topology::transition_state::tablet_split_finalization) + .set_transition_state(topology::transition_state::tablet_resize_finalization) .set_version(_topo_sm._topology.version + 1) .build()); - co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet split finalization"); + co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet resize finalization"); co_return true; } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 
be783b52b8..045e57fe47 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -147,18 +147,27 @@ static std::unordered_map transition_state_ {topology::transition_state::write_both_read_old, "write both read old"}, {topology::transition_state::write_both_read_new, "write both read new"}, {topology::transition_state::tablet_migration, "tablet migration"}, - {topology::transition_state::tablet_split_finalization, "tablet split finalization"}, + {topology::transition_state::tablet_resize_finalization, "tablet resize finalization"}, {topology::transition_state::tablet_draining, "tablet draining"}, {topology::transition_state::left_token_ring, "left token ring"}, {topology::transition_state::rollback_to_normal, "rollback to normal"}, }; +// Allows old deprecated names to be recognized and point to the correct transition. +static std::unordered_map deprecated_name_to_transition_state = { + {"tablet split finalization", topology::transition_state::tablet_resize_finalization}, +}; + topology::transition_state transition_state_from_string(const sstring& s) { for (auto&& e : transition_state_to_name_map) { if (e.second == s) { return e.first; } } + auto it = deprecated_name_to_transition_state.find(s); + if (it != deprecated_name_to_transition_state.end()) { + return it->second; + } on_internal_error(tsmlogger, format("cannot map name {} to transition_state", s)); } diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 0c0119de1f..f62b2055bd 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -110,7 +110,7 @@ struct topology { write_both_read_old, write_both_read_new, tablet_migration, - tablet_split_finalization, + tablet_resize_finalization, left_token_ring, rollback_to_normal, }; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index c8533ae451..fe87ef2638 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -33,6 +33,7 @@ 
#include "utils/error_injection.hh" #include "utils/to_string.hh" #include "service/topology_coordinator.hh" +#include "service/topology_state_machine.hh" #include @@ -3316,3 +3317,10 @@ SEASTAR_TEST_CASE(test_explicit_tablets_disable) { co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0); co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128); } + +SEASTAR_TEST_CASE(test_recognition_of_deprecated_name_for_resize_transition) { + using transition_state = service::topology::transition_state; + BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet split finalization"), transition_state::tablet_resize_finalization); + BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet resize finalization"), transition_state::tablet_resize_finalization); + return make_ready_future<>(); +} From 014e1c9a0fe7f244493f985970047dc94d093233 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 29 Nov 2024 17:03:39 -0300 Subject: [PATCH 21/31] locator: Introduce merge_tablet_info() Signed-off-by: Raphael S. 
Carvalho --- locator/tablets.cc | 26 ++++++++++++++++++++++++++ locator/tablets.hh | 10 ++++++++++ 2 files changed, 36 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index cfc84fe94b..3f5ce6b0b5 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -159,6 +159,32 @@ bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info& return false; } +tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info) + : replicas(std::move(replicas)) + , repair_time(repair_time) + , repair_task_info(std::move(repair_task_info)) +{} + +tablet_info::tablet_info(tablet_replica_set replicas) + : tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}) +{} + +std::optional merge_tablet_info(tablet_info a, tablet_info b) { + if (a.repair_task_info.is_valid() || b.repair_task_info.is_valid()) { + return {}; + } + + auto sorted = [] (tablet_replica_set rs) { + std::ranges::sort(rs, std::less()); + return rs; + }; + if (sorted(a.replicas) != sorted(b.replicas)) { + return {}; + } + + auto repair_time = std::max(a.repair_time, b.repair_time); + return tablet_info(std::move(a.replicas), repair_time, a.repair_task_info); +} std::optional get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) { auto leaving = substract_sets(tinfo.replicas, trinfo.next); diff --git a/locator/tablets.hh b/locator/tablets.hh index 2463d3bbc7..f039fb2385 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -171,9 +171,19 @@ struct tablet_info { db_clock::time_point repair_time; locator::tablet_task_info repair_task_info; + tablet_info() = default; + tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info); + tablet_info(tablet_replica_set); + bool operator==(const tablet_info&) const = default; }; +// Merges tablet_info b into a, but with following constraints: +// - they cannot have active repair task, since each task has a different id 
+// - their replicas must be all co-located. +// If tablet infos are mergeable, merged info is returned. Otherwise, nullopt. +std::optional merge_tablet_info(tablet_info a, tablet_info b); + /// Represents states of the tablet migration state machine. /// /// The stage serves two major purposes: From 48dcefbf459013ae17dddc8c04d7749f8143473e Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 12:39:55 -0300 Subject: [PATCH 22/31] service: Implement tablet map resize for merge This implements the ability to resize the tablet map for merge if the balancer emits the decision to finalize the merge when all sibling replicas are colocated for a table. But the co-location plan is not implemented in the balancer yet, so this is still not in use. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 42 +++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index b93f236c21..0be0cbe577 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -2565,11 +2565,53 @@ private: table, tablets.tablet_count(), new_tablets.tablet_count()); co_return std::move(new_tablets); } + + // The merging of tablet is completely based on the power-of-two constraint. + // Tablet of ids X and X+1 are merged into new tablet id (X >> 1). 
+ future merge_tablets(token_metadata_ptr tm, table_id table) { + auto& tablets = tm->tablets().get_tablet_map(table); + + tablet_map new_tablets(tablets.tablet_count() / 2); + + for (tablet_id tid : new_tablets.tablet_ids()) { + co_await coroutine::maybe_yield(); + + tablet_id old_left_tid = tablet_id(tid.value() << 1); + tablet_id old_right_tid = tablet_id(old_left_tid.value() + 1); + + auto& left_tablet_info = tablets.get_tablet_info(old_left_tid); + auto& right_tablet_info = tablets.get_tablet_info(old_right_tid); + + auto sorted = [] (tablet_replica_set set) { + std::ranges::sort(set, std::less()); + return set; + }; + auto left_tablet_replicas = sorted(left_tablet_info.replicas); + auto right_tablet_replicas = sorted(right_tablet_info.replicas); + if (left_tablet_replicas != right_tablet_replicas) { + throw std::runtime_error(format("Sibling tablets {} (r: {}) and {} (r: {}) are not colocated.", + old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas)); + } + auto merged_tablet_info = locator::merge_tablet_info(left_tablet_info, right_tablet_info); + if (!merged_tablet_info) { + throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).", + old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas)); + } + + new_tablets.set_tablet(tid, *merged_tablet_info); + } + + lblogger.info("Merge tablets for table {}, decreasing tablet count from {} to {}", + table, tablets.tablet_count(), new_tablets.tablet_count()); + co_return std::move(new_tablets); + } public: future resize_tablets(token_metadata_ptr tm, table_id table) { auto& tmap = tm->tablets().get_tablet_map(table); if (tmap.needs_split()) { return split_tablets(std::move(tm), table); + } else if (tmap.needs_merge()) { + return merge_tablets(std::move(tm), table); } throw std::logic_error(format("Table {} cannot be resized", table)); } From 907739f3d1479b923d16c12d3028b5beb3378f41 Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Wed, 11 Sep 2024 20:13:58 -0300 Subject: [PATCH 23/31] replica: Handle tablet merge completion Signed-off-by: Raphael S. Carvalho --- replica/compaction_group.hh | 11 ++++ replica/table.cc | 109 +++++++++++++++++++++++++++++++----- 2 files changed, 106 insertions(+), 14 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 5970fac7d9..277594973c 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -91,6 +91,9 @@ public: compaction_group(table& t, size_t gid, dht::token_range token_range); ~compaction_group(); + void update_id(size_t id) { + _group_id = id; + } void update_id_and_range(size_t id, dht::token_range token_range) { _group_id = id; _token_range = std::move(token_range); @@ -203,6 +206,9 @@ using const_compaction_group_ptr = lw_shared_ptr; // shard will have as many groups as there are tablet replicas owned by that shard. class storage_group { compaction_group_ptr _main_cg; + // Holds compaction groups that now belongs to same tablet after merge. Compaction groups here will + // eventually have all their data moved into main group. + std::vector _merging_groups; std::vector _split_ready_groups; seastar::gate _async_gate; private: @@ -231,6 +237,11 @@ public: utils::small_vector compaction_groups() noexcept; utils::small_vector compaction_groups() const noexcept; + utils::small_vector split_unready_groups() const; + bool split_unready_groups_are_empty() const; + + void add_merging_group(compaction_group_ptr); + // Puts the storage group in split mode, in which it internally segregates data // into two sstable sets and two memtable sets corresponding to the two adjacent // tablets post-split. diff --git a/replica/table.cc b/replica/table.cc index a1f12b91db..ddfcdf2911 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -757,6 +757,11 @@ private: // that were previously split. 
future<> handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); + // Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into + // the new tablet id (X >> 1). In practice, that means storage groups for X and X+1 + // are merged into a new storage group with id (X >> 1). + future<> handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); + storage_group& storage_group_for_id(size_t i) const { return storage_group_manager::storage_group_for_id(schema(), i); } @@ -874,6 +879,9 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran void storage_group::for_each_compaction_group(std::function action) const noexcept { action(_main_cg); + for (auto& cg : _merging_groups) { + action(cg); + } for (auto& cg : _split_ready_groups) { action(cg); } @@ -895,6 +903,17 @@ utils::small_vector storage_group::compaction_gro return cgs; } +utils::small_vector storage_group::split_unready_groups() const { + utils::small_vector cgs; + cgs.push_back(_main_cg); + std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs)); + return cgs; +} + +bool storage_group::split_unready_groups_are_empty() const { + return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty)); +} + bool storage_group::set_split_mode() { if (!splitting_mode()) { auto create_cg = [this] () -> compaction_group_ptr { @@ -907,8 +926,12 @@ bool storage_group::set_split_mode() { _split_ready_groups = std::move(split_ready_groups); } - // The storage group is considered "split ready" if its main compaction group is empty. - return _main_cg->empty(); + // The storage group is considered "split ready" if all split unready groups (main + merging) are empty. 
+ return split_unready_groups_are_empty(); +} + +void storage_group::add_merging_group(compaction_group_ptr cg) { + _merging_groups.push_back(std::move(cg)); } future<> storage_group::split(sstables::compaction_type_options::split opt) { @@ -917,24 +940,31 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) { } co_await utils::get_local_injector().inject("delay_split_compaction", 5s); - if (_main_cg->empty()) { + if (split_unready_groups_are_empty()) { co_return; } - auto holder = _main_cg->async_gate().hold(); - co_await _main_cg->flush(); - // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. - co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); - co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); + for (auto cg : split_unready_groups()) { + auto holder = cg->async_gate().hold(); + co_await cg->flush(); + // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. 
+ co_await cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); + co_await cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); + } } lw_shared_ptr storage_group::make_sstable_set() const { - if (!splitting_mode()) { + if (_split_ready_groups.empty() && _merging_groups.empty()) { return _main_cg->make_sstable_set(); } const auto& schema = _main_cg->_t.schema(); std::vector> underlying; - underlying.reserve(1 + _split_ready_groups.size()); + underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size()); underlying.emplace_back(_main_cg->make_sstable_set()); + for (const auto& cg : _merging_groups) { + if (!cg->empty()) { + underlying.emplace_back(cg->make_sstable_set()); + } + } for (const auto& cg : _split_ready_groups) { underlying.emplace_back(cg->make_sstable_set()); } @@ -2392,7 +2422,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca // Stop the released main compaction groups asynchronously future<> stop_fut = make_ready_future<>(); for (auto& [id, sg] : _storage_groups) { - if (!sg->main_compaction_group()->empty()) { + if (!sg->split_unready_groups_are_empty()) { on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ "therefore groups cannot be remapped with the new tablet count.", id, table_id)); @@ -2427,6 +2457,51 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca return stop_fut; } +future<> tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) { + auto table_id = schema()->id(); + size_t old_tablet_count = old_tmap.tablet_count(); + size_t new_tablet_count = new_tmap.tablet_count(); + storage_group_map new_storage_groups; + + unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count); + unsigned merge_size = 1 << 
log2_reduce_factor; + + if (merge_size != 2) { + throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}", + old_tablet_count, new_tablet_count, table_id)); + } + + for (auto& [id, sg] : _storage_groups) { + // Pick first (even) tablet of each sibling pair. + if (id % merge_size != 0) { + continue; + } + auto new_tid = id >> log2_reduce_factor; + + auto new_cg = make_lw_shared(_t, new_tid, new_tmap.get_token_range(locator::tablet_id(new_tid))); + auto new_sg = make_lw_shared(std::move(new_cg)); + + for (unsigned i = 0; i < merge_size; i++) { + auto group_id = id + i; + + auto it = _storage_groups.find(group_id); + if (it == _storage_groups.end()) { + throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id)); + } + auto& sg = it->second; + sg->for_each_compaction_group([&new_sg, new_tid] (const compaction_group_ptr& cg) { + cg->update_id(new_tid); + new_sg->add_merging_group(cg); + }); + // FIXME: we MUST schedule a background action to flush memtable and move sstables of merging groups into the main one.
+ } + + new_storage_groups[new_tid] = std::move(new_sg); + } + _storage_groups = std::move(new_storage_groups); + return make_ready_future<>(); +} + future<> tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) { auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id()); auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map); @@ -2438,6 +2513,11 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); co_return; + } else if (new_tablet_count < old_tablet_count) { + tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets", + schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); + co_await handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map); + co_return; } // Allocate storage group if tablet is migrating in. @@ -2845,9 +2925,7 @@ size_t compaction_group::memtable_count() const noexcept { } size_t storage_group::memtable_count() const noexcept { - auto memtable_count = [] (const compaction_group_ptr& cg) { return cg ? 
cg->memtable_count() : 0; }; - return memtable_count(_main_cg) + - std::ranges::fold_left(_split_ready_groups | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{}); + return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{}); } future<> table::flush(std::optional pos) { @@ -3799,6 +3877,9 @@ future<> storage_group::stop(sstring reason) noexcept { co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) { return cg_ptr->stop(reason); }); + co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) { + return cg_ptr->stop(reason); + }); co_await std::move(closed_gate_fut); } From 70b3963b8d48814dd0bd81b6e859f90ab8fe7ff7 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 15 Oct 2024 17:19:37 -0300 Subject: [PATCH 24/31] replica: Implement merging of compaction groups on merge completion When handling merge completion, compaction groups that belonged to sibling tablets are placed into the same storage group, since those tablets become one after merge. In order to merge two groups, the source group needs its memtable to be flushed first, such that all the data can be moved into the destination. The handling happens in update_effective_replication_map() which cannot afford to wait for I/O, so the group merge will happen in background. There's a fiber that will wake up on merge completion and will iterate through the new set of storage groups (after merge), and will work on merging additional compaction groups into the main one. Signed-off-by: Raphael S. 
Carvalho --- replica/compaction_group.hh | 8 +++ replica/table.cc | 103 ++++++++++++++++++++++++++++++++++-- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 277594973c..7d71ba70a0 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -158,6 +158,9 @@ public: // invalidated and statistics are updated. future<> update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc); + // Merges all sstables from another group into this one. + future<> merge_sstables_from(compaction_group& group); + const lw_shared_ptr& main_sstables() const noexcept; void set_main_sstables(lw_shared_ptr new_main_sstables); @@ -166,6 +169,7 @@ public: // Makes a sstable set, which includes all sstables managed by this group lw_shared_ptr make_sstable_set() const; + std::vector all_sstables() const; const std::vector& compacted_undeleted_sstables() const noexcept; // Triggers regular compaction. 
@@ -241,6 +245,8 @@ public: bool split_unready_groups_are_empty() const; void add_merging_group(compaction_group_ptr); + const std::vector& merging_groups() const; + future<> remove_empty_merging_groups(); // Puts the storage group in split mode, in which it internally segregates data // into two sstable sets and two memtable sets corresponding to the two adjacent @@ -282,6 +288,8 @@ using storage_group_map = absl::flat_hash_map stop() = 0; public: virtual ~storage_group_manager(); diff --git a/replica/table.cc b/replica/table.cc index ddfcdf2911..8afe0c31c9 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -635,7 +636,8 @@ const storage_group_map& storage_group_manager::storage_groups() const { } future<> storage_group_manager::stop_storage_groups() noexcept { - return parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); }); + co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); }); + co_await stop(); } void storage_group_manager::clear_storage_groups() { @@ -686,6 +688,10 @@ public: _storage_groups = std::move(r); } + future<> stop() override { + return make_ready_future<>(); + } + future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override { return make_ready_future(); } compaction_group& compaction_group_for_token(dht::token token) const noexcept override { @@ -737,6 +743,8 @@ class tablet_storage_group_manager final : public storage_group_manager { // current split, and not a previously revoked (stale) decision. // The minimum value, which is a negative number, is not used by coordinator for first decision. 
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); + future<> _merge_completion_fiber; + condition_variable _merge_completion_event; private: const schema_ptr& schema() const { return _t.schema(); @@ -762,6 +770,12 @@ private: // are merged into a new storage group with id (X >> 1). future<> handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); + // When merge completes, compaction groups of sibling tablets are added to same storage + // group, but they're not merged yet into one, since the merge completion handler happens + // inside the erm updater which must complete ASAP. Therefore, those groups will be merged + // into a single one (main) in background. + future<> merge_completion_fiber(); + storage_group& storage_group_for_id(size_t i) const { return storage_group_manager::storage_group_for_id(schema(), i); } @@ -800,6 +814,7 @@ public: : _t(t) , _my_host_id(erm.get_token_metadata().get_my_id()) , _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id())) + , _merge_completion_fiber(merge_completion_fiber()) { storage_group_map ret; @@ -817,6 +832,11 @@ public: _storage_groups = std::move(ret); } + future<> stop() override { + _merge_completion_event.signal(); + return std::exchange(_merge_completion_fiber, make_ready_future<>()); + } + future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override; compaction_group& compaction_group_for_token(dht::token token) const noexcept override; @@ -934,6 +954,17 @@ void storage_group::add_merging_group(compaction_group_ptr cg) { _merging_groups.push_back(std::move(cg)); } +const std::vector& storage_group::merging_groups() const { + return _merging_groups; +} + +future<> storage_group::remove_empty_merging_groups() { + for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) { + co_await 
group->stop("tablet merge"); + } + std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty)); +} + future<> storage_group::split(sstables::compaction_type_options::split opt) { if (set_split_mode()) { co_return; @@ -944,6 +975,9 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) { co_return; } for (auto cg : split_unready_groups()) { + if (cg->async_gate().is_closed()) { + continue; + } auto holder = cg->async_gate().hold(); co_await cg->flush(); // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. @@ -1170,7 +1204,9 @@ future<> table::parallel_foreach_compaction_group(std::function(compact void table::for_each_compaction_group(std::function action) { _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) { sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) { - action(*cg); + if (auto holder = try_hold_gate(cg->async_gate())) { + action(*cg); + } }); }); } @@ -1178,7 +1214,9 @@ void table::for_each_compaction_group(std::function act void table::for_each_compaction_group(std::function action) const { _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) { sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) { - action(*cg); + if (auto holder = try_hold_gate(cg->async_gate())) { + action(*cg); + } }); }); } @@ -1832,6 +1870,35 @@ compaction_group::delete_unused_sstables(sstables::compaction_completion_desc de return delete_sstables_atomically(std::move(sstables_to_remove)); } +std::vector compaction_group::all_sstables() const { + std::vector all; + auto main_sstables = _main_sstables->all(); + auto maintenance_sstables = _maintenance_sstables->all(); + all.reserve(main_sstables->size() + maintenance_sstables->size()); + std::ranges::copy(*main_sstables, std::back_inserter(all)); + std::ranges::copy(*maintenance_sstables, std::back_inserter(all)); + return all; +} + +future<> 
+compaction_group::merge_sstables_from(compaction_group& group) { + auto& cs = _t.get_compaction_strategy(); + auto permit = co_await seastar::get_units(_t._sstable_set_mutation_sem, 1); + table::sstable_list_builder builder(std::move(permit)); + + auto sstables_to_merge = group.all_sstables(); + // re-build new list for this group with sstables of the group being merged. + auto res = co_await builder.build_new_list(*main_sstables(), cs.make_sstable_set(_t.schema()), sstables_to_merge, {}); + // execute: + std::invoke([&] noexcept { + set_main_sstables(std::move(res.new_sstable_set)); + group.clear_sstables(); + // FIXME: backlog adjustment is not exception safe. + backlog_tracker_adjust_charges({}, sstables_to_merge); + }); + _t.rebuild_statistics(); +} + future<> compaction_group::update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc) { // Build a new list of _sstables: We remove from the existing list the @@ -2457,6 +2524,34 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca return stop_fut; } +future<> tablet_storage_group_manager::merge_completion_fiber() { + co_await coroutine::switch_to(_t.get_config().streaming_scheduling_group); + + while (!_t.async_gate().is_closed()) { + try { + co_await for_each_storage_group_gently([] (storage_group& sg) -> future<> { + auto main_group = sg.main_compaction_group(); + for (auto& group : sg.merging_groups()) { + // Synchronize with ongoing writes that might be blocked waiting for memory. + // Also, disabling compaction provides stability on the sstable set. + co_await group->stop("tablet merge"); + // Flushes memtable, so all the data can be moved. + co_await group->flush(); + co_await main_group->merge_sstables_from(*group); + } + co_await sg.remove_empty_merging_groups(); + }); + } catch (...) 
{ + tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name()); + } + utils::get_local_injector().inject("replica_merge_completion_wait", [] () { + tlogger.info("Merge completion fiber finished, about to sleep"); + }); + co_await _merge_completion_event.wait(); + tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name()); + } +} + future<> tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) { auto table_id = schema()->id(); size_t old_tablet_count = old_tmap.tablet_count(); @@ -2493,12 +2588,12 @@ future<> tablet_storage_group_manager::handle_tablet_merge_completion(const loca cg->update_id(new_tid); new_sg->add_merging_group(cg); }); - // FIXME: we MUST schedule a background action to flush memtable and move sstables of merging groups into the main one. } new_storage_groups[new_tid] = std::move(new_sg); } _storage_groups = std::move(new_storage_groups); + _merge_completion_event.signal(); return make_ready_future<>(); } From 0a6d41305a64f384a595e4cad91dce5f22799ab3 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 30 Nov 2024 00:13:58 -0300 Subject: [PATCH 25/31] service: Make merge of resize plan commutative set_resize_plan() breaks commutativity since it may override the resize plans done earlier, for example, when adding co-location migrations in the DC plan. Signed-off-by: Raphael S. 
Carvalho --- service/tablet_allocator.cc | 3 ++- service/tablet_allocator.hh | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 0be0cbe577..249bd1fc47 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -712,7 +712,8 @@ public: // Make plans for repair jobs plan.set_repair_plan(co_await make_repair_plan(plan)); - plan.set_resize_plan(co_await make_resize_plan()); + // Merge table-wide resize decisions, may emit new decisions, revoke or finalize ongoing ones. + plan.merge_resize_plan(co_await make_resize_plan()); lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s)", plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count()); diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 9305607074..1d4923c14a 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -172,8 +172,8 @@ public: const table_resize_plan& resize_plan() const { return _resize_plan; } - void set_resize_plan(table_resize_plan resize_plan) { - _resize_plan = std::move(resize_plan); + void merge_resize_plan(table_resize_plan resize_plan) { + _resize_plan.merge(std::move(resize_plan)); } const tablet_repair_plan& repair_plan() const { return _repair_plan; } From cd5d1d3c990afeacdf0ef4fddf3a27829e4ad3d6 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 3 Dec 2024 21:16:08 -0300 Subject: [PATCH 26/31] gms: Add cluster feature for tablet merge The reason we need it is that tablet merge can only be finalized when the cluster agrees on the feature, otherwise unpatched nodes would fail to handle merge finalization, potentially crashing. Signed-off-by: Raphael S. 
Carvalho --- gms/feature_service.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 1f68b431c9..c5dcf34e1d 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -145,6 +145,7 @@ public: gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv }; gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv }; + gms::feature tablet_merge { *this, "TABLET_MERGE"sv }; // A feature just for use in tests. It must not be advertised unless // the "features_enable_test_feature" injection is enabled. From 3e518c7b235a54d2398605851243dc32c1ac62eb Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 11 Sep 2024 20:34:05 -0300 Subject: [PATCH 27/31] service: Co-locate sibling tablets for a table undergoing merge This implements the ability for the balancer to co-locate sibling tablets on the same shard. Co-location is low in priority, so regular load balancer is preferred over it. Previous changes allowed balancer to move co-located sibling tablets together, to not undo the co-location work done so far. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 207 +++++++++++++++++++++++++++++++++++- service/tablet_allocator.hh | 1 + test/boost/tablets_test.cc | 54 ++++++---- 3 files changed, 237 insertions(+), 25 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 249bd1fc47..f381b49942 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -20,6 +20,7 @@ #include "locator/load_sketch.hh" #include "replica/database.hh" #include "gms/feature_service.hh" +#include #include #include #include @@ -713,7 +714,7 @@ public: plan.set_repair_plan(co_await make_repair_plan(plan)); // Merge table-wide resize decisions, may emit new decisions, revoke or finalize ongoing ones. 
- plan.merge_resize_plan(co_await make_resize_plan()); + plan.merge_resize_plan(co_await make_resize_plan(plan)); lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s)", plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count()); @@ -881,7 +882,183 @@ public: co_return ret; } - future make_resize_plan() { + // Returns true if a table has replicas of all its sibling tablets co-located. + // This is used for determining whether merge can be finalized, since co-location + // is a strict requirement for sibling tablets to be merged. + future all_sibling_tablet_replicas_colocated(table_id table, const tablet_map& tmap) { + bool all_colocated = true; + co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional t2_opt) -> future<> { + // FIXME: introduce variant of for_each_sibling_tablets() that accepts stop_iteration. + if (!all_colocated) { + return make_ready_future<>(); + } + + if (!t2_opt) { + on_internal_error(lblogger, format("Unable to find sibling tablet during co-location check for table {}", table)); + } + auto t2 = *t2_opt; + + // Sibling tablets cannot be considered co-located if their tablet info is temporarily unmergeable. + // This can happen when, for example, one of them has an active repair task. + all_colocated &= bool(merge_tablet_info(*t1.info, *t2.info)); + return make_ready_future<>(); + }); + if (all_colocated) { + lblogger.info("All sibling tablets are co-located for table {}", table); + } + co_return all_colocated; + } + + future make_merge_colocation_plan(node_load_map& nodes) { + migration_plan plan; + table_resize_plan resize_plan; + + auto can_proceed_with_colocation = [this] (table_id tid, const locator::tablet_map& tmap) { + // FIXME: tables with views aren't supported yet. See: https://github.com/scylladb/scylladb/issues/17265.
+ return tmap.needs_merge() && _db.column_family_exists(tid) && _db.find_column_family(tid).views().empty(); + }; + + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = *tmap_; + if (!can_proceed_with_colocation(table, tmap)) { + continue; + } + + // Also filter out replicas that don't belong to the DC being worked on. + auto get_replicas = [this, &nodes] (const tablet_desc& t) { + auto ret = sorted_replicas_for_tablet_load(*t.info, t.transition); + const auto [first, last] = std::ranges::remove_if(ret, [&] (tablet_replica r) { return !nodes.contains(r.host); }); + ret.erase(first, last); + return ret; + }; + + auto migrating = [] (const tablet_desc& t) { + return bool(t.transition); + }; + + auto first_non_matching_replicas = [] (tablet_replica_set r1, tablet_replica_set r2) -> std::optional> { + assert(r1.size() == r2.size()); + // Subtract intersecting (co-located) elements from the replicas set of sibling tablets. + // Think for example that tablet 0 and 1 have replicas [n2, n4] and [n1, n2] respectively. + // After subtraction, replica of tablet 1 in n1 will be a candidate for co-location with + // replica of tablet 0 in n4. + std::unordered_set intersection; + std::ranges::set_intersection(r1, r2, std::inserter(intersection, intersection.begin())); + const auto [r1_first, r1_last] = std::ranges::remove_if(r1, [&] (tablet_replica r) { return intersection.contains(r); }); + r1.erase(r1_first, r1_last); + const auto [r2_first, r2_last] = std::ranges::remove_if(r2, [&] (tablet_replica r) { return intersection.contains(r); }); + r2.erase(r2_first, r2_last); + // Favor replicas of different tablets that belong to same node. For example: + // tablet 0 replicas: [n2:s1, n3:s0] + // tablet 1 replicas: [n1:s0, n2:s0] + // Replica in n1:s0 cannot follow sibling replica in n2:s1. Otherwise, RF invariant is broken. + // Instead, tablet 1 in n2:s0 will be co-located with tablet 0 in n2:s1. 
+ std::unordered_map r1_map; + std::ranges::transform(r1, std::inserter(r1_map, r1_map.begin()), [] (tablet_replica r) { + return std::make_pair(r.host, r); + }); + for (unsigned i = 0; i < r2.size(); i++) { + auto r1_it = r1_map.find(r2[i].host); + if (r1_it != r1_map.end()) { + return std::make_pair(r1_it->second, r2[i]); + } + } + // Since sets had intersection subtracted, the remaining replicas are certainly not co-located. + if (r1.size() > 0) { + return std::make_pair(r1[0], r2[0]); + } + return std::nullopt; + }; + + auto create_migration_info = [] (global_tablet_id gid, tablet_replica src, tablet_replica dst) { + auto kind = (src.host != dst.host) ? tablet_transition_kind::migration : tablet_transition_kind::intranode_migration; + return tablet_migration_info{kind, gid, src, dst}; + }; + + co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional t2_opt) -> future<> { + // Be optimistic about migrating tablets, as if they succeeded. + // Merge finalization will have to recheck that all sibling tablets are co-located. + + if (!t2_opt) { + on_internal_error(lblogger, format("Unable to find sibling tablet during co-location, with tablet count {}, for table {}", + tmap.tablet_count(), table)); + } + auto t2 = *t2_opt; + + auto r1 = get_replicas(t1); + auto r2 = get_replicas(t2); + if (r1 == r2) { + return make_ready_future<>(); + } + auto t1_id = global_tablet_id{table, t1.tid}; + auto t2_id = global_tablet_id{table, t2.tid}; + + if (migrating(t1) || migrating(t2)) { + return make_ready_future<>(); + } + // During RF change, tablets may have incrementally replicas allocated / deallocated to them. + // Let's temporarily delay their co-location until their replica sets have the same size. 
+ if (r1.size() != r2.size()) { + lblogger.warn("Replica sets of tablets to be co-located differ in size: ({}: {}), ({}, {})", + t1_id, r1, t2_id, r2); + return make_ready_future<>(); + } + + // Returns true if moving candidate into dst will violate replication constraint. + const auto r2_hosts = r2 + | std::views::transform(std::mem_fn(&locator::tablet_replica::host)) + | std::ranges::to>(); + auto check_constraints = [r2_hosts = std::move(r2_hosts)] (tablet_replica src, tablet_replica dst) { + // handles intra-node migration. + if (src.host == dst.host && src.shard != dst.shard) { + return false; + } + return r2_hosts.contains(dst.host); + }; + + lblogger.debug("Replica sets of tablets being co-located: ({}: {}), ({}, {})", t1_id, r1, t2_id, r2); + + auto ret = first_non_matching_replicas(r1, r2); + if (!ret) { + // this shouldn't happen in practice, since the above call should always produce a pair of + // replicas to co-locate, since we only got here if the sibling tablets aren't fully co-located. + on_internal_error(lblogger, format("Unable to find replicas to co-locate for sibling tablets ({}: {}), and ({}, {})", + t1_id, r1, t2_id, r2)); + } + + // Emits migration for replica of t2 to co-habit same shard as replica of t1. + auto src = ret->second; + auto dst = ret->first; + + // If migration will violate replication constraint, skip to next pair of replicas of sibling tablets. + auto skip = check_constraints(src, dst); + if (skip) { + lblogger.debug("Replication constraint check failed, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", + t2_id, src, t1_id, dst); + return make_ready_future<>(); + } + + auto mig = create_migration_info(t2_id, src, dst); + auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), *t2.info, mig); + if (!can_accept_load(nodes, mig_streaming_info)) { + // FIXME: we can try another pair of non-colocated replicas of same sibling tablets. 
+ lblogger.debug("Load limit reached, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", + t2_id, src, t1_id, dst); + return make_ready_future<>(); + } + apply_load(nodes, mig_streaming_info); + + lblogger.info("Created migration for replica ({}, {}) to co-habit same shard as ({}, {})", t2_id, src, t1_id, dst); + plan.add(std::move(mig)); + return make_ready_future<>(); + }); + } + plan.merge_resize_plan(std::move(resize_plan)); + + co_return std::move(plan); + } + + future make_resize_plan(const migration_plan& plan) { table_resize_plan resize_plan; if (!_tm->tablets().balancing_enabled()) { @@ -978,13 +1155,21 @@ public: continue; } - // If all replicas have completed split work for the current sequence number, it means that - // load balancer can emit finalize decision, for split to be completed. - if (table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { + auto finalize_decision = [&] { _stats.for_cluster().resizes_finalized++; resize_plan.finalize_resize.insert(table); + }; + + // If all replicas have completed split work for the current sequence number, it means that + // load balancer can emit finalize decision, for split to be completed. + if (tmap.needs_split() && table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { + finalize_decision(); lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}", table, table_stats->split_ready_seq_number); + // If all sibling tablets are co-located across all DCs, then merge can be finalized. 
+ } else if (tmap.needs_merge() && co_await all_sibling_tablet_replicas_colocated(table, tmap) && !bypass_merge_completion()) { + finalize_decision(); + lblogger.info("Finalizing resize decision for table {} as all replicas are co-located", table); } } @@ -1048,6 +1233,12 @@ public: return utils::get_local_injector().enter("tablet_allocator_shuffle"); } + // If cluster cannot agree on tablet merge feature, then merge will not be finalized since + // not all nodes in the cluster can handle the finalization step. + bool bypass_merge_completion() const { + return !_db.features().tablet_merge || utils::get_local_injector().enter("tablet_merge_completion_bypass"); + } + size_t rand_int() const { static thread_local std::default_random_engine re{std::random_device{}()}; static thread_local std::uniform_int_distribution dist; @@ -2453,6 +2644,12 @@ public: plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain)); } + if (_tm->tablets().balancing_enabled() && plan.empty()) { + auto dc_merge_plan = co_await make_merge_colocation_plan(nodes); + lblogger.info("Prepared {} migrations for co-locating sibling tablets in DC {}", dc_merge_plan.tablet_migration_count(), dc); + plan.merge(std::move(dc_merge_plan)); + } + co_await utils::clear_gently(nodes); co_return std::move(plan); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 1d4923c14a..2036b9ea3d 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -106,6 +106,7 @@ struct table_resize_plan { resize[id] = std::move(other_resize); } } + finalize_resize.merge(std::move(other.finalize_resize)); } }; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index fe87ef2638..e63b3657fd 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -1354,11 +1354,23 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { tmap.set_resize_decision(resize_decision); }); } +} + +static +future<> 
handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) { for (auto table_id : plan.resize_plan().finalize_resize) { - const auto& old_tmap = tm.tablets().get_tablet_map(table_id); - testlog.info("Setting new tablet map of size {}", old_tmap.tablet_count() * 2); - tablet_map tmap(old_tmap.tablet_count() * 2); - tm.tablets().set_tablet_map(table_id, std::move(tmap)); + auto tm = stm.get(); + const auto& old_tmap = tm->tablets().get_tablet_map(table_id); + + auto new_tmap = co_await talloc.resize_tablets(tm, table_id); + auto new_resize_decision = locator::resize_decision{}; + new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); + new_tmap.set_resize_decision(std::move(new_resize_decision)); + + co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) { + tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); + return make_ready_future<>(); + }); } } @@ -1414,6 +1426,7 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, loc apply_plan(tm, plan); return make_ready_future<>(); }).get(); + handle_resize_finalize(talloc, stm, plan).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } @@ -2602,7 +2615,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { auto host1 = host_id(next_uuid()); auto host2 = host_id(next_uuid()); - auto table1 = table_id(next_uuid()); + auto table1 = add_table(e).get(); unsigned shard_count = 2; @@ -2614,11 +2627,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { } }); - stm.mutate_token_metadata([&] (token_metadata& tm) { + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { tm.update_host_id(host1, ip1); tm.update_host_id(host2, ip2); tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); tm.update_topology(host2, locator::endpoint_dc_rack::default_location, 
node::state::normal, shard_count); + co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); + co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); tablet_map tmap(2); for (auto tid : tmap.tablet_ids()) { @@ -2632,7 +2647,6 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); tm.set_tablets(std::move(tmeta)); - return make_ready_future<>(); }).get(); auto tablet_count = [&] { @@ -2655,19 +2669,6 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { const auto initial_ready_seq_number = std::numeric_limits::min(); - // there are 2 tablets, each with avg size hitting merge threshold, so merge request is emitted - { - locator::load_stats load_stats = { - .tables = { - { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }}, - } - }; - - do_rebalance_tablets(std::move(load_stats)); - BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets); - BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); - } - // avg size moved above target size, so merge is cancelled { locator::load_stats load_stats = { @@ -2709,6 +2710,19 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets * 2); BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); } + + // Check that balancer detects table size dropped to 0 and reduces tablet count down to 1 through merges. + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE_EQUAL(tablet_count(), 1); + } + }).get(); } From 534ce7340fb4ea86cfb96f0dedd171050826578c Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Mon, 2 Dec 2024 10:45:02 -0300 Subject: [PATCH 28/31] service: Handle exception when retrying split It might happen that the sleep will fail during shutdown, so we should handle the failure so that shutdown can proceed gracefully. Signed-off-by: Raphael S. Carvalho --- service/storage_service.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 93b5baf4ae..0583f18486 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5453,7 +5453,11 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep sleep = true; } if (sleep) { - co_await split_retry.retry(_group0_as); + try { + co_await split_retry.retry(_group0_as); + } catch (...) { + slogger.warn("Sleep in split monitor failed with {}", std::current_exception()); + } } } } From 76ab293505b07ed9b35e08ded8a11d06904cf6b5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 12 Sep 2024 12:05:57 -0300 Subject: [PATCH 29/31] tests/topology_experimental_raft: Add tablet merge test Passed ./test.py --mode=dev ... --repeat=50. Signed-off-by: Raphael S.
Carvalho --- .../test_tablets.py | 4 + .../test_tablets_merge.py | 303 ++++++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 test/topology_experimental_raft/test_tablets_merge.py diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 6677fb6d88..db79e64a81 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -37,6 +37,10 @@ async def inject_error_on(manager, error_name, servers): errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers] await asyncio.gather(*errs) +async def disable_injection_on(manager, error_name, servers): + errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers] + await asyncio.gather(*errs) + async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo], ranges: str = ''): node = server.ip_addr await manager.servers_see_each_other(servers) diff --git a/test/topology_experimental_raft/test_tablets_merge.py b/test/topology_experimental_raft/test_tablets_merge.py new file mode 100644 index 0000000000..40b8906646 --- /dev/null +++ b/test/topology_experimental_raft/test_tablets_merge.py @@ -0,0 +1,303 @@ +# +# Copyright (C) 2025-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from cassandra.query import SimpleStatement, ConsistencyLevel + +from test.pylib.internal_types import ServerInfo +from test.pylib.manager_client import ManagerClient +from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier +from test.topology.conftest import skip_mode + +import pytest +import asyncio +import logging +import time +import random + +logger = logging.getLogger(__name__) + +async def inject_error_one_shot_on(manager, error_name, servers): + errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers] + await asyncio.gather(*errs) + + +async def inject_error_on(manager, 
error_name, servers): + errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers] + await asyncio.gather(*errs) + +async def disable_injection_on(manager, error_name, servers): + errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers] + await asyncio.gather(*errs) + + +async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str): + host = manager.cql.cluster.metadata.get_host(server.ip_addr) + + # read_barrier is needed to ensure that local tablet metadata on the queried node + # reflects the finalized tablet movement. + await read_barrier(manager.api, server.ip_addr) + + table_id = await manager.get_table_id(keyspace_name, table_name) + rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where " + f"table_id = {table_id}", host=host) + return rows[0].tablet_count + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_merge_simple(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=debug', + '--logger-log-level', 'table=debug', + '--logger-log-level', 'load_balancer=debug', + '--target-tablet-size-in-bytes', '30000', + ] + servers = [await manager.server_add(config={ + 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] + }, cmdline=cmdline)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + + # Initial average table size of 400k (1 tablet), so triggers some splits. 
+ total_keys = 200 + keys = range(total_keys) + insert = cql.prepare(f"INSERT INTO test.test(pk, c) VALUES(?, ?)") + for pk in keys: + value = random.randbytes(2000) + cql.execute(insert, [pk, value]) + + async def check(): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;") + assert len(rows) == len(keys) + + await check() + + await manager.api.flush_keyspace(servers[0].ip_addr, "test") + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count == 1 + + logger.info("Adding new server") + servers.append(await manager.server_add(cmdline=cmdline)) + + # Increases the chance of tablet migration concurrent with split + await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers) + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + # Now there's a split and migration need, so they'll potentially run concurrently. + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await check() + time.sleep(2) # Give load balancer some time to do work + + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + await check() + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count > 1 + + # Allow shuffling of tablet replicas to make co-location work harder + async def shuffle(): + await inject_error_on(manager, "tablet_allocator_shuffle", servers) + time.sleep(2) + await disable_injection_on(manager, "tablet_allocator_shuffle", servers) + + await shuffle() + + # This will allow us to simulate some balancing after co-location with shuffling, to make sure that + # balancer won't break co-location. + await inject_error_on(manager, "tablet_merge_completion_bypass", servers) + + # Shrinks table significantly, forcing merge. 
+ delete_keys = range(total_keys - 1) + await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys]) + keys = range(total_keys - 1, total_keys) + + # To avoid race of major with migration + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + await manager.api.keyspace_compaction(server.ip_addr, "test") + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark) + # Waits for balancer to co-locate sibling tablets + await s1_log.wait_for("All sibling tablets are co-located") + # Do some shuffling to make sure balancer works with co-located tablets + await shuffle() + + old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + + await inject_error_on(manager, "replica_merge_completion_wait", servers) + await disable_injection_on(manager, "tablet_merge_completion_bypass", servers) + + await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark) + await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark) + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count < old_tablet_count + await check() + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + await manager.api.keyspace_compaction(server.ip_addr, "test") + await check() + +# Multiple cycles of split and merge, with topology changes in parallel and RF > 1. 
+@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=info', + '--logger-log-level', 'table=info', + '--logger-log-level', 'raft_topology=info', + '--logger-log-level', 'group0_raft_sm=info', + '--logger-log-level', 'load_balancer=info', + '--target-tablet-size-in-bytes', '30000', + ] + config = { + 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] + } + servers = [await manager.server_add(config=config, cmdline=cmdline), + await manager.server_add(config=config, cmdline=cmdline), + await manager.server_add(config=config, cmdline=cmdline)] + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + + async def perform_topology_ops(): + logger.info("Topology ops in background") + server_id_to_decommission = servers[-1].server_id + logger.info("Decommissioning old server with id {}".format(server_id_to_decommission)) + await manager.decommission_node(server_id_to_decommission) + servers.pop() + logger.info("Adding new server") + servers.append(await manager.server_add(cmdline=cmdline)) + logger.info("Completed topology ops") + + for cycle in range(2): + logger.info("Running split-merge cycle #{}".format(cycle)) + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + logger.info("Inserting data") + # Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits. 
+ total_keys = 200 + keys = range(total_keys) + insert = cql.prepare(f"INSERT INTO test.test(pk, c) VALUES(?, ?)") + for pk in keys: + value = random.randbytes(2000) + cql.execute(insert, [pk, value]) + + async def check(): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;") + assert len(rows) == len(keys) + + await check() + + logger.info("Flushing keyspace") + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + + # Increases the chance of tablet migration concurrent with split + await inject_error_on(manager, "tablet_allocator_shuffle", servers) + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + logger.info("Enabling balancing") + # Now there's a split and migration need, so they'll potentially run concurrently. 
+ await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + topology_ops_task = asyncio.create_task(perform_topology_ops()) + + await check() + + logger.info("Waiting for split") + await disable_injection_on(manager, "tablet_allocator_shuffle", servers) + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + logger.info("Waiting for topology ops") + await topology_ops_task + + await check() + + old_tablet_count = tablet_count + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count > old_tablet_count + logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count)) + + # Allow shuffling of tablet replicas to make co-location work harder + await inject_error_on(manager, "tablet_allocator_shuffle", servers) + # This will allow us to simulate some balancing after co-location with shuffling, to make sure that + # balancer won't break co-location. + await inject_error_on(manager, "tablet_merge_completion_bypass", servers) + + logger.info("Deleting data") + # Delete almost all keys, enough to trigger a few merges. 
+ delete_keys = range(total_keys - 1) + await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys]) + keys = range(total_keys - 1, total_keys) + + await disable_injection_on(manager, "tablet_allocator_shuffle", servers) + + # To avoid race of major with migration + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + logger.info("Flushing keyspace and performing major") + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + await manager.api.keyspace_compaction(server.ip_addr, "test") + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + logger.info("Waiting for merge decision") + await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark) + # Waits for balancer to co-locate sibling tablets + await s1_log.wait_for("All sibling tablets are co-located") + # Do some shuffling to make sure balancer works with co-located tablets + await inject_error_on(manager, "tablet_allocator_shuffle", servers) + + old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + + topology_ops_task = asyncio.create_task(perform_topology_ops()) + + await inject_error_on(manager, "replica_merge_completion_wait", servers) + await disable_injection_on(manager, "tablet_merge_completion_bypass", servers) + await disable_injection_on(manager, "tablet_allocator_shuffle", servers) + + await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark) + await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark) + + logger.info("Waiting for topology ops") + await topology_ops_task + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count < old_tablet_count + logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count)) + await check() + + logger.info("Flushing keyspace and performing major") + for server in servers: + await 
manager.api.flush_keyspace(server.ip_addr, "test") + await manager.api.keyspace_compaction(server.ip_addr, "test") + await check() From 8344722a267513ab16da18532a3513653f99b802 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 14 Oct 2024 16:55:16 -0300 Subject: [PATCH 30/31] tests/boost: Add test to verify correctness of balancer decisions during merge Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 1 + test/boost/tablets_test.cc | 242 ++++++++++++++++++++++++++++++++++-- 2 files changed, 230 insertions(+), 13 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index f381b49942..18d35a184b 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -2252,6 +2252,7 @@ public: throw std::runtime_error(fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. (nodes {}, replicas {})", tablet, src, nodes_to_drain, nodes_by_load_dst, replicas)); } + lblogger.debug("Adding replica {} of candidate {} to skipped list with the viable targets {}", src, candidate, skip.viable_targets); src_node_info.skipped_candidates.emplace_back(src, tablets, std::move(skip.viable_targets)); }; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index e63b3657fd..e7ccc01895 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -1380,6 +1380,7 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) { for (auto&& mig : plan.migrations()) { tm.tablets().mutate_tablet_map(mig.tablet.table, [&] (tablet_map& tmap) { auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); + testlog.trace("Replacing tablet {} replica from {} to {}", mig.tablet.tablet, mig.src, mig.dst); tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); tmap.set_tablet(mig.tablet.tablet, tinfo); }); @@ -1411,6 +1412,9 @@ size_t get_tablet_count(const tablet_metadata& tm) { return count; } +static +void 
check_tablet_invariants(const tablet_metadata& tmeta); + static void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, locator::load_stats_ptr load_stats = {}, std::unordered_set skiplist = {}) { // Sanity limit to avoid infinite loops. @@ -2421,13 +2425,37 @@ void check_tablet_invariants(const tablet_metadata& tmeta) { std::unordered_set hosts; // Uniqueness of hosts for (const auto& replica: tinfo.replicas) { - BOOST_REQUIRE(hosts.insert(replica.host).second); + auto ret = hosts.insert(replica.host).second; + if (!ret) { + testlog.error("Failed tablet invariant check for tablet {}: {}", tid, tinfo.replicas); + } + BOOST_REQUIRE(ret); } return make_ready_future<>(); }).get(); } } +static +std::vector +allocate_replicas_in_racks(const std::vector& racks, int rf, + const std::unordered_map>& hosts_by_rack) { + // Choose replicas randomly while loading racks evenly. + std::vector replica_hosts; + for (int i = 0; i < rf; ++i) { + auto rack = racks[i % racks.size()]; + auto& rack_hosts = hosts_by_rack.at(rack.rack); + while (true) { + auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; + if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { + replica_hosts.push_back(candidate_host); + break; + } + } + } + return replica_hosts; +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { do_with_cql_env_thread([] (auto& e) { const int n_hosts = 6; @@ -2481,18 +2509,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { tablet_map tmap(1 << log2_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. 
- std::vector replica_hosts; - for (int i = 0; i < rf; ++i) { - auto rack = racks[i % racks.size()]; - auto& rack_hosts = hosts_by_rack[rack.rack]; - while (true) { - auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; - if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { - replica_hosts.push_back(candidate_host); - break; - } - } - } + std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); @@ -2607,6 +2624,205 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) { }, std::move(cfg)).get(); } +using rack_vector = std::vector; +using hosts_by_rack_map = std::unordered_map>; + +// runs in seastar thread. +static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, + const unsigned shard_count, const unsigned initial_tablets, + std::function set_tablets) { + rack_vector racks; + for (int i = 0; i < n_racks; i++) { + racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)}); + } + + testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); + + std::vector hosts; + for (int i = 0; i < n_hosts; ++i) { + hosts.push_back(host_id(next_uuid())); + } + + auto table1 = add_table(e).get(); + + hosts_by_rack_map hosts_by_rack; + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = inet_address("192.168.0.1"), + .this_host_id = hosts[0], + .local_dc_rack = racks[std::min(1, n_racks - 1)] + } + }); + + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + tablet_metadata tmeta; + + int i = 0; + for (auto h : hosts) { + auto ip = 
inet_address(format("192.168.0.{}", ++i)); + tm.update_host_id(h, ip); + auto rack = racks[i % racks.size()]; + hosts_by_rack[rack.rack].push_back(h); + tm.update_topology(h, rack, node::state::normal, shard_count); + co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h); + testlog.debug("adding host {}, ip {}, rack {}, token {}", h, ip, rack.rack, token(tests::d2t(1. / hosts.size()))); + } + + tablet_map tmap(initial_tablets); + locator::resize_decision decision; + // leaves growing mode, allowing for merge decision. + decision.sequence_number = decision.next_sequence_number(); + tmap.set_resize_decision(std::move(decision)); + set_tablets(tm, tmap, racks, hosts_by_rack); + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + }).get(); + + auto tablet_count = [&] { + return stm.get()->tablets().get_tablet_map(table1).tablet_count(); + }; + auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { + rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + }; + + const uint64_t target_tablet_size = service::default_target_tablet_size; + auto merge_threshold = [&] () -> uint64_t { + return (target_tablet_size * 0.5f) * tablet_count(); + }; + + while (tablet_count() > 1) { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = merge_threshold() - 1 }}, + } + }; + + auto old_tablet_count = tablet_count(); + check_tablet_invariants(stm.get()->tablets()); + do_rebalance_tablets(std::move(load_stats)); + check_tablet_invariants(stm.get()->tablets()); + BOOST_REQUIRE_LT(tablet_count(), old_tablet_count); + } +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) { + do_with_cql_env_thread([] (auto& e) { + auto seed = tests::random::get_int(); + std::mt19937 random_engine{seed}; + + testlog.info("test_load_balancing_merge_colocation - seed {}", seed); + + for (auto i = 0; i 
< 10; i++) { + const int rf = tests::random::get_int(3, 3); + const int n_racks = rf; + const int n_hosts = tests::random::get_int(n_racks * rf, n_racks * rf * 2); + const unsigned shard_count = tests::random::get_int(2, 12); + const unsigned total_shard_count = n_hosts * shard_count; + const unsigned initial_tablets = std::bit_ceil(tests::random::get_int(total_shard_count, total_shard_count * 10)); + + auto set_tablets = [rf, shard_count] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { + for (auto tid : tmap.tablet_ids()) { + testlog.debug("allocating replica in racks with rf {}", rf); + std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); + tablet_replica_set replicas; + replicas.reserve(replica_hosts.size()); + for (auto h : replica_hosts) { + replicas.push_back(tablet_replica {h, tests::random::get_int(0, shard_count - 1)}); + } + testlog.debug("allocating replicas for tablet {}: {}", tid, replicas); + tmap.set_tablet(tid, tablet_info {std::move(replicas)}); + } + }; + + do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); + } + }).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) { + do_with_cql_env_thread([] (auto& e) { + const int rf = 2; + const int n_racks = 1; + const int n_hosts = 2; + const unsigned shard_count = 2; + const unsigned initial_tablets = 2; + + auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { + auto& hosts = hosts_by_rack.at(racks.front().rack); + auto host1 = hosts[0]; + auto host2 = hosts[1]; + tmap.set_tablet(tablet_id(0), tablet_info { + tablet_replica_set { + tablet_replica {host1, shard_id(0)}, + tablet_replica {host2, shard_id(0)}, + } + }); + tmap.set_tablet(tablet_id(1), tablet_info { + tablet_replica_set { + tablet_replica {host2, shard_id(0)}, + tablet_replica {host1, 
shard_id(0)}, + } + }); + }; + + do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); + }).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) { + do_with_cql_env_thread([] (auto& e) { + const int rf = 3; + const int n_racks = 1; + const int n_hosts = 4; + const unsigned shard_count = 2; + const unsigned initial_tablets = 2; + + auto set_tablets = [&] (token_metadata& tm, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { + auto& rack = racks.front(); + auto& hosts = hosts_by_rack.at(rack.rack); + BOOST_REQUIRE(hosts.size() == 4); + auto a = hosts[0]; + auto b = hosts[1]; + auto c = hosts[2]; + auto d = hosts[3]; + + // nodes = {A, B, C, D} + // tablet1 = {A, B, C} + // tablet2 = {A, B, D} + // viable target for {tablet1, B} is D. + // viable target for {tablet2, B} is C. + // + // Decomission should succeed by migrating away even co-located replicas of sibling tablets that don't share viable targets. 
+ // That should produce: + // tablet1 = {A, D, C} + // tablet2 = {A, C, D} + + auto decision = tmap.resize_decision(); + decision.way = locator::resize_decision::merge{}; + tmap.set_resize_decision(std::move(decision)); + tm.update_topology(b, rack, node::state::being_decommissioned, shard_count); + + tmap.set_tablet(tablet_id(0), tablet_info { + tablet_replica_set { + tablet_replica {a, shard_id(0)}, + tablet_replica {b, shard_id(0)}, + tablet_replica {c, shard_id(0)}, + } + }); + tmap.set_tablet(tablet_id(1), tablet_info { + tablet_replica_set { + tablet_replica {a, shard_id(0)}, + tablet_replica {b, shard_id(0)}, + tablet_replica {d, shard_id(0)}, + } + }); + }; + + do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); + }).get(); +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { do_with_cql_env_thread([] (auto& e) { inet_address ip1("192.168.0.1"); From d93a0040e5c1a23cb2e9be204d57721be63c612b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 12 Sep 2024 12:15:04 -0300 Subject: [PATCH 31/31] docs: Document tablet merging Signed-off-by: Raphael S. Carvalho --- docs/dev/topology-over-raft.md | 53 +++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 79ec3cedef..d824dac8eb 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -330,7 +330,7 @@ Invariants: on behalf of previous transitions can still run in the cluster, but they can have no side effects. This is ensured by the proper use of the topology guard mechanism (see the "Topology guards" section). -# Tablet splitting +# Tablet resize Each table has its resize metadata stored in group0. 
@@ -345,6 +345,8 @@ for a given table, which can be done by dividing average table size[1] by the ta [1]: The average size of a table is the total size across all DCs divided by the number of replicas across all DCs. +## Tablet splitting + A table will need split if its average size surpasses the split threshold, which is 100% of the target tablet size, which defaults to 5G. The reasoning is that after split we want average size to return to the target size. By the same reason, merge threshold is 50% of target size. @@ -376,6 +378,55 @@ which is a result of splitting each preexisting tablet into two, is committed to The replicas will react to that by remapping its compaction groups into a new set which is, at least, twice as large as the old one. +## Tablet merging + +A table will need merge if its average size is below the merge threshold, which is 50% of the target +tablet size, which defaults to 5G. The reasoning is that after merge we want average size to return +to the target size. This hysteresis is important to avoid oscillations between splits and merges. + +The initial tablet count (the parameter in schema) is respected while the table is in "growing mode". +Every table starts in this mode and will leave it if, for example, there was a need to split beyond +the initial tablet count. After a table leaves the mode, the average size can be trusted to determine +that the table is shrinking. + +When the load balancer decides to merge a table, the resize_type field in tablet metadata will be set +to 'merge' and resize_seq_number is bumped to the next sequence number. +Similar to split, the load balancer might decide to revoke an ongoing merge if it realizes that after +merge, a split will be needed. + +The merge preparation phase is done by co-locating replicas of sibling tablets on the same node:shard, +through migrations (the regular tablet migration mechanism). Unlike split, all the preparation is done by the coordinator.
+We say that a pair of tablets are siblings if they will become one after merge. This is built on the +power-of-two constraint. For example, if a table has 4 tablets, the siblings are (0, 1) and (2, 3). +The co-location algorithm is simple. The balancer will produce a migration for the "odd" tablet to follow the +"even" one. For example, a replica of tablet 1 will be moved to where a replica of tablet 0 lives. +If the "odd" tablet lives on the same node but on a different shard, an intra-node migration is performed. + +Without co-location, the merge completion handler wouldn't be able to find data of replicas to be merged +in the same location, making it impossible for the coordinator to merge the replica sets, and for the replica +layer to combine the data together. + +Merge has low priority, so the co-location migrations will be emitted when there's no more important +work to do (e.g. node draining or regular balancing). The regular balancing will not undo the co-location +work done so far by migrating co-located replicas together (treating them as merged). + +Once the balancer realizes replicas of all sibling tablets are co-located, a decision will be emitted +to finalize the merge. A pair of sibling tablets is considered co-located if their replica sets are +equal, i.e. (s1 + s2) == s1. The finalization is serialized with migration, as shrinking tablet count +would interfere with the migration process that requires tablet id stability. + +When the coordinator leaves the migration track, and there are tables waiting for merge to be finalized, +the state machine will transition into the `tablet_resize_finalization` state. At this moment, there will +be no migration running in the system. A global token metadata barrier is executed to make sure that no +process will hold stale topology when resizing the tablet map. That's important since the requests must +find a replica state consistent with the one in group0.
+The handler of the `tablet_resize_finalization` state will check if the decision is still to merge for a +table, and if so, the tablet map will have its size reduced by a factor of 2. When replicas of sibling +tablets are co-located, their replica sets can be merged into one, since (s1 + s2) == s1. +Once the new map is committed to group0, replicas will react to that by resizing their internal structures +to match the new tablet count, and also merging the compaction groups (sstable(s) + memtable) that +belonged to sibling tablets together. + # Sharding with tablets Each table can have different shard assignment for a given token computed from the placement of tablet replicas,