Merge 'Add tablet merge support' from Raphael Raph Carvalho

The goal of merge is to reduce the tablet count of a shrinking table, just as split increases the count while the table is growing. The load balancer's decision to merge was already implemented (it came with the infrastructure introduced for split), but it wasn't acted upon until now.

The initial tablet count is respected while the table is in "growing mode". The table leaves this mode if, for example, a split was needed beyond the initial tablet count. After the table leaves the mode, the average tablet size can be trusted to determine that the table is shrinking. A merge decision is emitted if the average tablet size drops to 50% of the target. Hysteresis is applied to avoid oscillations between splits and merges.
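The thresholds and hysteresis band described above can be sketched as follows. This is an illustrative sketch, not the actual ScyllaDB API; the names `resize_kind` and `decide_resize` are hypothetical. Split fires above 100% of the target tablet size, merge below 50%, and the band in between prevents oscillation:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sketch of the resize decision thresholds (names are illustrative).
enum class resize_kind { none, split, merge };

resize_kind decide_resize(uint64_t avg_tablet_size, uint64_t target_tablet_size) {
    if (avg_tablet_size > target_tablet_size) {
        return resize_kind::split;   // average above target: double the tablet count
    }
    if (avg_tablet_size < target_tablet_size / 2) {
        return resize_kind::merge;   // average below half the target: halve the count
    }
    return resize_kind::none;        // hysteresis band: no split/merge oscillation
}
```

After a split the average size halves, landing near the target; after a merge it doubles, also landing near the target, so a single decision does not immediately trigger the opposite one.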

Similar to split, the decision to merge is recorded in the tablet map's resize_type field with the value "merge". This matters in case of coordinator failover, so the new coordinator continues from where the old one left off.

Unlike split, the preparation phase of merge is not done by the replicas (with split compactions), but by the coordinator, which co-locates sibling tablets on the same shard of the same node. Sibling tablets are tablets that cover contiguous token ranges and will become one tablet after merge. The concept is based on the power-of-two constraint and token contiguity. For example, in a table with 4 tablets, tablets 0 and 1 are siblings, as are 2 and 3.
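Under the power-of-two constraint the sibling relation reduces to bit arithmetic: clearing the lowest bit of a tablet id yields the even member of the pair, and the odd member is the id right after it. A minimal sketch (the real `tablet_map::sibling_tablets()` also handles the single-tablet case, where there is no sibling):

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Sketch of the sibling pair computation under the power-of-two constraint.
std::pair<std::size_t, std::size_t> sibling_tablets(std::size_t id) {
    std::size_t first = id & ~std::size_t(1);  // even sibling: lowest bit cleared
    return {first, first + 1};                 // odd sibling follows it
}
```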

The algorithm for co-locating sibling tablets is simple. The balancer is responsible for it, and it emits migrations so that the "odd" tablet follows the "even" one. For example, tablet 1 will be migrated to where tablet 0 lives. Co-location has low priority: delaying a merge is not the end of the world, but delaying e.g. a decommission or even regular load balancing can translate into temporary imbalance, impacting user activity. So co-location migrations happen only when there is no more important work to do.
While regular balancing has higher priority, it will not undo the co-location work done so far: it treats co-located tablets as if they were already merged. The load inversion convergence check was adjusted so the balancer understands when two tablets are being migrated instead of one, avoiding oscillations.

When the balancer completes the co-location work for a table undergoing merge, it puts the table's id into the resize_plan, which communicates to the topology coordinator that the table is ready. With all sibling tablets co-located, the coordinator can resize the tablet map (reducing it by a factor of 2) and record the new map in group0. All replicas react to it (on token metadata update) by merging the storage (memtable(s) + sstables) of sibling tablets into one.
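The map resize itself can be sketched as follows, under the assumption (guaranteed by co-location) that each sibling pair has equal replica sets; tablets 2k and 2k+1 collapse into new tablet k. A `replica_set` is stood in by a string for brevity; this is not the actual `tablet_map` code:

```cpp
#include <cassert>
#include <string>
#include <vector>

using replica_set = std::string;  // stand-in for a real replica set type

// Shrink a tablet map by a factor of 2: siblings (2k, 2k+1) become tablet k.
// Co-location made the sibling replica sets equal, so keeping either is correct.
std::vector<replica_set> halve_tablet_map(const std::vector<replica_set>& old_map) {
    std::vector<replica_set> new_map;
    new_map.reserve(old_map.size() / 2);
    for (std::size_t k = 0; k < old_map.size() / 2; ++k) {
        assert(old_map[2 * k] == old_map[2 * k + 1]);  // siblings must be co-located
        new_map.push_back(old_map[2 * k]);             // new tablet k = old tablet 2k
    }
    return new_map;
}
```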

Fixes #18181.

system test details:

test: https://github.com/pehala/scylla-cluster-tests/blob/tablets_split_merge/tablets_split_merge_test.py
yaml file: https://github.com/pehala/scylla-cluster-tests/blob/tablets_split_merge/test-cases/features/tablets/tablets-split-merge-test.yaml

instance type: i3.8xlarge
nodes: 3
target tablet size: 0.5G (scaled down by 10, to make it easier to trigger splits and merges)
description: multiple cycles of growing and shrinking the data set in order to trigger splits and merges.
data_set_size: ~100G
initial_tablets: 64, so it grew to 128 tablets on split, and back to 64 on merge.

latency of reads and writes that happened in parallel to split and merge:
```
$ for i in scylla-bench*; do cat $i | grep "Mode\|99th:\|99\.9th:"; done
Mode:			 write
  99.9th:	 3.145727ms
  99th:		 1.998847ms
  99.9th:	 3.145727ms
  99th:		 2.031615ms
Mode:			 read
  99.9th:	 3.145727ms
  99th:		 2.031615ms
  99.9th:	 3.145727ms
  99th:		 2.031615ms
Mode:			 write
  99.9th:	 3.047423ms
  99th:		 1.933311ms
  99.9th:	 3.047423ms
  99th:		 1.933311ms
Mode:			 read
  99.9th:	 3.145727ms
  99th:		 1.900543ms
  99.9th:	 3.145727ms
  99th:		 1.900543ms
Mode:			 write
  99.9th:	 5.079039ms
  99th:		 3.604479ms
  99.9th:	 35.389439ms
  99th:		 25.624575ms
Mode:			 write
  99.9th:	 3.047423ms
  99th:		 1.998847ms
  99.9th:	 3.047423ms
  99th:		 1.998847ms
Mode:			 read
  99.9th:	 3.080191ms
  99th:		 2.031615ms
  99.9th:	 3.112959ms
  99th:		 2.031615ms
```

Closes scylladb/scylladb#20572

* github.com:scylladb/scylladb:
  docs: Document tablet merging
  tests/boost: Add test to verify correctness of balancer decisions during merge
  tests/topology_experimental_raft: Add tablet merge test
  service: Handle exception when retrying split
  service: Co-locate sibling tablets for a table undergoing merge
  gms: Add cluster feature for tablet merge
  service: Make merge of resize plan commutative
  replica: Implement merging of compaction groups on merge completion
  replica: Handle tablet merge completion
  service: Implement tablet map resize for merge
  locator: Introduce merge_tablet_info()
  service: Rename topology::transition_state::tablet_split_finalization
  service: Respect initial_tablet_count if table is in growing mode
  service: Wire migration_tablet_set into the load balancer
  locator: Add tablet_map::sibling_tablets()
  service: Introduce sorted_replicas_for_tablet_load()
  locator/tablets: Extend tablet_replica equality comparator to three-way
  service: Introduce alias to per-table candidate map type
  service: Add replication constraint check variant for migration_tablet_set
  service: Add convergence check variant for migration_tablet_set
  service: Add migration helpers for migration_tablet_set
  service/tablet_allocator: Introduce migration_tablet_set
  service: Introduce migration_plan::add(migrations_vector)
  locator/tablets: Introduce tablet_map::for_each_sibling_tablets()
  locator/tablets: Introduce tablet_map::needs_merge()
  locator/tablets: Introduce resize_decision::initial_decision()
  locator/tablets: Fix return type of three-way comparison operators
  service: Extract update of node load on migrations
  service: Extract converge check for intra-node migration
  service: Extract erase of tablet replicas from candidate list
  scripts/tablet-mon: Allow visualization of tablet id
This commit is contained in:
Tomasz Grabiec
2024-12-06 18:06:20 +01:00
16 changed files with 1709 additions and 177 deletions

View File

@@ -333,7 +333,7 @@ Invariants:
on behalf of previous transitions can still run in the cluster, but they can have no side effects. This is ensured
by the proper use of the topology guard mechanism (see the "Topology guards" section).
# Tablet splitting
# Tablet resize
Each table has its resize metadata stored in group0.
@@ -349,6 +349,8 @@ for a given table, which can be done by dividing average table size[1] by the ta
[1]: The average size of a table is the total size across all DCs divided by the number of replicas across
all DCs.
## Tablet splitting
A table will need split if its average size surpasses the split threshold, which is 100% of the target
tablet size, which defaults to 5G. The reasoning is that after split we want average size to return
to the target size. For the same reason, the merge threshold is 50% of the target size.
@@ -373,13 +375,62 @@ emits a decision to finalize the split request. The finalization is serialized w
doubling tablet count would interfere with the migration process.
When the state machine leaves the migration track, and there are tablets waiting for tablet split to
be finalized, the topology will transition into `tablet_split_finalization` state. At this moment, there will
be finalized, the topology will transition into `tablet_resize_finalization` state. At this moment, there will
be no migration running in the system. A global token metadata barrier is executed to make sure that no
process e.g. repair will be holding stale metadata when finalizing split. After that, the new tablet map,
which is a result of splitting each preexisting tablet into two, is committed to group0.
The replicas will react to that by remapping its compaction groups into a new set which is, at least,
twice as large as the old one.
## Tablet merging
A table will need merge if its average size is below the merge threshold, which is 50% of the target
tablet size, which defaults to 5G. The reasoning is that after merge we want average size to return
to the target size. This hysteresis is important to avoid oscillations between splits and merges.
The initial tablet count (the parameter in schema) is respected while the table is in "growing mode".
Every table starts in this mode and leaves it if, for example, there was a need to split beyond
the initial tablet count. After a table leaves the mode, the average size can be trusted to determine
that the table is shrinking.
When the load balancer decides to merge a table, the resize_type field in tablet metadata will be set
to 'merge' and resize_seq_number is bumped to the next sequence number.
Similar to split, the load balancer might decide to revoke an ongoing merge if it realizes that after
merge, a split will be needed.
The merge preparation phase is done by co-locating replicas of sibling tablets on the same node:shard,
through the existing migration mechanism. Unlike split, all the preparation is done by the coordinator.
We say that a pair of tablets are siblings if they will become one after merge. This is built on the
power-of-two constraint. For example, if a table has 4 tablets, the siblings are (0, 1) and (2, 3).
The co-location algorithm is simple. The balancer will produce a migration for the "odd" tablet to follow the
"even" one. For example, a replica of tablet 1 will be moved to where a replica of tablet 0 lives.
If the "odd" tablet lives on the same node but on a different shard, an intra-node migration is performed.
Without co-location, the merge completion handler wouldn't be able to find the data of the replicas to be merged
in the same location, making it impossible for the coordinator to merge the replica sets and for the replica
layer to combine the data together.
Merge has low priority, so the co-location migrations will be emitted when there's no more important
work to do (e.g. node draining or regular balancing). The regular balancing will not undo the co-location
work done so far by migrating co-located replicas together (treating them as merged).
Once the balancer realizes replicas of all sibling tablets are co-located, a decision will be emitted
to finalize the merge. A pair of sibling tablets is considered co-located if their replica sets are
equal, i.e. (s1 + s2) == s1. The finalization is serialized with migration, as shrinking tablet count
would interfere with the migration process that requires tablet id stability.
When the coordinator leaves the migration track, and there are tables waiting for merge to be finalized,
the state machine will transition into `tablet_resize_finalization` state. At this moment, there will
be no migration running in the system. A global token metadata barrier is executed to make sure that no
process will hold stale topology when resizing the tablet map. That's important since the requests must
find a replica state consistent with the one in group0.
The handler of `tablet_resize_finalization` state will check if the decision is still to merge for a
table, and if so, the tablet map will have its size reduced by a factor of 2. When replicas of sibling
tablets are co-located, their replica sets can be merged into one, since (s1 + s2) == s1.
Once the new map is committed to group0, replicas will react to that by resizing their internal structure
to match the new tablet count, and also merging the compaction groups (sstable(s) + memtable) that
belonged to sibling tablets together.
# Sharding with tablets
Each table can have different shard assignment for a given token computed from the placement of tablet replicas,

View File

@@ -145,6 +145,7 @@ public:
gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
// A feature just for use in tests. It must not be advertised unless
// the "features_enable_test_feature" injection is enabled.

View File

@@ -31,6 +31,14 @@ namespace locator {
seastar::logger tablet_logger("tablets");
std::optional<std::pair<tablet_id, tablet_id>> tablet_map::sibling_tablets(tablet_id t) const {
if (tablet_count() == 1) {
return std::nullopt;
}
auto first_sibling = tablet_id(t.value() & ~0x1);
return std::make_pair(first_sibling, *next_tablet(first_sibling));
}
static
write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) {
@@ -152,6 +160,32 @@ bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info&
return false;
}
tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info)
: replicas(std::move(replicas))
, repair_time(repair_time)
, repair_task_info(std::move(repair_task_info))
{}
tablet_info::tablet_info(tablet_replica_set replicas)
: tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{})
{}
std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b) {
if (a.repair_task_info.is_valid() || b.repair_task_info.is_valid()) {
return {};
}
auto sorted = [] (tablet_replica_set rs) {
std::ranges::sort(rs, std::less<tablet_replica>());
return rs;
};
if (sorted(a.replicas) != sorted(b.replicas)) {
return {};
}
auto repair_time = std::max(a.repair_time, b.repair_time);
return tablet_info(std::move(a.replicas), repair_time, a.repair_task_info);
}
std::optional<tablet_replica> get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
auto leaving = substract_sets(tinfo.replicas, trinfo.next);
@@ -407,6 +441,24 @@ future<> tablet_map::for_each_tablet(seastar::noncopyable_function<future<>(tabl
}
}
future<> tablet_map::for_each_sibling_tablets(seastar::noncopyable_function<future<>(tablet_desc, std::optional<tablet_desc>)> func) const {
auto make_desc = [this] (tablet_id tid) {
return tablet_desc{tid, &get_tablet_info(tid), get_tablet_transition_info(tid)};
};
if (_tablets.size() == 1) {
co_return co_await func(make_desc(first_tablet()), std::nullopt);
}
for (std::optional<tablet_id> tid = first_tablet(); tid; tid = next_tablet(*tid)) {
auto tid1 = tid;
auto tid2 = tid = next_tablet(*tid);
if (!tid2) {
// Cannot happen with power-of-two invariant.
throw std::logic_error(format("Cannot retrieve sibling tablet with tablet count {}", tablet_count()));
}
co_await func(make_desc(*tid1), make_desc(*tid2));
}
}
void tablet_map::clear_transitions() {
_transitions.clear();
}
@@ -541,6 +593,10 @@ bool tablet_map::needs_split() const {
return std::holds_alternative<resize_decision::split>(_resize_decision.way);
}
bool tablet_map::needs_merge() const {
return std::holds_alternative<resize_decision::merge>(_resize_decision.way);
}
const locator::resize_decision& tablet_map::resize_decision() const {
return _resize_decision;
}
@@ -581,6 +637,10 @@ resize_decision::seq_number_t resize_decision::next_sequence_number() const {
return (sequence_number == std::numeric_limits<seq_number_t>::max()) ? 0 : sequence_number + 1;
}
bool resize_decision::initial_decision() const {
return sequence_number == 0;
}
table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept {
size_in_bytes = size_in_bytes + s.size_in_bytes;
split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number);

View File

@@ -47,7 +47,7 @@ struct tablet_id {
explicit tablet_id(size_t id) : id(id) {}
size_t value() const { return id; }
explicit operator size_t() const { return id; }
bool operator<=>(const tablet_id&) const = default;
auto operator<=>(const tablet_id&) const = default;
};
/// Identifies tablet (not be confused with tablet replica) in the scope of the whole cluster.
@@ -55,14 +55,14 @@ struct global_tablet_id {
table_id table;
tablet_id tablet;
bool operator<=>(const global_tablet_id&) const = default;
auto operator<=>(const global_tablet_id&) const = default;
};
struct tablet_replica {
host_id host;
shard_id shard;
bool operator==(const tablet_replica&) const = default;
auto operator<=>(const tablet_replica&) const = default;
};
using tablet_replica_set = utils::small_vector<tablet_replica, 3>;
@@ -171,9 +171,19 @@ struct tablet_info {
db_clock::time_point repair_time;
locator::tablet_task_info repair_task_info;
tablet_info() = default;
tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info);
tablet_info(tablet_replica_set);
bool operator==(const tablet_info&) const = default;
};
// Merges tablet_info b into a, but with following constraints:
// - they cannot have active repair task, since each task has a different id
// - their replicas must be all co-located.
// If tablet infos are mergeable, merged info is returned. Otherwise, nullopt.
std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b);
/// Represents states of the tablet migration state machine.
///
/// The stage serves two major purposes:
@@ -312,6 +322,8 @@ struct resize_decision {
bool operator==(const resize_decision&) const;
sstring type_name() const;
seq_number_t next_sequence_number() const;
// Returns true if this is the initial decision, before split or merge was emitted.
bool initial_decision() const;
};
struct table_load_stats {
@@ -346,6 +358,12 @@ struct repair_scheduler_config {
using load_stats_ptr = lw_shared_ptr<const load_stats>;
struct tablet_desc {
tablet_id tid;
const tablet_info* info; // cannot be null.
const tablet_transition_info* transition; // null if there's no transition.
};
/// Stores information about tablets of a single table.
///
/// The map contains a constant number of tablets, tablet_count().
@@ -440,6 +458,11 @@ public:
return tablet_id(size_t(t) + 1);
}
// Returns the pair of sibling tablets for a given tablet id.
// For example, if id 1 is provided, a pair of 0 and 1 is returned.
// Returns disengaged optional when sibling pair cannot be found.
std::optional<std::pair<tablet_id, tablet_id>> sibling_tablets(tablet_id t) const;
/// Returns true iff tablet has a given replica.
/// If tablet is in transition, considers both previous and next replica set.
bool has_replica(tablet_id, tablet_replica) const;
@@ -451,6 +474,10 @@ public:
/// Calls a given function for each tablet in the map in token ownership order.
future<> for_each_tablet(seastar::noncopyable_function<future<>(tablet_id, const tablet_info&)> func) const;
/// Calls a given function for each sibling tablet in the map in token ownership order.
/// If tablet count == 1, then there will be only one call and 2nd tablet_desc is disengaged.
future<> for_each_sibling_tablets(seastar::noncopyable_function<future<>(tablet_desc, std::optional<tablet_desc>)> func) const;
const auto& transitions() const {
return _transitions;
}
@@ -480,6 +507,7 @@ public:
bool operator==(const tablet_map&) const = default;
bool needs_split() const;
bool needs_merge() const;
/// Returns the token_range in which the given token will belong to after a tablet split
dht::token_range get_token_range_after_split(const token& t) const noexcept;

View File

@@ -91,6 +91,9 @@ public:
compaction_group(table& t, size_t gid, dht::token_range token_range);
~compaction_group();
void update_id(size_t id) {
_group_id = id;
}
void update_id_and_range(size_t id, dht::token_range token_range) {
_group_id = id;
_token_range = std::move(token_range);
@@ -155,6 +158,9 @@ public:
// invalidated and statistics are updated.
future<> update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc);
// Merges all sstables from another group into this one.
future<> merge_sstables_from(compaction_group& group);
const lw_shared_ptr<sstables::sstable_set>& main_sstables() const noexcept;
void set_main_sstables(lw_shared_ptr<sstables::sstable_set> new_main_sstables);
@@ -163,6 +169,7 @@ public:
// Makes a sstable set, which includes all sstables managed by this group
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const;
std::vector<sstables::shared_sstable> all_sstables() const;
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept;
// Triggers regular compaction.
@@ -203,6 +210,9 @@ using const_compaction_group_ptr = lw_shared_ptr<const compaction_group>;
// shard will have as many groups as there are tablet replicas owned by that shard.
class storage_group {
compaction_group_ptr _main_cg;
// Holds compaction groups that now belongs to same tablet after merge. Compaction groups here will
// eventually have all their data moved into main group.
std::vector<compaction_group_ptr> _merging_groups;
std::vector<compaction_group_ptr> _split_ready_groups;
seastar::gate _async_gate;
private:
@@ -231,6 +241,13 @@ public:
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
void add_merging_group(compaction_group_ptr);
const std::vector<compaction_group_ptr>& merging_groups() const;
future<> remove_empty_merging_groups();
// Puts the storage group in split mode, in which it internally segregates data
// into two sstable sets and two memtable sets corresponding to the two adjacent
// tablets post-split.
@@ -271,6 +288,8 @@ using storage_group_map = absl::flat_hash_map<size_t, storage_group_ptr, absl::H
class storage_group_manager {
protected:
storage_group_map _storage_groups;
protected:
virtual future<> stop() = 0;
public:
virtual ~storage_group_manager();

View File

@@ -11,6 +11,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/switch_to.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/defer.hh>
@@ -636,7 +637,8 @@ const storage_group_map& storage_group_manager::storage_groups() const {
}
future<> storage_group_manager::stop_storage_groups() noexcept {
return parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); });
co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); });
co_await stop();
}
void storage_group_manager::clear_storage_groups() {
@@ -687,6 +689,10 @@ public:
_storage_groups = std::move(r);
}
future<> stop() override {
return make_ready_future<>();
}
future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override { return make_ready_future(); }
compaction_group& compaction_group_for_token(dht::token token) const noexcept override {
@@ -738,6 +744,8 @@ class tablet_storage_group_manager final : public storage_group_manager {
// current split, and not a previously revoked (stale) decision.
// The minimum value, which is a negative number, is not used by coordinator for first decision.
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
future<> _merge_completion_fiber;
condition_variable _merge_completion_event;
private:
const schema_ptr& schema() const {
return _t.schema();
@@ -758,6 +766,17 @@ private:
// that were previously split.
future<> handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
// Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into
// the new tablet id (X >> 1). In practice, that means storage groups for X and X+1
// are merged into a new storage group with id (X >> 1).
future<> handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
// When merge completes, compaction groups of sibling tablets are added to same storage
// group, but they're not merged yet into one, since the merge completion handler happens
// inside the erm updater which must complete ASAP. Therefore, those groups will be merged
// into a single one (main) in background.
future<> merge_completion_fiber();
storage_group& storage_group_for_id(size_t i) const {
return storage_group_manager::storage_group_for_id(schema(), i);
}
@@ -796,6 +815,7 @@ public:
: _t(t)
, _my_host_id(erm.get_token_metadata().get_my_id())
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
, _merge_completion_fiber(merge_completion_fiber())
{
storage_group_map ret;
@@ -813,6 +833,11 @@ public:
_storage_groups = std::move(ret);
}
future<> stop() override {
_merge_completion_event.signal();
return std::exchange(_merge_completion_fiber, make_ready_future<>());
}
future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override;
compaction_group& compaction_group_for_token(dht::token token) const noexcept override;
@@ -875,6 +900,9 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
action(_main_cg);
for (auto& cg : _merging_groups) {
action(cg);
}
for (auto& cg : _split_ready_groups) {
action(cg);
}
@@ -896,6 +924,17 @@ utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_gro
return cgs;
}
utils::small_vector<compaction_group_ptr, 3> storage_group::split_unready_groups() const {
utils::small_vector<compaction_group_ptr, 3> cgs;
cgs.push_back(_main_cg);
std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs));
return cgs;
}
bool storage_group::split_unready_groups_are_empty() const {
return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty));
}
bool storage_group::set_split_mode() {
if (!splitting_mode()) {
auto create_cg = [this] () -> compaction_group_ptr {
@@ -908,8 +947,23 @@ bool storage_group::set_split_mode() {
_split_ready_groups = std::move(split_ready_groups);
}
// The storage group is considered "split ready" if its main compaction group is empty.
return _main_cg->empty();
// The storage group is considered "split ready" if all split unready groups (main + merging) are empty.
return split_unready_groups_are_empty();
}
void storage_group::add_merging_group(compaction_group_ptr cg) {
_merging_groups.push_back(std::move(cg));
}
const std::vector<compaction_group_ptr>& storage_group::merging_groups() const {
return _merging_groups;
}
future<> storage_group::remove_empty_merging_groups() {
for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) {
co_await group->stop("tablet merge");
}
std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty));
}
future<> storage_group::split(sstables::compaction_type_options::split opt) {
@@ -918,24 +972,34 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) {
}
co_await utils::get_local_injector().inject("delay_split_compaction", 5s);
if (_main_cg->empty()) {
if (split_unready_groups_are_empty()) {
co_return;
}
auto holder = _main_cg->async_gate().hold();
co_await _main_cg->flush();
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{});
co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{});
for (auto cg : split_unready_groups()) {
if (cg->async_gate().is_closed()) {
continue;
}
auto holder = cg->async_gate().hold();
co_await cg->flush();
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
co_await cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{});
co_await cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{});
}
}
lw_shared_ptr<const sstables::sstable_set> storage_group::make_sstable_set() const {
if (!splitting_mode()) {
if (_split_ready_groups.empty() && _merging_groups.empty()) {
return _main_cg->make_sstable_set();
}
const auto& schema = _main_cg->_t.schema();
std::vector<lw_shared_ptr<sstables::sstable_set>> underlying;
underlying.reserve(1 + _split_ready_groups.size());
underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size());
underlying.emplace_back(_main_cg->make_sstable_set());
for (const auto& cg : _merging_groups) {
if (!cg->empty()) {
underlying.emplace_back(cg->make_sstable_set());
}
}
for (const auto& cg : _split_ready_groups) {
underlying.emplace_back(cg->make_sstable_set());
}
@@ -1141,7 +1205,9 @@ future<> table::parallel_foreach_compaction_group(std::function<future<>(compact
void table::for_each_compaction_group(std::function<void(compaction_group&)> action) {
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
action(*cg);
if (auto holder = try_hold_gate(cg->async_gate())) {
action(*cg);
}
});
});
}
@@ -1149,7 +1215,9 @@ void table::for_each_compaction_group(std::function<void(compaction_group&)> act
void table::for_each_compaction_group(std::function<void(const compaction_group&)> action) const {
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
action(*cg);
if (auto holder = try_hold_gate(cg->async_gate())) {
action(*cg);
}
});
});
}
@@ -1803,6 +1871,35 @@ compaction_group::delete_unused_sstables(sstables::compaction_completion_desc de
return delete_sstables_atomically(std::move(sstables_to_remove));
}
std::vector<sstables::shared_sstable> compaction_group::all_sstables() const {
std::vector<sstables::shared_sstable> all;
auto main_sstables = _main_sstables->all();
auto maintenance_sstables = _maintenance_sstables->all();
all.reserve(main_sstables->size() + maintenance_sstables->size());
std::ranges::copy(*main_sstables, std::back_inserter(all));
std::ranges::copy(*maintenance_sstables, std::back_inserter(all));
return all;
}
future<>
compaction_group::merge_sstables_from(compaction_group& group) {
auto& cs = _t.get_compaction_strategy();
auto permit = co_await seastar::get_units(_t._sstable_set_mutation_sem, 1);
table::sstable_list_builder builder(std::move(permit));
auto sstables_to_merge = group.all_sstables();
// re-build new list for this group with sstables of the group being merged.
auto res = co_await builder.build_new_list(*main_sstables(), cs.make_sstable_set(_t.schema()), sstables_to_merge, {});
// execute:
std::invoke([&] noexcept {
set_main_sstables(std::move(res.new_sstable_set));
group.clear_sstables();
// FIXME: backlog adjustment is not exception safe.
backlog_tracker_adjust_charges({}, sstables_to_merge);
});
_t.rebuild_statistics();
}
future<>
compaction_group::update_sstable_sets_on_compaction_completion(sstables::compaction_completion_desc desc) {
// Build a new list of _sstables: We remove from the existing list the
@@ -2393,7 +2490,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
// Stop the released main compaction groups asynchronously
future<> stop_fut = make_ready_future<>();
for (auto& [id, sg] : _storage_groups) {
if (!sg->main_compaction_group()->empty()) {
if (!sg->split_unready_groups_are_empty()) {
on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \
"therefore groups cannot be remapped with the new tablet count.",
id, table_id));
@@ -2428,6 +2525,79 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
return stop_fut;
}
future<> tablet_storage_group_manager::merge_completion_fiber() {
co_await coroutine::switch_to(_t.get_config().streaming_scheduling_group);
while (!_t.async_gate().is_closed()) {
try {
co_await for_each_storage_group_gently([] (storage_group& sg) -> future<> {
auto main_group = sg.main_compaction_group();
for (auto& group : sg.merging_groups()) {
// Synchronize with ongoing writes that might be blocked waiting for memory.
// Also, disabling compaction provides stability on the sstable set.
co_await group->stop("tablet merge");
// Flushes memtable, so all the data can be moved.
co_await group->flush();
co_await main_group->merge_sstables_from(*group);
}
co_await sg.remove_empty_merging_groups();
});
} catch (...) {
tlogger.error("Failed to merge compaction groups for table {}.{}: {}", schema()->ks_name(), schema()->cf_name(), std::current_exception());
}
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
tlogger.info("Merge completion fiber finished, about to sleep");
});
co_await _merge_completion_event.wait();
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
}
}
future<> tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
auto table_id = schema()->id();
size_t old_tablet_count = old_tmap.tablet_count();
size_t new_tablet_count = new_tmap.tablet_count();
storage_group_map new_storage_groups;
unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count);
unsigned merge_size = 1 << log2_reduce_factor;
if (merge_size != 2) {
throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}",
old_tablet_count, new_tablet_count, table_id));
}
for (auto& [id, sg] : _storage_groups) {
// Pick first (even) tablet of each sibling pair.
if (id % merge_size != 0) {
continue;
}
auto new_tid = id >> log2_reduce_factor;
auto new_cg = make_lw_shared<compaction_group>(_t, new_tid, new_tmap.get_token_range(locator::tablet_id(new_tid)));
auto new_sg = make_lw_shared<storage_group>(std::move(new_cg));
for (unsigned i = 0; i < merge_size; i++) {
auto group_id = id + i;
auto it = _storage_groups.find(group_id);
if (it == _storage_groups.end()) {
throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id));
}
auto& sg = it->second;
sg->for_each_compaction_group([&new_sg, new_tid] (const compaction_group_ptr& cg) {
cg->update_id(new_tid);
new_sg->add_merging_group(cg);
});
}
new_storage_groups[new_tid] = std::move(new_sg);
}
_storage_groups = std::move(new_storage_groups);
_merge_completion_event.signal();
return make_ready_future<>();
}
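For intuition, the sibling-to-merged id remapping performed above can be sketched standalone (a hypothetical Python sketch, not part of the tree; `merged_ids` mirrors the `id >> log2_reduce_factor` shift in `handle_tablet_merge_completion`, assuming the enforced power-of-two reduction):

```python
def merged_ids(old_tablet_count: int, new_tablet_count: int) -> dict[int, int]:
    """Map each old tablet id to its merged id. With a factor-of-2 merge,
    sibling tablets (2k, 2k+1) collapse into merged tablet k."""
    log2_reduce_factor = (old_tablet_count // new_tablet_count).bit_length() - 1
    merge_size = 1 << log2_reduce_factor
    if merge_size != 2:
        raise ValueError("tablet count must shrink by a factor of 2 per merge step")
    return {tid: tid >> log2_reduce_factor for tid in range(old_tablet_count)}

# Siblings 0 and 1 collapse into tablet 0; 2 and 3 into tablet 1.
print(merged_ids(4, 2))  # {0: 0, 1: 0, 2: 1, 3: 1}
```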
future<> tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) {
auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id());
auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map);
@@ -2439,6 +2609,11 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map);
co_return;
} else if (new_tablet_count < old_tablet_count) {
tlogger.info("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
co_await handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
co_return;
}
// Allocate storage group if tablet is migrating in.
@@ -2846,9 +3021,7 @@ size_t compaction_group::memtable_count() const noexcept {
}
size_t storage_group::memtable_count() const noexcept {
auto memtable_count = [] (const compaction_group_ptr& cg) { return cg ? cg->memtable_count() : 0; };
return memtable_count(_main_cg) +
std::ranges::fold_left(_split_ready_groups | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}
future<> table::flush(std::optional<db::replay_position> pos) {
@@ -3801,6 +3974,9 @@ future<> storage_group::stop(sstring reason) noexcept {
co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
return cg_ptr->stop(reason);
});
co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
return cg_ptr->stop(reason);
});
co_await std::move(closed_gate_fut);
}


@@ -435,8 +435,17 @@ def update_from_cql(initial=False):
changed = True
tablets_by_shard = set()
tablet_id_by_table = {}
def tablet_id_for_table(table_id):
if table_id not in tablet_id_by_table:
tablet_id_by_table[table_id] = 0
ret = tablet_id_by_table[table_id]
tablet_id_by_table[table_id] += 1
return ret
for tablet in session.execute(tablets_query):
id = (tablet.table_id, tablet.last_token)
id = (tablet.table_id, tablet.last_token, tablet_id_for_table(tablet.table_id))
replicas = set(tablet.replicas)
new_replicas = set(tablet.new_replicas) if tablet.new_replicas else replicas
@@ -540,6 +549,7 @@ window_width = min(window_width, 3000)
window_height = min(window_height, 2000)
window = pygame.display.set_mode((window_width, window_height), pygame.RESIZABLE)
pygame.display.set_caption('Tablets')
number_font = pygame.font.SysFont(None, 20)
def draw_tablet(tablet, x, y):
tablet.x = x
@@ -567,6 +577,11 @@ def draw_tablet(tablet, x, y):
border_top_left_radius=tablet_radius,
border_top_right_radius=tablet_radius)
number_text = str(tablet.id[2])
number_image = number_font.render(number_text, True, BLACK)
window.blit(number_image, (x + tablet_frame_size + (w - number_image.get_width()) / 2,
y + tablet_frame_size + (h-1 - number_image.get_height()) / 2))
def draw_node_frame(x, y, x2, y2, color):
pygame.draw.rect(window, color, (x, y, x2 - x, y2 - y), node_frame_thickness,
border_radius=tablet_radius + tablet_frame_size + node_frame_mid)


@@ -743,7 +743,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
[[fallthrough]];
case topology::transition_state::tablet_migration:
[[fallthrough]];
case topology::transition_state::tablet_split_finalization:
case topology::transition_state::tablet_resize_finalization:
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
@@ -5416,7 +5416,11 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
sleep = true;
}
if (sleep) {
co_await split_retry.retry(_group0_as);
try {
co_await split_retry.retry(_group0_as);
} catch (...) {
slogger.warn("Sleep in split monitor failed with {}", std::current_exception());
}
}
}
}

File diff suppressed because it is too large


@@ -106,6 +106,7 @@ struct table_resize_plan {
resize[id] = std::move(other_resize);
}
}
finalize_resize.merge(std::move(other.finalize_resize));
}
};
@@ -153,6 +154,12 @@ public:
_migrations.emplace_back(std::move(info));
}
void add(migrations_vector migrations) {
for (auto&& mig : migrations) {
add(std::move(mig));
}
}
void merge(migration_plan&& other) {
std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations));
_has_nodes_to_drain |= other._has_nodes_to_drain;
@@ -166,8 +173,8 @@ public:
const table_resize_plan& resize_plan() const { return _resize_plan; }
void set_resize_plan(table_resize_plan resize_plan) {
_resize_plan = std::move(resize_plan);
void merge_resize_plan(table_resize_plan resize_plan) {
_resize_plan.merge(std::move(resize_plan));
}
const tablet_repair_plan& repair_plan() const { return _repair_plan; }
@@ -228,7 +235,7 @@ public:
void set_use_table_aware_balancing(bool);
future<locator::tablet_map> split_tablets(locator::token_metadata_ptr, table_id);
future<locator::tablet_map> resize_tablets(locator::token_metadata_ptr, table_id);
/// Should be called when the node is no longer a leader.
void on_leadership_lost();


@@ -1537,7 +1537,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}
future<> handle_tablet_split_finalization(group0_guard g) {
future<> handle_tablet_resize_finalization(group0_guard g) {
// Executes a global barrier to guarantee that any process (e.g. repair) holding stale version
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
@@ -1550,7 +1550,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
for (auto& table_id : plan.resize_plan().finalize_resize) {
auto s = _db.find_schema(table_id);
auto new_tablet_map = co_await _tablet_allocator.split_tablets(tm, table_id);
auto new_tablet_map = co_await _tablet_allocator.resize_tablets(tm, table_id);
updates.emplace_back(co_await replica::tablet_map_to_mutation(
new_tablet_map,
table_id,
@@ -2168,8 +2168,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard), false);
break;
case topology::transition_state::tablet_split_finalization:
co_await handle_tablet_split_finalization(std::move(guard));
case topology::transition_state::tablet_resize_finalization:
co_await handle_tablet_resize_finalization(std::move(guard));
break;
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
@@ -2616,8 +2616,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// Returns true if the state machine was transitioned into tablet migration path.
future<bool> maybe_start_tablet_migration(group0_guard);
// Returns true if the state machine was transitioned into tablet split finalization path.
future<bool> maybe_start_tablet_split_finalization(group0_guard, const table_resize_plan& plan);
// Returns true if the state machine was transitioned into tablet resize finalization path.
future<bool> maybe_start_tablet_resize_finalization(group0_guard, const table_resize_plan& plan);
future<locator::load_stats> refresh_tablet_load_stats();
future<> start_tablet_load_stats_refresher();
@@ -2717,10 +2717,10 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
co_await generate_migration_updates(updates, guard, plan);
// We only want to consider transitioning into tablet split finalization path, if there's no other work
// We only want to consider transitioning into tablet resize finalization path, if there's no other work
// to be done (e.g. start migration or/and emit split decision).
if (updates.empty()) {
co_return co_await maybe_start_tablet_split_finalization(std::move(guard), plan.resize_plan());
co_return co_await maybe_start_tablet_resize_finalization(std::move(guard), plan.resize_plan());
}
updates.emplace_back(
@@ -2733,7 +2733,7 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
co_return true;
}
future<bool> topology_coordinator::maybe_start_tablet_split_finalization(group0_guard guard, const table_resize_plan& plan) {
future<bool> topology_coordinator::maybe_start_tablet_resize_finalization(group0_guard guard, const table_resize_plan& plan) {
if (plan.finalize_resize.empty()) {
co_return false;
}
@@ -2745,11 +2745,11 @@ future<bool> topology_coordinator::maybe_start_tablet_split_finalization(group0_
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_split_finalization)
.set_transition_state(topology::transition_state::tablet_resize_finalization)
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet split finalization");
co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet resize finalization");
co_return true;
}


@@ -147,18 +147,27 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::write_both_read_old, "write both read old"},
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_migration, "tablet migration"},
{topology::transition_state::tablet_split_finalization, "tablet split finalization"},
{topology::transition_state::tablet_resize_finalization, "tablet resize finalization"},
{topology::transition_state::tablet_draining, "tablet draining"},
{topology::transition_state::left_token_ring, "left token ring"},
{topology::transition_state::rollback_to_normal, "rollback to normal"},
};
// Allows old deprecated names to be recognized and point to the correct transition.
static std::unordered_map<sstring, topology::transition_state> deprecated_name_to_transition_state = {
{"tablet split finalization", topology::transition_state::tablet_resize_finalization},
};
topology::transition_state transition_state_from_string(const sstring& s) {
for (auto&& e : transition_state_to_name_map) {
if (e.second == s) {
return e.first;
}
}
auto it = deprecated_name_to_transition_state.find(s);
if (it != deprecated_name_to_transition_state.end()) {
return it->second;
}
on_internal_error(tsmlogger, format("cannot map name {} to transition_state", s));
}


@@ -110,7 +110,7 @@ struct topology {
write_both_read_old,
write_both_read_new,
tablet_migration,
tablet_split_finalization,
tablet_resize_finalization,
left_token_ring,
rollback_to_normal,
};


@@ -33,6 +33,7 @@
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "service/topology_coordinator.hh"
#include "service/topology_state_machine.hh"
#include <boost/regex.hpp>
@@ -1354,11 +1355,23 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) {
tmap.set_resize_decision(resize_decision);
});
}
}
static
future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) {
for (auto table_id : plan.resize_plan().finalize_resize) {
const auto& old_tmap = tm.tablets().get_tablet_map(table_id);
testlog.info("Setting new tablet map of size {}", old_tmap.tablet_count() * 2);
tablet_map tmap(old_tmap.tablet_count() * 2);
tm.tablets().set_tablet_map(table_id, std::move(tmap));
auto tm = stm.get();
const auto& old_tmap = tm->tablets().get_tablet_map(table_id);
auto new_tmap = co_await talloc.resize_tablets(tm, table_id);
auto new_resize_decision = locator::resize_decision{};
new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number();
new_tmap.set_resize_decision(std::move(new_resize_decision));
co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) {
tm.tablets().set_tablet_map(table_id, std::move(new_tmap));
return make_ready_future<>();
});
}
}
@@ -1368,6 +1381,7 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) {
for (auto&& mig : plan.migrations()) {
tm.tablets().mutate_tablet_map(mig.tablet.table, [&] (tablet_map& tmap) {
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
testlog.trace("Replacing tablet {} replica from {} to {}", mig.tablet.tablet, mig.src, mig.dst);
tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst);
tmap.set_tablet(mig.tablet.tablet, tinfo);
});
@@ -1399,6 +1413,9 @@ size_t get_tablet_count(const tablet_metadata& tm) {
return count;
}
static
void check_tablet_invariants(const tablet_metadata& tmeta);
static
void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, locator::load_stats_ptr load_stats = {}, std::unordered_set<host_id> skiplist = {}) {
// Sanity limit to avoid infinite loops.
@@ -1414,6 +1431,7 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, loc
apply_plan(tm, plan);
return make_ready_future<>();
}).get();
handle_resize_finalize(talloc, stm, plan).get();
}
throw std::runtime_error("rebalance_tablets(): convergence not reached within limit");
}
@@ -2420,13 +2438,37 @@ void check_tablet_invariants(const tablet_metadata& tmeta) {
std::unordered_set<host_id> hosts;
// Uniqueness of hosts
for (const auto& replica: tinfo.replicas) {
BOOST_REQUIRE(hosts.insert(replica.host).second);
auto ret = hosts.insert(replica.host).second;
if (!ret) {
testlog.error("Failed tablet invariant check for tablet {}: {}", tid, tinfo.replicas);
}
BOOST_REQUIRE(ret);
}
return make_ready_future<>();
}).get();
}
}
static
std::vector<host_id>
allocate_replicas_in_racks(const std::vector<endpoint_dc_rack>& racks, int rf,
const std::unordered_map<sstring, std::vector<host_id>>& hosts_by_rack) {
// Choose replicas randomly while loading racks evenly.
std::vector<host_id> replica_hosts;
for (int i = 0; i < rf; ++i) {
auto rack = racks[i % racks.size()];
auto& rack_hosts = hosts_by_rack.at(rack.rack);
while (true) {
auto candidate_host = rack_hosts[tests::random::get_int<shard_id>(0, rack_hosts.size() - 1)];
if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) {
replica_hosts.push_back(candidate_host);
break;
}
}
}
return replica_hosts;
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
do_with_cql_env_thread([] (auto& e) {
const int n_hosts = 6;
@@ -2480,18 +2522,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
tablet_map tmap(1 << log2_tablets);
for (auto tid : tmap.tablet_ids()) {
// Choose replicas randomly while loading racks evenly.
std::vector<host_id> replica_hosts;
for (int i = 0; i < rf; ++i) {
auto rack = racks[i % racks.size()];
auto& rack_hosts = hosts_by_rack[rack.rack];
while (true) {
auto candidate_host = rack_hosts[tests::random::get_int<shard_id>(0, rack_hosts.size() - 1)];
if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) {
replica_hosts.push_back(candidate_host);
break;
}
}
}
std::vector<host_id> replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack);
tablet_replica_set replicas;
for (auto h : replica_hosts) {
auto shard_count = tm.get_topology().find_node(h)->get_shard_count();
@@ -2606,6 +2637,205 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) {
}, std::move(cfg)).get();
}
using rack_vector = std::vector<endpoint_dc_rack>;
using hosts_by_rack_map = std::unordered_map<sstring, std::vector<host_id>>;
// runs in seastar thread.
static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts,
const unsigned shard_count, const unsigned initial_tablets,
std::function<void(token_metadata&, tablet_map&, const rack_vector&, const hosts_by_rack_map&)> set_tablets) {
rack_vector racks;
for (int i = 0; i < n_racks; i++) {
racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)});
}
testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets);
std::vector<host_id> hosts;
for (int i = 0; i < n_hosts; ++i) {
hosts.push_back(host_id(next_uuid()));
}
auto table1 = add_table(e).get();
hosts_by_rack_map hosts_by_rack;
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
locator::topology::config{
.this_endpoint = inet_address("192.168.0.1"),
.this_host_id = hosts[0],
.local_dc_rack = racks[std::min(1, n_racks - 1)]
}
});
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
tablet_metadata tmeta;
int i = 0;
for (auto h : hosts) {
auto ip = inet_address(format("192.168.0.{}", ++i));
tm.update_host_id(h, ip);
auto rack = racks[i % racks.size()];
hosts_by_rack[rack.rack].push_back(h);
tm.update_topology(h, rack, node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h);
testlog.debug("adding host {}, ip {}, rack {}, token {}", h, ip, rack.rack, token(tests::d2t(float(i) / hosts.size())));
}
tablet_map tmap(initial_tablets);
locator::resize_decision decision;
// leaves growing mode, allowing for merge decision.
decision.sequence_number = decision.next_sequence_number();
tmap.set_resize_decision(std::move(decision));
set_tablets(tm, tmap, racks, hosts_by_rack);
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
}).get();
auto tablet_count = [&] {
return stm.get()->tablets().get_tablet_map(table1).tablet_count();
};
auto do_rebalance_tablets = [&] (locator::load_stats load_stats) {
rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats)));
};
const uint64_t target_tablet_size = service::default_target_tablet_size;
auto merge_threshold = [&] () -> uint64_t {
return (target_tablet_size * 0.5f) * tablet_count();
};
while (tablet_count() > 1) {
locator::load_stats load_stats = {
.tables = {
{ table1, table_load_stats{ .size_in_bytes = merge_threshold() - 1 }},
}
};
auto old_tablet_count = tablet_count();
check_tablet_invariants(stm.get()->tablets());
do_rebalance_tablets(std::move(load_stats));
check_tablet_invariants(stm.get()->tablets());
BOOST_REQUIRE_LT(tablet_count(), old_tablet_count);
}
}
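The merge-threshold arithmetic the loop above drives can be sketched as follows (a hypothetical Python helper; the 5 GiB target is an assumption standing in for `service::default_target_tablet_size`, and the strict `<` mirrors the test feeding `merge_threshold() - 1` bytes):

```python
def wants_merge(table_size_in_bytes: int, tablet_count: int,
                target_tablet_size: int) -> bool:
    """Emit a merge decision when the average tablet size falls below 50%
    of the target, as exercised by the test loop above."""
    avg_tablet_size = table_size_in_bytes / tablet_count
    return avg_tablet_size < target_tablet_size * 0.5

# With load_stats reporting one byte under the threshold, the average sits
# just below half the target, so a merge is requested each round.
target = 5 * 1024**3  # assumed 5 GiB per-tablet target
merge_threshold = int(target * 0.5) * 4  # 4 tablets
print(wants_merge(merge_threshold - 1, 4, target))  # True
print(wants_merge(merge_threshold, 4, target))      # False
```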
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) {
do_with_cql_env_thread([] (auto& e) {
auto seed = tests::random::get_int<int32_t>();
std::mt19937 random_engine{seed};
testlog.info("test_load_balancing_merge_colocation - seed {}", seed);
for (auto i = 0; i < 10; i++) {
const int rf = tests::random::get_int<int>(3, 3);
const int n_racks = rf;
const int n_hosts = tests::random::get_int<unsigned>(n_racks * rf, n_racks * rf * 2);
const unsigned shard_count = tests::random::get_int<unsigned>(2, 12);
const unsigned total_shard_count = n_hosts * shard_count;
const unsigned initial_tablets = std::bit_ceil<unsigned>(tests::random::get_int<unsigned>(total_shard_count, total_shard_count * 10));
auto set_tablets = [rf, shard_count] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
for (auto tid : tmap.tablet_ids()) {
testlog.debug("allocating replica in racks with rf {}", rf);
std::vector<host_id> replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack);
tablet_replica_set replicas;
replicas.reserve(replica_hosts.size());
for (auto h : replica_hosts) {
replicas.push_back(tablet_replica {h, tests::random::get_int<shard_id>(0, shard_count - 1)});
}
testlog.debug("allocating replicas for tablet {}: {}", tid, replicas);
tmap.set_tablet(tid, tablet_info {std::move(replicas)});
}
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) {
do_with_cql_env_thread([] (auto& e) {
const int rf = 2;
const int n_racks = 1;
const int n_hosts = 2;
const unsigned shard_count = 2;
const unsigned initial_tablets = 2;
auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
auto& hosts = hosts_by_rack.at(racks.front().rack);
auto host1 = hosts[0];
auto host2 = hosts[1];
tmap.set_tablet(tablet_id(0), tablet_info {
tablet_replica_set {
tablet_replica {host1, shard_id(0)},
tablet_replica {host2, shard_id(0)},
}
});
tmap.set_tablet(tablet_id(1), tablet_info {
tablet_replica_set {
tablet_replica {host2, shard_id(0)},
tablet_replica {host1, shard_id(0)},
}
});
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) {
do_with_cql_env_thread([] (auto& e) {
const int rf = 3;
const int n_racks = 1;
const int n_hosts = 4;
const unsigned shard_count = 2;
const unsigned initial_tablets = 2;
auto set_tablets = [&] (token_metadata& tm, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
auto& rack = racks.front();
auto& hosts = hosts_by_rack.at(rack.rack);
BOOST_REQUIRE(hosts.size() == 4);
auto a = hosts[0];
auto b = hosts[1];
auto c = hosts[2];
auto d = hosts[3];
// nodes = {A, B, C, D}
// tablet1 = {A, B, C}
// tablet2 = {A, B, D}
// viable target for {tablet1, B} is D.
// viable target for {tablet2, B} is C.
//
// Decommission should succeed by migrating away even the co-located replicas of sibling tablets that don't share viable targets.
// That should produce:
// tablet1 = {A, D, C}
// tablet2 = {A, C, D}
auto decision = tmap.resize_decision();
decision.way = locator::resize_decision::merge{};
tmap.set_resize_decision(std::move(decision));
tm.update_topology(b, rack, node::state::being_decommissioned, shard_count);
tmap.set_tablet(tablet_id(0), tablet_info {
tablet_replica_set {
tablet_replica {a, shard_id(0)},
tablet_replica {b, shard_id(0)},
tablet_replica {c, shard_id(0)},
}
});
tmap.set_tablet(tablet_id(1), tablet_info {
tablet_replica_set {
tablet_replica {a, shard_id(0)},
tablet_replica {b, shard_id(0)},
tablet_replica {d, shard_id(0)},
}
});
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
do_with_cql_env_thread([] (auto& e) {
inet_address ip1("192.168.0.1");
@@ -2614,7 +2844,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
auto host1 = host_id(next_uuid());
auto host2 = host_id(next_uuid());
auto table1 = table_id(next_uuid());
auto table1 = add_table(e).get();
unsigned shard_count = 2;
@@ -2627,11 +2857,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
}
});
stm.mutate_token_metadata([&] (token_metadata& tm) {
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1);
co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2);
tablet_map tmap(2);
for (auto tid : tmap.tablet_ids()) {
@@ -2645,7 +2877,6 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
auto tablet_count = [&] {
@@ -2668,19 +2899,6 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
const auto initial_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
// there are 2 tablets, each with avg size hitting merge threshold, so merge request is emitted
{
locator::load_stats load_stats = {
.tables = {
{ table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }},
}
};
do_rebalance_tablets(std::move(load_stats));
BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets);
BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::merge>(resize_decision().way));
}
// avg size moved above target size, so merge is cancelled
{
locator::load_stats load_stats = {
@@ -2722,6 +2940,19 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets * 2);
BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::none>(resize_decision().way));
}
// Check that balancer detects table size dropped to 0 and reduces tablet count down to 1 through merges.
{
locator::load_stats load_stats = {
.tables = {
{ table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }},
}
};
do_rebalance_tablets(std::move(load_stats));
BOOST_REQUIRE_EQUAL(tablet_count(), 1);
}
}).get();
}
@@ -3330,3 +3561,10 @@ SEASTAR_TEST_CASE(test_explicit_tablets_disable) {
co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0);
co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128);
}
SEASTAR_TEST_CASE(test_recognition_of_deprecated_name_for_resize_transition) {
using transition_state = service::topology::transition_state;
BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet split finalization"), transition_state::tablet_resize_finalization);
BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet resize finalization"), transition_state::tablet_resize_finalization);
return make_ready_future<>();
}


@@ -37,6 +37,10 @@ async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo], ranges: str = ''):
node = server.ip_addr
await manager.servers_see_each_other(servers)


@@ -0,0 +1,303 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
from test.topology.conftest import skip_mode
import pytest
import asyncio
import logging
import time
import random
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str):
host = manager.cql.cluster.metadata.get_host(server.ip_addr)
# read_barrier is needed to ensure that local tablet metadata on the queried node
# reflects the finalized tablet movement.
await read_barrier(manager.api, server.ip_addr)
table_id = await manager.get_table_id(keyspace_name, table_name)
rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where "
f"table_id = {table_id}", host=host)
return rows[0].tablet_count
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--logger-log-level', 'load_balancer=debug',
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
# Initial average table size of 400k (1 tablet), so triggers some splits.
total_keys = 200
keys = range(total_keys)
insert = cql.prepare("INSERT INTO test.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;")
assert len(rows) == len(keys)
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, "test")
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count == 1
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
# Increases the chance of tablet migration concurrent with split
await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
    await asyncio.sleep(2)  # Give load balancer some time to do work
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await check()
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count > 1
# Allow shuffling of tablet replicas to make co-location work harder
async def shuffle():
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
        await asyncio.sleep(2)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await shuffle()
    # Bypass merge completion so we can do some balancing (shuffling) after
    # co-location, to make sure the balancer won't break co-location.
await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
# Shrinks table significantly, forcing merge.
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
    # Disable balancing to avoid a race between major compaction and tablet migration
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.keyspace_compaction(server.ip_addr, "test")
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
await s1_log.wait_for("All sibling tablets are co-located")
# Do some shuffling to make sure balancer works with co-located tablets
await shuffle()
old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
await inject_error_on(manager, "replica_merge_completion_wait", servers)
await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count < old_tablet_count
await check()
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.keyspace_compaction(server.ip_addr, "test")
await check()
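# The tests in this file drive the balancer across the split and merge
# thresholds described in the merge design: split when the average tablet
# outgrows the target, merge when it shrinks to half the target, with a
# sticky band in between to avoid oscillation. A simplified sketch of that
# decision (names and exact thresholds are illustrative, not the balancer's
# actual code):

```python
def resize_decision(avg_tablet_size: int, target_tablet_size: int,
                    current: str = 'none') -> str:
    """Illustrative resize decision with hysteresis.

    Sizes between half-target and target keep the previous decision, so the
    balancer does not oscillate between split and merge.
    """
    if avg_tablet_size > target_tablet_size:
        return 'split'
    if avg_tablet_size * 2 <= target_tablet_size:
        return 'merge'
    return current
```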


# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=info',
'--logger-log-level', 'table=info',
'--logger-log-level', 'raft_topology=info',
'--logger-log-level', 'group0_raft_sm=info',
'--logger-log-level', 'load_balancer=info',
'--target-tablet-size-in-bytes', '30000',
]
config = {
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}
servers = [await manager.server_add(config=config, cmdline=cmdline),
await manager.server_add(config=config, cmdline=cmdline),
await manager.server_add(config=config, cmdline=cmdline)]
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
async def perform_topology_ops():
logger.info("Topology ops in background")
server_id_to_decommission = servers[-1].server_id
logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
await manager.decommission_node(server_id_to_decommission)
servers.pop()
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
logger.info("Completed topology ops")
for cycle in range(2):
logger.info("Running split-merge cycle #{}".format(cycle))
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Inserting data")
# Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
total_keys = 200
keys = range(total_keys)
        insert = cql.prepare("INSERT INTO test.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async("SELECT * FROM test.test BYPASS CACHE;")
assert len(rows) == len(keys)
await check()
logger.info("Flushing keyspace")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
# Increases the chance of tablet migration concurrent with split
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
logger.info("Enabling balancing")
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
topology_ops_task = asyncio.create_task(perform_topology_ops())
await check()
logger.info("Waiting for split")
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
logger.info("Waiting for topology ops")
await topology_ops_task
await check()
old_tablet_count = tablet_count
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count > old_tablet_count
logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
# Allow shuffling of tablet replicas to make co-location work harder
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
        # Bypass merge completion so we can do some balancing (shuffling) after
        # co-location, to make sure the balancer won't break co-location.
await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
logger.info("Deleting data")
# Delete almost all keys, enough to trigger a few merges.
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
        # Disable balancing to avoid a race between major compaction and tablet migration
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Flushing keyspace and performing major")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.keyspace_compaction(server.ip_addr, "test")
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waiting for merge decision")
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
await s1_log.wait_for("All sibling tablets are co-located")
# Do some shuffling to make sure balancer works with co-located tablets
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
old_tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
topology_ops_task = asyncio.create_task(perform_topology_ops())
await inject_error_on(manager, "replica_merge_completion_wait", servers)
await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
logger.info("Waiting for topology ops")
await topology_ops_task
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count < old_tablet_count
logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
await check()
logger.info("Flushing keyspace and performing major")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.keyspace_compaction(server.ip_addr, "test")
await check()
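# Both tests size their dataset relative to --target-tablet-size-in-bytes.
# The arithmetic behind the "400k" comments above, using the values these
# tests actually write (200 keys of ~2000-byte blobs against a 30000-byte
# target):

```python
total_keys = 200
value_size = 2000                    # bytes per blob value
target = 30000                       # --target-tablet-size-in-bytes
payload = total_keys * value_size    # ~400 KB, all in 1 initial tablet
# With ~400 KB in a single tablet and a 30 KB target, the average tablet is
# far above target, so the balancer keeps splitting (doubling tablet count)
# until the average drops below the target. Deleting all but one key then
# pushes the average well under half the target, triggering merges.
assert payload == 400_000
assert payload / target > 13         # several doublings are needed
```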