Files
scylladb/replica/compaction_group.hh
Raphael S. Carvalho 474e962e01 compaction: Restrict tombstone GC sstable set to repaired sstables for tombstone_gc=repair mode
When tombstone_gc=repair, the repaired compaction view's sstable_set_for_tombstone_gc()
previously returned all sstables across all three views (unrepaired, repairing, repaired).
This is correct but unnecessarily expensive: the unrepaired and repairing sets are never
the source of a GC-blocking shadow when tombstone_gc=repair, for base tables.

The key ordering guarantee that makes this safe is:
- topology_coordinator sends send_tablet_repair RPC and waits for it to complete.
  Inside that RPC, mark_sstable_as_repaired() runs on all replicas, moving D from
  repairing → repaired (repaired_at stamped on disk).
- Only after the RPC returns does the coordinator commit repair_time + sstables_repaired_at
  to Raft.
- gc_before = repair_time - propagation_delay only advances once that Raft commit applies.

Therefore, when a tombstone T in the repaired set first becomes GC-eligible (its
deletion_time < gc_before), any data D it shadows is already in the repaired set on
every replica. This holds because:
- The memtable is flushed before the repairing snapshot is taken (take_storage_snapshot
  calls sg->flush()), capturing all data present at repair time.
- Hints and batchlog are flushed before the snapshot, ensuring remotely-hinted writes
  arrive before the snapshot boundary.
- Legitimate unrepaired data has timestamps close to 'now', always newer than any
  GC-eligible tombstone (USING TIMESTAMP to write backdated data is user error / UB).

Excluding the repairing and unrepaired sets from the GC shadow check cannot cause any
tombstone to be wrongly collected. The memtable check is also skipped for the same
reason: memtable data is either newer than the GC-eligible tombstone, or was flushed
into the repairing/repaired set before gc_before advanced.

Safety restriction — materialized views:
The optimization IS applied to materialized view tables. Two possible paths could inject
D_view into the MV's unrepaired set after MV repair: view hints and staging via the
view-update-generator. Both are safe:

(1) View hints: flush_hints() creates a sync point covering BOTH _hints_manager (base
mutations) AND _hints_for_views_manager (view mutations). It waits until ALL pending view
hints — including D_view entries queued in _hints_for_views_manager while the target MV
replica was down — have been replayed to the target node before take_storage_snapshot() is
called. D_view therefore lands in the MV's repairing sstable and is promoted to repaired.
When a repaired compaction then checks for shadows it finds D_view in the repaired set,
keeping T_mv non-purgeable.

(2) View-update-generator staging path: Base table repair can write a missing D_base to a
replica via a staging sstable. The view-update-generator processes the staging sstable
ASYNCHRONOUSLY: it may fire arbitrarily later, even after MV repair has committed
repair_time and T_mv has been GC'd from the repaired set. However, the staging processor
calls stream_view_replica_updates() which performs a READ-BEFORE-WRITE via
as_mutation_source_excluding_staging(): it reads the CURRENT base table state before
building the view update. If T_base was written to the base table (as it always is before
the base replica can be repaired and the MV tombstone can become GC-eligible), the
view_update_builder sees T_base as the existing partition tombstone. D_base's row marker
(ts_d < ts_t) is expired by T_base, so the view update is a no-op: D_view is never
dispatched to the MV replica. No resurrection can occur regardless of how long staging is
delayed.

A potential sub-edge-case is T_base being purged BEFORE staging fires (leaving D_base as
the sole survivor, so stream_view_replica_updates would dispatch D_view). This is blocked
by an additional invariant: for tablet-based tables, the repair writer stamps repaired_at
on staging sstables (repair_writer_impl::create_writer sets mark_as_repaired = true and
perform_component_rewrite writes repaired_at = sstables_repaired_at + 1 on every staging
sstable). After base repair commits sstables_repaired_at to Raft, the staging sstable
satisfies is_repaired(sstables_repaired_at, staging_sst) and therefore appears in
make_repaired_sstable_set(). Any subsequent base repair that advances sstables_repaired_at
further still includes the staging sstable (its repaired_at ≤ new sstables_repaired_at).
D_base in the staging sstable thus shadows T_base in every repaired compaction's shadow
check, keeping T_base non-purgeable as long as D_base remains in staging.

