table: coroutinize update_effective_replication_map
It's better to wait on deregistering the old main compaction_groups in handle_tablet_split_completion rather than leaving work in the background, especially since their respective storage_groups are being destroyed by handle_tablet_split_completion. handle_tablet_split_completion keeps a continuation chain for all non-ready compaction_group stop fibers and returns it so that update_effective_replication_map can await it, leaving no cleanup work in the background. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -593,7 +593,7 @@ private:
|
||||
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
|
||||
// each tablet split into two, so this replica will remap all of its compaction groups
|
||||
// that were previously split.
|
||||
void handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap);
|
||||
future<> handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap);
|
||||
|
||||
sstables::compaction_type_options::split split_compaction_options() const noexcept;
|
||||
|
||||
@@ -846,7 +846,7 @@ public:
|
||||
void set_schema(schema_ptr);
|
||||
db::commitlog* commitlog() const;
|
||||
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
|
||||
void update_effective_replication_map(locator::effective_replication_map_ptr);
|
||||
future<> update_effective_replication_map(locator::effective_replication_map_ptr);
|
||||
[[gnu::always_inline]] bool uses_tablets() const;
|
||||
future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
|
||||
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
|
||||
|
||||
@@ -1903,7 +1903,7 @@ locator::table_load_stats table::table_load_stats(std::function<bool(locator::gl
|
||||
return stats;
|
||||
}
|
||||
|
||||
void table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) {
|
||||
future<> table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) {
|
||||
auto table_id = _schema->id();
|
||||
storage_group_vector new_storage_groups;
|
||||
new_storage_groups.resize(new_tmap.tablet_count());
|
||||
@@ -1924,6 +1924,8 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato
|
||||
table_id, new_tmap.tablet_count(), old_tablet_count*split_size));
|
||||
}
|
||||
|
||||
// Stop the released main compaction groups asynchronously
|
||||
future<> stop_fut = make_ready_future<>();
|
||||
for (unsigned id = 0; id < _storage_groups.size(); id++) {
|
||||
auto& sg = _storage_groups[id];
|
||||
if (!sg) {
|
||||
@@ -1934,6 +1936,16 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato
|
||||
"therefore groups cannot be remapped with the new tablet count.",
|
||||
id, table_id));
|
||||
}
|
||||
// Remove old main groups, they're unused, but they need to be deregistered properly
|
||||
auto cg_ptr = std::move(sg->main_compaction_group());
|
||||
auto f = cg_ptr->stop();
|
||||
if (!f.available() || f.failed()) [[unlikely]] {
|
||||
stop_fut = stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
|
||||
return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) {
|
||||
tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex));
|
||||
});
|
||||
});
|
||||
}
|
||||
unsigned first_new_id = id << growth_factor;
|
||||
auto split_ready_groups = std::move(*sg).split_ready_compaction_groups();
|
||||
if (split_ready_groups.size() != split_size) {
|
||||
@@ -1951,15 +1963,10 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato
|
||||
|
||||
auto old_groups = std::exchange(_storage_groups, std::move(new_storage_groups));
|
||||
|
||||
// Remove old main groups in background, they're unused, but they need to be deregistered properly
|
||||
(void) do_with(std::move(old_groups), _async_gate.hold(), [] (storage_group_vector& groups, gate::holder&) {
|
||||
return do_for_each(groups, [] (std::unique_ptr<storage_group>& sg) {
|
||||
return sg->main_compaction_group()->stop();
|
||||
});
|
||||
});
|
||||
return stop_fut;
|
||||
}
|
||||
|
||||
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
||||
future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
||||
auto old_erm = std::exchange(_erm, std::move(erm));
|
||||
|
||||
if (uses_tablets()) {
|
||||
@@ -1974,7 +1981,7 @@ void table::update_effective_replication_map(locator::effective_replication_map_
|
||||
if (new_tablet_count > old_tablet_count) {
|
||||
tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets",
|
||||
_schema->ks_name(), _schema->cf_name(), old_tablet_count, new_tablet_count);
|
||||
handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id));
|
||||
co_await handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id));
|
||||
}
|
||||
}
|
||||
if (old_erm) {
|
||||
|
||||
@@ -2864,7 +2864,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
|
||||
// Apply changes on all shards
|
||||
try {
|
||||
co_await container().invoke_on_all([&] (storage_service& ss) {
|
||||
co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
|
||||
ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
|
||||
auto& db = ss._db.local();
|
||||
|
||||
@@ -2878,7 +2878,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
auto& table_erms = pending_table_erms[this_shard_id()];
|
||||
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
|
||||
auto& cf = db.find_column_family(it->first);
|
||||
cf.update_effective_replication_map(std::move(it->second));
|
||||
co_await cf.update_effective_replication_map(std::move(it->second));
|
||||
if (cf.uses_tablets()) {
|
||||
register_tablet_split_candidate(it->first);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user