/*
 * Copyright (C) 2022-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

// NOTE(review): in this view the targets of the angle-bracket #include directives and the
// template argument lists (e.g. after std::unique_ptr, lw_shared_ptr, std::vector,
// std::function) are not visible. The declarations below are reproduced token-for-token
// as shown; confirm the elided pieces against the checked-in header.
#include
#include
#include

#include "database_fwd.hh"
#include "compaction/compaction_descriptor.hh"
#include "compaction/compaction_backlog_manager.hh"
#include "compaction/compaction_strategy_state.hh"
// FIXME: un-nest compaction_reenabler, so we can forward declare it and remove this include.
#include "compaction/compaction_manager.hh"
#include "locator/tablets.hh"
#include "sstables/sstable_set.hh"
#include "utils/chunked_vector.hh"
#include

// NOTE(review): conventionally this directive precedes the includes.
#pragma once

namespace compaction {
class compaction_manager;
}

namespace locator {
class effective_replication_map;
}

namespace replica {

// Tag type taken by compaction_group::do_add_sstable(); presumably selects whether the
// backlog tracker is updated together with the sstable set -- confirm at the definition.
using enable_backlog_tracker = bool_class;

// Repair state of an sstable. The three values mirror the three views kept by
// compaction_group (unrepaired, repairing, repaired).
enum class repair_sstable_classification {
    unrepaired,
    repairing,
    repaired,
};

// Callable stored by compaction_group as its repair sstable classifier.
// Presumably maps an sstable to a repair_sstable_classification; the signature is not
// visible here.
using repair_classifier_func = std::function;

// Compaction group is a set of SSTables which are eligible to be compacted together.
// By this definition, we can say:
//      - A group contains SSTables that are owned by the same shard.
//      - Also, a group will be owned by a single table. Different tables own different groups.
//      - Each group can be thought of as an isolated LSM tree, where Memtable(s) and SSTable(s) are
//          isolated from other groups.
class compaction_group {
    // Presumably the table owning this group (see the class comment) -- set by the constructor.
    table& _t;
    // The compaction group views are the logical compaction groups, each having its own logical
    // set of sstables. Even though they share the same instance of sstable_set, compaction will
    // only see the sstables belonging to a particular view, with the help of the classifier.
    // This way, we guarantee that sstables falling under different groups cannot be compacted
    // together.
    class compaction_group_view;
    // This is held throughout group lifetime, in order to have compaction disabled on non-compacting views.
    std::vector _compaction_disabler_for_views;
    // Logical compaction group representing the unrepaired sstables.
    std::unique_ptr _unrepaired_view;
    // Logical compaction group representing the repairing sstables. Compaction disabled altogether on it.
    std::unique_ptr _repairing_view;
    // Logical compaction group representing the repaired sstables.
    std::unique_ptr _repaired_view;
    // Identifier of this group; can be replaced later through update_id() / update_id_and_range().
    size_t _group_id;
    // Tokens included in this compaction group
    dht::token_range _token_range;
    compaction::compaction_strategy_state _compaction_strategy_state;
    // Holds list of memtables for this group
    lw_shared_ptr _memtables;
    // SSTable set which contains all non-maintenance sstables
    lw_shared_ptr _main_sstables;
    // Holds SSTables created by maintenance operations, which need reshaping before integration into the main set
    lw_shared_ptr _maintenance_sstables;
    // sstables that have been compacted (so don't look up in query) but
    // have not been deleted yet, so must not GC any tombstones in other sstables
    // that may delete data in these sstables:
    std::vector _sstables_compacted_but_not_deleted;
    // Exposed to callers through get_staging_done_condition().
    seastar::condition_variable _staging_done_condition;
    // Gates async operations confined to a single group.
    seastar::named_gate _async_gate;
    // Gates flushes.
    seastar::named_gate _flush_gate;
    // Tombstone GC is enabled by default; toggled through set_tombstone_gc_enabled().
    bool _tombstone_gc_enabled = true;
    std::optional _backlog_tracker;
    // Classifier handed to the constructor; replaceable through set_repair_sstable_classifier().
    repair_classifier_func _repair_sstable_classifier;
private:
    // Factories for the per-view objects declared above. Presumably the "non compacting"
    // flavour backs the view on which compaction is disabled -- confirm at the definition.
    std::unique_ptr make_compacting_view();
    std::unique_ptr make_non_compacting_view();
    // Adds new sstable to the set of sstables
    // Doesn't update the cache. The cache must be synchronized in order for reads to see
    // the writes contained in this sstable.
    // Cache must be synchronized atomically with this, otherwise write atomicity may not be respected.
    // Doesn't trigger compaction.
    // Strong exception guarantees.
    lw_shared_ptr do_add_sstable(lw_shared_ptr sstables, sstables::shared_sstable sstable, enable_backlog_tracker backlog_tracker);
    // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
    void backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables);
    // Input SSTables that weren't added to any SSTable set, are considered unused and can be unlinked.
    // An input SSTable remains linked if it wasn't actually compacted, yet compaction manager wants
    // it to be moved from its original sstable set (e.g. maintenance) into a new one (e.g. main).
    std::vector unused_sstables_for_deletion(compaction::compaction_completion_desc desc) const;
    // Tracks the maximum timestamp observed across all SSTables in this group.
    // This is used by the compacting reader to determine if a memtable contains entries
    // with timestamps that overlap with those in the SSTables of the compaction group.
    // For this purpose, tracking the maximum seen timestamp is sufficient rather than the
    // actual maximum across all SSTables. So, the variable is updated only when a new SSTable
    // is added to the group. While `set_main_sstables` and `set_maintenance_sstables` can
    // replace entire sstable sets, they are still called only by compaction, so the maximum
    // seen timestamp remains the same and there is no need to update the variable in those cases.
    api::timestamp_type _max_seen_timestamp = api::missing_timestamp;
public:
    compaction_group(table& t, size_t gid, dht::token_range token_range, repair_classifier_func repair_classifier);
    ~compaction_group();

    // Create a group with same metadata of base like range, id, but with empty data (sstable & memtable).
    static lw_shared_ptr make_empty_group(const compaction_group& base);

    // Replaces the group id.
    void update_id(size_t id) {
        _group_id = id;
    }
    // Replaces both the group id and the token range of this group.
    void update_id_and_range(size_t id, dht::token_range token_range) {
        _group_id = id;
        _token_range = std::move(token_range);
    }

    // Returns the current group id.
    size_t group_id() const noexcept {
        return _group_id;
    }
    const schema_ptr& schema() const;

    // Stops all activity in the group, synchronizes with in-flight writes, before
    // flushing memtable(s), so all data can be found in the SSTable set.
    future<> stop(sstring reason) noexcept;

    bool stopped() const noexcept;

    bool empty() const noexcept;

    // This removes all the storage belonging to the group. In order to avoid data
    // resurrection, makes sure that all data is flushed into SSTables before
    // proceeding with atomic deletion on them.
    future<> cleanup();

    // Clear sstable sets
    void clear_sstables();

    // Clear memtable(s) content
    future<> clear_memtables();

    future<> flush() noexcept;
    bool can_flush() const;

    // Returns the token range of this group.
    const dht::token_range& token_range() const noexcept {
        return _token_range;
    }

    // Setter / getter for the tombstone GC flag (enabled by default).
    void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
        _tombstone_gc_enabled = tombstone_gc_enabled;
    }

    bool tombstone_gc_enabled() const noexcept {
        return _tombstone_gc_enabled;
    }

    int64_t get_sstables_repaired_at() const noexcept;

    future<> update_repaired_at_for_merge();

    void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept;

    lw_shared_ptr& memtables() noexcept;
    size_t memtable_count() const noexcept;
    // Returns minimum timestamp from memtable list
    api::timestamp_type min_memtable_timestamp() const;
    // Returns minimum timestamp of live data from memtable list
    api::timestamp_type min_memtable_live_timestamp() const;
    // Returns minimum timestamp of live row markers from memtable list
    api::timestamp_type min_memtable_live_row_marker_timestamp() const;
    // Returns true if memtable(s) contains key.
    bool memtable_has_key(const dht::decorated_key& key) const;
    // Add sstable to main set
    void add_sstable(sstables::shared_sstable sstable);
    // Add sstable to maintenance set
    void add_maintenance_sstable(sstables::shared_sstable sst);
    // Returns the maximum timestamp seen so far; see the comment on _max_seen_timestamp
    // for when it is (and is not) updated.
    api::timestamp_type max_seen_timestamp() const { return _max_seen_timestamp; }

    // Update main and/or maintenance sstable sets based on info in completion descriptor,
    // where input sstables will be replaced by output ones, row cache ranges are possibly
    // invalidated and statistics are updated.
    future<> update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc);

    // Merges all sstables from another group into this one.
    future<> merge_sstables_from(compaction_group& group);

    const lw_shared_ptr& main_sstables() const noexcept;
    sstables::sstable_set make_main_sstable_set() const;
    void set_main_sstables(lw_shared_ptr new_main_sstables);

    const lw_shared_ptr& maintenance_sstables() const noexcept;
    lw_shared_ptr make_maintenance_sstable_set() const;
    void set_maintenance_sstables(lw_shared_ptr new_maintenance_sstables);

    // Makes a sstable set, which includes all sstables managed by this group
    lw_shared_ptr make_sstable_set() const;
    std::vector all_sstables() const;

    const std::vector& compacted_undeleted_sstables() const noexcept;
    // Triggers regular compaction.
    void trigger_compaction();
    bool compaction_disabled() const;
    future estimate_pending_compactions() const;

    compaction::compaction_backlog_tracker& get_backlog_tracker();
    void register_backlog_tracker(compaction::compaction_backlog_tracker new_backlog_tracker);

    size_t live_sstable_count() const noexcept;
    uint64_t live_disk_space_used() const noexcept;
    sstables::file_size_stats live_disk_space_used_full_stats() const noexcept;
    uint64_t total_disk_space_used() const noexcept;
    sstables::file_size_stats total_disk_space_used_full_stats() const noexcept;

    // With static sharding, i.e. vnodes, there will be only one active view.
    compaction::compaction_group_view& as_view_for_static_sharding() const;
    // Default view to be used on newly created sstables, e.g. those produced by repair or memtable.
    compaction::compaction_group_view& view_for_unrepaired_data() const;
    // Gets the view a sstable currently belongs to.
    compaction::compaction_group_view& view_for_sstable(const sstables::shared_sstable& sst) const;
    utils::small_vector all_views() const;

    // Plain accessors for the condition variable and the two gates owned by this group.
    seastar::condition_variable& get_staging_done_condition() noexcept {
        return _staging_done_condition;
    }

    seastar::named_gate& async_gate() noexcept {
        return _async_gate;
    }

    seastar::named_gate& flush_gate() noexcept {
        return _flush_gate;
    }

    compaction::compaction_manager& get_compaction_manager() noexcept;
    const compaction::compaction_manager& get_compaction_manager() const noexcept;

    future<> split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info);

    // Replaces the classifier stored in _repair_sstable_classifier.
    void set_repair_sstable_classifier(repair_classifier_func repair_sstable_classifier) {
        _repair_sstable_classifier = std::move(repair_sstable_classifier);
    }

    friend class storage_group;
};

using compaction_group_ptr = lw_shared_ptr;
using const_compaction_group_ptr = lw_shared_ptr;

// Storage group is responsible for storage that belongs to a single tablet.
// A storage group can manage 1 or more compaction groups, each of which can be compacted independently.
// If a tablet needs splitting, the storage group can be put in splitting mode, allowing the storage
// in main compaction groups to be split into two new compaction groups, all of which will be managed
// by the same storage group.
//
// With vnodes, a table instance in a given shard will have a single group. With tablets, a table in a
// shard will have as many groups as there are tablet replicas owned by that shard.
class storage_group {
    compaction_group_ptr _main_cg;
    // Holds compaction groups that now belong to the same tablet after merge. Compaction groups here will
    // eventually have all their data moved into main group.
    std::vector _merging_groups;
    // Non-empty exactly when the group is in splitting mode (see splitting_mode()).
    std::vector _split_ready_groups;
    // Closed by close(); exposed through async_gate().
    seastar::named_gate _async_gate;
private:
    // True once there is at least one split-ready group.
    bool splitting_mode() const {
        return !_split_ready_groups.empty();
    }
    size_t to_idx(locator::tablet_range_side) const;
public:
    storage_group(compaction_group_ptr cg);

    seastar::named_gate& async_gate() {
        return _async_gate;
    }

    // Closes storage group without stopping its compaction groups that might be referenced elsewhere.
    future<> close() noexcept {
        return _async_gate.close();
    }

    const dht::token_range& token_range() const noexcept;

    size_t memtable_count() const noexcept;

    const compaction_group_ptr& main_compaction_group() const noexcept;
    const std::vector& split_ready_compaction_groups() const;
    compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;

    uint64_t live_disk_space_used() const noexcept;

    void for_each_compaction_group(std::function action) const noexcept;
    utils::small_vector compaction_groups() noexcept;
    utils::small_vector compaction_groups() const noexcept;

    utils::small_vector split_unready_groups() const;
    bool split_unready_groups_are_empty() const;

    void add_merging_group(compaction_group_ptr);
    const std::vector& merging_groups() const;
    future<> remove_empty_merging_groups();

    // Puts the storage group in split mode, in which it internally segregates data
    // into two sstable sets and two memtable sets corresponding to the two adjacent
    // tablets post-split.
    // Preexisting sstables and memtables are not split yet.
    // Returns true if post-conditions for split() are met.
    bool set_split_mode();

    // Like set_split_mode() but triggers splitting for old sstables and memtables and waits
    // for it:
    //  1) Flushes all memtables which were created in non-split mode, and waits for that to complete.
    //  2) Compacts all sstables which overlap with the split point
    // Returns a future which resolves when this process is complete.
    future<> split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info);

    // Make an sstable set spanning all sstables in the storage_group
    lw_shared_ptr make_sstable_set() const;

    // Flush all memtables.
    future<> flush() noexcept;
    bool can_flush() const;
    api::timestamp_type min_memtable_timestamp() const;
    api::timestamp_type min_memtable_live_timestamp() const;
    api::timestamp_type min_memtable_live_row_marker_timestamp() const;

    bool compaction_disabled() const;
    // Returns true when all compacted sstables were already deleted.
    bool no_compacted_sstable_undeleted() const;

    // The default reason string is "table removal".
    future<> stop(sstring reason = "table removal") noexcept;

    // Clear sstable sets
    void clear_sstables();
};

using storage_group_ptr = lw_shared_ptr;
using storage_group_map = absl::flat_hash_map>;

// Abstract owner of the storage_group_map of a table. The pure virtual members below are
// provided by concrete managers defined elsewhere.
class storage_group_manager {
protected:
    storage_group_map _storage_groups;
protected:
    virtual future<> stop() = 0;
public:
    virtual ~storage_group_manager();

    // How concurrent loops and updates on the group map work without a lock:
    //
    // Firstly, all yielding loops will work on a copy of map, to prevent a
    // concurrent update to the map from interfering with it.
    //
    // scenario 1:
    // T
    // 1    loop on the map
    // 2                                storage group X is stopped on cleanup
    // 3    loop reaches X
    //
    // Here, X is stopped before it is reached. This is handled by teaching
    // iteration to skip groups that were stopped by cleanup (implemented
    // using gate).
    // X survives its removal from the map since it is a lw_shared_ptr.
    //
    //
    // scenario 2:
    // T
    // 1    loop on the map
    // 2    loop reaches X
    // 3                                storage group X is stopped on cleanup
    //
    // Here, X is stopped while being used, but that also happens during shutdown.
    // When X is stopped, flush happens and compactions are all stopped (exception
    // is not propagated upwards) and new ones cannot start afterward.
    //
    //
    // scenario 3:
    // T
    // 1    loop on the map
    // 2                                storage groups are split
    // 3    loop reaches old groups
    //
    // Here, the loop continues post storage group split, which rebuilds the old
    // map into a new one. This is handled by allowing the old map to still access
    // the compaction groups that were reassigned according to the new tablet count.
    // We don't move the compaction groups, but rather they're still visible by old
    // and new storage groups.

    // Important to not return storage_group_id in yielding variants, since ids can be
    // invalidated when storage group count changes (e.g. split or merge).
    future<> parallel_foreach_storage_group(std::function(storage_group&)> f);
    future<> for_each_storage_group_gently(std::function(storage_group&)> f);
    void for_each_storage_group(std::function f) const;
    const storage_group_map& storage_groups() const;

    future<> stop_storage_groups() noexcept;
    void clear_storage_groups();
    void remove_storage_group(size_t id);
    // Lookup by id. NOTE(review): the pointer-returning "maybe_" variant presumably yields
    // nullptr when the id is absent, while the reference variant must fail some other way --
    // confirm at the definitions.
    storage_group& storage_group_for_id(const schema_ptr&, size_t i) const;
    storage_group* maybe_storage_group_for_id(const schema_ptr&, size_t i) const;

    // Caller must keep the current effective_replication_map_ptr valid
    // until the storage_group_manager finishes update_effective_replication_map
    //
    // refresh_mutation_source must be called when there are changes to data source
    // structures but logical state of data is not changed (e.g. when state for a
    // new tablet replica is allocated).
    virtual void update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) = 0;

    virtual compaction_group& compaction_group_for_token(dht::token token) const = 0;
    virtual compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const = 0;
    virtual compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const = 0;

    virtual size_t log2_storage_groups() const = 0;
    virtual storage_group& storage_group_for_token(dht::token) const = 0;
    virtual utils::chunked_vector storage_groups_for_token_range(dht::token_range tr) const = 0;

    virtual locator::combined_load_stats table_load_stats(std::function tablet_filter) const noexcept = 0;
    virtual bool all_storage_groups_split() = 0;
    virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
    virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
    virtual future> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
    virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;

    virtual lw_shared_ptr make_sstable_set() const = 0;
};

}