A base table hint also cannot bypass this. A base hint is replayed as a base mutation. The
resulting view update is generated synchronously on the base replica and sent to the MV
replica via _hints_for_views_manager (path 1 above), not via staging.

USING TIMESTAMP with timestamps predating (gc_before + propagation_delay) is explicitly
UB and excluded from the safety argument.

For tombstone_gc modes other than repair (timeout, immediate, disabled) the invariant
does not hold for base tables either, so the full storage-group set is returned.

Implementation:
- Add compaction_group::is_repaired_view(v): pointer comparison against _repaired_view.
- Add compaction_group::make_repaired_sstable_set(): iterates _main_sstables and inserts
  only sstables classified as repaired (repair::is_repaired(sstables_repaired_at, sst)).
- Add storage_group::make_repaired_sstable_set(): collects repaired sstables across all
  compaction groups in the storage group.
- Add table::make_repaired_sstable_set_for_tombstone_gc(): collects repaired sstables from
  all compaction groups across all storage groups (needed for multi-tablet tables).
- Add compaction_group_view::skip_memtable_for_tombstone_gc(): returns true iff the
  repaired-only optimization is active; used by get_max_purgeable_timestamp() in
  compaction.cc to bypass the memtable shadow check.
- is_tombstone_gc_repaired_only() private helper gates both methods: requires
  is_repaired_view(this) && tombstone_gc_mode == repair. No is_view() exclusion.
- Add error injection "view_update_generator_pause_before_processing" in
  process_staging_sstables() to support testing the staging-delay scenario.
- New test test_tombstone_gc_mv_optimization_safe_via_hints: stops servers[2], writes
  D_base + T_base (view hints queued for servers[2]'s MV replica), restarts, runs MV
  tablet repair (flush_hints delivers D_view + T_mv before snapshot), triggers repaired
  compaction, and asserts the MV row is NOT visible — T_mv preserved because D_view
  landed in the repaired set via the hints-before-snapshot path.
- New test test_tombstone_gc_mv_safe_staging_processor_delay: runs base repair before
  writing T_base so D_base is staged on servers[0] via row-sync; blocks the
  view-update-generator with an error injection; writes T_base + T_mv; runs MV repair
  (fast path, T_mv GC-eligible); triggers repaired compaction (T_mv purged — no D_view
  in repaired set); asserts no resurrection; releases injection; waits for staging to
  complete; asserts no resurrection after a second flush+compaction. Demonstrates that
  the read-before-write in stream_view_replica_updates() makes the optimization safe even
  when staging fires after T_mv has been GC'd.

The expected gain is reduced bloom filter and memtable key-lookup I/O during repaired
compactions: the unrepaired set is typically the largest (it holds all recent writes),
yet for tombstone_gc=repair it never influences GC decisions.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-20 16:59:09 -03:00

529 lines
24 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <seastar/core/condition-variable.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include "database_fwd.hh"
#include "compaction/compaction_descriptor.hh"
#include "compaction/compaction_backlog_manager.hh"
#include "compaction/compaction_strategy_state.hh"
// FIXME: un-nest compaction_reenabler, so we can forward declare it and remove this include.
#include "compaction/compaction_manager.hh"
#include "locator/tablets.hh"
#include "replica/logstor/compaction.hh"
#include "replica/logstor/segment_manager.hh"
#include "sstables/sstable_set.hh"
#include "utils/chunked_vector.hh"
#include <absl/container/flat_hash_map.h>
#pragma once
namespace compaction {
class compaction_manager;
}
namespace locator {
class effective_replication_map;
}
namespace replica {
namespace logstor {
class primary_index;
}
using enable_backlog_tracker = bool_class<class enable_backlog_tracker_tag>;
enum class repair_sstable_classification {
unrepaired,
repairing,
repaired,
};
using repair_classifier_func = std::function<repair_sstable_classification(const sstables::shared_sstable&, int64_t sstables_repaired_at)>;
// Compaction group is a set of SSTables which are eligible to be compacted together.
// By this definition, we can say:
// - A group contains SSTables that are owned by the same shard.
// - Also, a group will be owned by a single table. Different tables own different groups.
// - Each group can be thought of an isolated LSM tree, where Memtable(s) and SSTable(s) are
// isolated from other groups.
class compaction_group {
table& _t;
// The compaction group views are the logical compaction groups, each having its own logical
// set of sstables. Even though they share the same instance of sstable_set, compaction will
// only see the sstables belonging to a particular view, with the help of the classifier.
// This way, we guarantee that sstables falling under different groups cannot be compacted
// together.
class compaction_group_view;
// This is held throughout group lifetime, in order to have compaction disabled on non-compacting views.
std::vector<compaction::compaction_reenabler> _compaction_disabler_for_views;
// Logical compaction group representing the unrepaired sstables.
std::unique_ptr<compaction_group_view> _unrepaired_view;
// Logical compaction group representing the repairing sstables. Compaction disabled altogether on it.
std::unique_ptr<compaction_group_view> _repairing_view;
// Logical compaction group representing the repaired sstables.
std::unique_ptr<compaction_group_view> _repaired_view;
size_t _group_id;
// Tokens included in this compaction_groups
dht::token_range _token_range;
compaction::compaction_strategy_state _compaction_strategy_state;
// Holds list of memtables for this group
lw_shared_ptr<memtable_list> _memtables;
// SSTable set which contains all non-maintenance sstables
lw_shared_ptr<sstables::sstable_set> _main_sstables;
// Holds SSTables created by maintenance operations, which need reshaping before integration into the main set
lw_shared_ptr<sstables::sstable_set> _maintenance_sstables;
// sstables that have been compacted (so don't look up in query) but
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
seastar::condition_variable _staging_done_condition;
// Gates async operations confined to a single group.
seastar::named_gate _async_gate;
// Gates flushes.
seastar::named_gate _flush_gate;
// Gates sstable being added to the group.
// This prevents the group from being considered empty when sstables are being added.
// Crucial for tablet split which ACKs split for a table when all pre-split groups are empty.
seastar::named_gate _sstable_add_gate;
bool _tombstone_gc_enabled = true;
std::optional<compaction::compaction_backlog_tracker> _backlog_tracker;
repair_classifier_func _repair_sstable_classifier;
counter_id _counter_id;
lw_shared_ptr<logstor::segment_set> _logstor_segments;
std::optional<logstor::separator_buffer> _logstor_separator;
std::vector<future<>> _separator_flushes;
seastar::semaphore _separator_flush_sem{1};
private:
std::unique_ptr<compaction_group_view> make_compacting_view();
std::unique_ptr<compaction_group_view> make_non_compacting_view();
// Adds new sstable to the set of sstables
// Doesn't update the cache. The cache must be synchronized in order for reads to see
// the writes contained in this sstable.
// Cache must be synchronized atomically with this, otherwise write atomicity may not be respected.
// Doesn't trigger compaction.
// Strong exception guarantees.
lw_shared_ptr<sstables::sstable_set>
do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
enable_backlog_tracker backlog_tracker);
// Update compaction backlog tracker with the same changes applied to the underlying sstable set.
void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
// Input SSTables that weren't added to any SSTable set, are considered unused and can be unlinked.
// An input SSTable remains linked if it wasn't actually compacted, yet compaction manager wants
// it to be moved from its original sstable set (e.g. maintenance) into a new one (e.g. main).
std::vector<sstables::shared_sstable> unused_sstables_for_deletion(compaction::compaction_completion_desc desc) const;
// Tracks the maximum timestamp observed across all SSTables in this group.
// This is used by the compacting reader to determine if a memtable contains entries
// with timestamps that overlap with those in the SSTables of the compaction group.
// For this purpose, tracking the maximum seen timestamp is sufficient rather than the
// actual maximum across all SSTables. So, the variable is updated only when a new SSTable
// is added to the group. While `set_main_sstables` and `set_maintenance_sstables` can
// replace entire sstable sets, they are still called only by compaction, so the maximum
// seen timestamp remains the same and there is no need to update the variable in those cases.
api::timestamp_type _max_seen_timestamp = api::missing_timestamp;
public:
compaction_group(table& t, size_t gid, dht::token_range token_range, repair_classifier_func repair_classifier);
~compaction_group();
// Create a group with same metadata of base like range, id, but with empty data (sstable & memtable).
static lw_shared_ptr<compaction_group> make_empty_group(const compaction_group& base);
void update_id(size_t id) {
_group_id = id;
}
void update_id_and_range(size_t id, dht::token_range token_range) {
_group_id = id;
_token_range = std::move(token_range);
}
size_t group_id() const noexcept {
return _group_id;
}
const schema_ptr& schema() const;
// Stops all activity in the group, synchronizes with in-flight writes, before
// flushing memtable(s), so all data can be found in the SSTable set.
future<> stop(sstring reason) noexcept;
bool stopped() const noexcept;
bool empty() const noexcept;
// This removes all the storage belonging to the group. In order to avoid data
// resurrection, makes sure that all data is flushed into SSTables before
// proceeding with atomic deletion on them.
future<> cleanup();
// Clear sstable sets
void clear_sstables();
// Clear memtable(s) content
future<> clear_memtables();
future<> flush() noexcept;
bool can_flush() const;
bool needs_flush() const;
const dht::token_range& token_range() const noexcept {
return _token_range;
}
void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
_tombstone_gc_enabled = tombstone_gc_enabled;
}
bool tombstone_gc_enabled() const noexcept {
return _tombstone_gc_enabled;
}
int64_t get_sstables_repaired_at() const noexcept;
future<> update_repaired_at_for_merge();
void set_counter_id(counter_id cid) noexcept {
_counter_id = cid;
}
counter_id get_counter_id() const noexcept {
return _counter_id;
}
void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept;
lw_shared_ptr<memtable_list>& memtables() noexcept;
size_t memtable_count() const noexcept;
// Returns minimum timestamp from memtable list
api::timestamp_type min_memtable_timestamp() const;
// Returns maximum timestamp from memtable list
api::timestamp_type max_memtable_timestamp() const;
// Returns minimum timestamp of live data from memtable list
api::timestamp_type min_memtable_live_timestamp() const;
// Returns minimum timestamp of live row markers from memtable list
api::timestamp_type min_memtable_live_row_marker_timestamp() const;
// Returns true if memtable(s) contains key.
bool memtable_has_key(const dht::decorated_key& key) const;
// Add sstable to main set
void add_sstable(sstables::shared_sstable sstable);
// Add sstable to maintenance set
void add_maintenance_sstable(sstables::shared_sstable sst);
api::timestamp_type max_seen_timestamp() const { return _max_seen_timestamp; }
// Update main and/or maintenance sstable sets based in info in completion descriptor,
// where input sstables will be replaced by output ones, row cache ranges are possibly
// invalidated and statistics are updated.
future<> update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc);
// Merges all sstables from another group into this one.
future<> merge_sstables_from(compaction_group& group);
// Merges all logstor segments from another group into this one.
future<> merge_logstor_segments_from(compaction_group& group);
const lw_shared_ptr<sstables::sstable_set>& main_sstables() const noexcept;
sstables::sstable_set make_main_sstable_set() const;
void set_main_sstables(lw_shared_ptr<sstables::sstable_set> new_main_sstables);
const lw_shared_ptr<sstables::sstable_set>& maintenance_sstables() const noexcept;
lw_shared_ptr<sstables::sstable_set> make_maintenance_sstable_set() const;
void set_maintenance_sstables(lw_shared_ptr<sstables::sstable_set> new_maintenance_sstables);
// Makes a sstable set, which includes all sstables managed by this group
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const;
std::vector<sstables::shared_sstable> all_sstables() const;
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept;
// Triggers regular compaction.
void trigger_compaction();
void trigger_logstor_compaction();
bool compaction_disabled() const;
future<unsigned> estimate_pending_compactions() const;
compaction::compaction_backlog_tracker& get_backlog_tracker();
void register_backlog_tracker(compaction::compaction_backlog_tracker new_backlog_tracker);
size_t live_sstable_count() const noexcept;
uint64_t live_disk_space_used() const noexcept;
size_t logstor_disk_space_used() const noexcept;
sstables::file_size_stats live_disk_space_used_full_stats() const noexcept;
uint64_t total_disk_space_used() const noexcept;
sstables::file_size_stats total_disk_space_used_full_stats() const noexcept;
// With static sharding, i.e. vnodes, there will be only one active view.
compaction::compaction_group_view& as_view_for_static_sharding() const;
// Default view to be used on newly created sstables, e.g. those produced by repair or memtable.
compaction::compaction_group_view& view_for_unrepaired_data() const;
// Gets the view a sstable currently belongs to.
compaction::compaction_group_view& view_for_sstable(const sstables::shared_sstable& sst) const;
utils::small_vector<compaction::compaction_group_view*, 3> all_views() const;
// Returns true iff v is the repaired view of this compaction group.
bool is_repaired_view(const compaction::compaction_group_view* v) const noexcept;
// Returns an sstable set containing only repaired sstables (those classified as repaired).
lw_shared_ptr<sstables::sstable_set> make_repaired_sstable_set() const;
seastar::condition_variable& get_staging_done_condition() noexcept {
return _staging_done_condition;
}
seastar::named_gate& async_gate() noexcept {
return _async_gate;
}
seastar::named_gate& flush_gate() noexcept {
return _flush_gate;
}
seastar::named_gate& sstable_add_gate() noexcept {
return _sstable_add_gate;
}
compaction::compaction_manager& get_compaction_manager() noexcept;
const compaction::compaction_manager& get_compaction_manager() const noexcept;
logstor::segment_manager& get_logstor_segment_manager() noexcept;
const logstor::segment_manager& get_logstor_segment_manager() const noexcept;
logstor::compaction_manager& get_logstor_compaction_manager() noexcept;
const logstor::compaction_manager& get_logstor_compaction_manager() const noexcept;
logstor::primary_index& get_logstor_index() noexcept;
future<> split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info);
void set_repair_sstable_classifier(repair_classifier_func repair_sstable_classifier) {
_repair_sstable_classifier = std::move(repair_sstable_classifier);
}
void add_logstor_segment(logstor::segment_descriptor& desc) {
_logstor_segments->add_segment(desc);
}
future<> discard_logstor_segments();
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
logstor::separator_buffer& get_separator_buffer(size_t write_size);
logstor::segment_set& logstor_segments() noexcept {
return *_logstor_segments;
}
const logstor::segment_set& logstor_segments() const noexcept {
return *_logstor_segments;
}
future<utils::chunked_vector<logstor::segment_snapshot>> take_logstor_snapshot();
friend class storage_group;
};
using compaction_group_ptr = lw_shared_ptr<compaction_group>;
using const_compaction_group_ptr = lw_shared_ptr<const compaction_group>;
// Storage group is responsible for storage that belongs to a single tablet.
// A storage group can manage 1 or more compaction groups, each of which can be compacted independently.
// If a tablet needs splitting, the storage group can be put in splitting mode, allowing the storage
// in main compaction groups to be split into two new compaction groups, all of which will be managed
// by the same storage group.
//
// With vnodes, a table instance in a given shard will have a single group. With tablets, a table in a
// shard will have as many groups as there are tablet replicas owned by that shard.
class storage_group {
compaction_group_ptr _main_cg;
// Holds compaction groups that now belongs to same tablet after merge. Compaction groups here will
// eventually have all their data moved into main group.
std::vector<compaction_group_ptr> _merging_groups;
std::vector<compaction_group_ptr> _split_ready_groups;
seastar::named_gate _async_gate;
private:
bool splitting_mode() const {
return !_split_ready_groups.empty();
}
size_t to_idx(locator::tablet_range_side) const;
public:
storage_group(compaction_group_ptr cg);
seastar::named_gate& async_gate() {
return _async_gate;
}
// Closes storage group without stopping its compaction groups that might be referenced elsewhere.
future<> close() noexcept {
return _async_gate.close();
}
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
// Selects the compaction group for the given token. Computes the range side
// from the token only when in splitting mode. This avoids the cost of computing
// range side on the hot path when it's not needed.
compaction_group_ptr& select_compaction_group(dht::token, const locator::tablet_map&) noexcept;
// Selects the compaction group for an sstable spanning a token range.
// If the first and last tokens fall on different sides of the split point,
// the sstable belongs to the main compaction group.
compaction_group_ptr& select_compaction_group(dht::token first, dht::token last, const locator::tablet_map&) noexcept;
uint64_t live_disk_space_used() const;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const;
utils::small_vector<compaction_group_ptr, 3> compaction_groups_immediate();
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups_immediate() const;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
void add_merging_group(compaction_group_ptr);
const std::vector<compaction_group_ptr>& merging_groups() const;
future<> remove_empty_merging_groups();
// Puts the storage group in split mode, in which it internally segregates data
// into two sstable sets and two memtable sets corresponding to the two adjacent
// tablets post-split.
// Preexisting sstables and memtables are not split yet.
// Returns true if post-conditions for split() are met.
bool set_split_mode();
// Like set_split_mode() but triggers splitting for old sstables and memtables and waits
// for it:
// 1) Flushes all memtables which were created in non-split mode, and waits for that to complete.
// 2) Compacts all sstables which overlap with the split point
// Returns a future which resolves when this process is complete.
future<> split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info);
// Make an sstable set spanning all sstables in the storage_group
lw_shared_ptr<const sstables::sstable_set> make_sstable_set() const;
// Like make_sstable_set(), but restricted to repaired sstables only across all compaction groups.
lw_shared_ptr<const sstables::sstable_set> make_repaired_sstable_set() const;
future<utils::chunked_vector<logstor::segment_snapshot>> take_logstor_snapshot() const;
// Flush all memtables.
future<> flush() noexcept;
future<> flush_separator() noexcept;
bool can_flush() const;
bool needs_flush() const;
api::timestamp_type min_memtable_timestamp() const;
api::timestamp_type min_memtable_live_timestamp() const;
api::timestamp_type min_memtable_live_row_marker_timestamp() const;
bool compaction_disabled() const;
// Returns true when all compacted sstables were already deleted.
bool no_compacted_sstable_undeleted() const;
future<> stop(sstring reason = "table removal") noexcept;
// Clear sstable sets
void clear_sstables();
};
using storage_group_ptr = lw_shared_ptr<storage_group>;
using storage_group_map = absl::flat_hash_map<size_t, storage_group_ptr, absl::Hash<size_t>>;
class storage_group_manager {
protected:
storage_group_map _storage_groups;
protected:
virtual future<> stop() = 0;
public:
virtual ~storage_group_manager();
// How concurrent loop and updates on the group map works without a lock:
//
// Firstly, all yielding loops will work on a copy of map, to prevent a
// concurrent update to the map from interfering with it.
//
// scenario 1:
// T
// 1 loop on the map
// 2 storage group X is stopped on cleanup
// 3 loop reaches X
//
// Here, X is stopped before it is reached. This is handled by teaching
// iteration to skip groups that were stopped by cleanup (implemented
// using gate).
// X survives its removal from the map since it is a lw_shared_ptr.
//
//
// scenario 2:
// T
// 1 loop on the map
// 2 loop reaches X
// 3 storage group X is stopped on cleanup
//
// Here, X is stopped while being used, but that also happens during shutdown.
// When X is stopped, flush happens and compactions are all stopped (exception
// is not propagated upwards) and new ones cannot start afterward.
//
//
// scenario 3:
// T
// 1 loop on the map
// 2 storage groups are split
// 3 loop reaches old groups
//
// Here, the loop continues post storage group split, which rebuilds the old
// map into a new one. This is handled by allowing the old map to still access
// the compaction groups that were reassigned according to the new tablet count.
// We don't move the compaction groups, but rather they're still visible by old
// and new storage groups.
// Important to not return storage_group_id in yielding variants, since ids can be
// invalidated when storage group count changes (e.g. split or merge).
future<> parallel_foreach_storage_group(std::function<future<>(storage_group&)> f);
future<> for_each_storage_group_gently(std::function<future<>(storage_group&)> f);
void for_each_storage_group(std::function<void(size_t, storage_group&)> f) const;
const storage_group_map& storage_groups() const;
future<> stop_storage_groups() noexcept;
void clear_storage_groups();
void remove_storage_group(size_t id);
storage_group& storage_group_for_id(const schema_ptr&, size_t i) const;
storage_group* maybe_storage_group_for_id(const schema_ptr&, size_t i) const;
// Caller must keep the current effective_replication_map_ptr valid
// until the storage_group_manager finishes update_effective_replication_map
//
// refresh_mutation_source must be called when there are changes to data source
// structures but logical state of data is not changed (e.g. when state for a
// new tablet replica is allocated).
virtual void update_effective_replication_map(const locator::effective_replication_map_ptr& old_erm,
const locator::effective_replication_map& erm,
noncopyable_function<void()> refresh_mutation_source) = 0;
virtual compaction_group& compaction_group_for_token(dht::token token) const = 0;
virtual compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const = 0;
virtual compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const = 0;
virtual compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const = 0;
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::combined_load_stats table_load_stats() const = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
virtual future<> wait_for_background_tablet_resize_work() = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
};
}