/* * Copyright (C) 2018-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include #include #include #include #include #include "dht/decorated_key.hh" #include "replica/database.hh" #include "replica/data_dictionary_impl.hh" #include "replica/compaction_group.hh" #include "replica/query_state.hh" #include "sstables/shared_sstable.hh" #include "sstables/sstable_set.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" #include "utils/assert.hh" #include "utils/logalloc.hh" #include "utils/checked-file-impl.hh" #include "utils/managed_bytes.hh" #include "view_info.hh" #include "db/data_listeners.hh" #include "memtable-sstable.hh" #include "compaction/compaction_manager.hh" #include "compaction/compaction_group_view.hh" #include "sstables/sstable_directory.hh" #include "db/system_keyspace.hh" #include "db/extensions.hh" #include "query/query-result-writer.hh" #include "db/view/view_update_generator.hh" #include "utils/error_injection.hh" #include "utils/histogram_metrics_helper.hh" #include "mutation/mutation_source_metadata.hh" #include "gms/gossiper.hh" #include "gms/feature_service.hh" #include "db/config.hh" #include "db/commitlog/commitlog.hh" #include "utils/lister.hh" #include "dht/token.hh" #include "dht/i_partitioner.hh" #include "replica/global_table_ptr.hh" #include "locator/tablets.hh" #include "utils/error_injection.hh" #include "readers/reversing.hh" #include "readers/empty.hh" #include "readers/multi_range.hh" #include "readers/combined.hh" #include "readers/compacting.hh" #include "replica/schema_describe_helper.hh" #include "repair/incremental.hh" namespace replica { static logging::logger tlogger("table"); static seastar::metrics::label column_family_label("cf"); static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; 
table_holder::table_holder(table& t) : _holder(t.async_gate()) , _table_ptr(t.shared_from_this()) { } sstables::generation_type table::calculate_generation_for_new_table() { auto ret = _sstable_generation_generator(); tlogger.debug("{}.{} new sstable generation {}", schema()->ks_name(), schema()->cf_name(), ret); return ret; } mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& pr, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, const sstables::sstable_predicate& predicate, sstables::integrity_check integrity) const { // CAVEAT: if make_sstable_reader() is called on a single partition // we want to optimize and read exactly this partition. As a // consequence, fast_forward_to() will *NOT* work on the result, // regardless of what the fwd_mr parameter says. if (pr.is_singular() && pr.start()->value().has_key()) { return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), _stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity); } else { return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate, nullptr, integrity); } } lw_shared_ptr compaction_group::make_sstable_set() const { // Bypasses compound set if maintenance one is empty. If a SSTable is added later into maintenance, // the sstable set is refreshed to reflect the current state. 
if (!_maintenance_sstables->size()) { return _main_sstables; } return make_lw_shared(sstables::make_compound_sstable_set(_t.schema(), { _main_sstables, _maintenance_sstables })); } int64_t compaction_group::get_sstables_repaired_at() const noexcept { try { auto tid = locator::tablet_id(group_id()); auto erm = _t.get_effective_replication_map(); if (!erm) { return 0; } if (!erm->get_replication_strategy().uses_tablets()) { return 0; } auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_t.schema()->id()); auto& tinfo = tmap.get_tablet_info(tid); return tinfo.sstables_repaired_at; } catch (locator::no_such_tablet_map) { return 0; } } lw_shared_ptr table::make_compound_sstable_set() const { return _sg_manager->make_sstable_set(); } lw_shared_ptr compaction_group::make_maintenance_sstable_set() const { return make_lw_shared(sstables::make_partitioned_sstable_set(_t.schema(), token_range())); } void table::refresh_compound_sstable_set() { _sstables = make_compound_sstable_set(); } // Exposed for testing, not performance critical. 
// NOTE(review): throughout this span, template argument lists appear to have
// been stripped by an angle-bracket/HTML sanitizer (e.g. "future", "std::vector",
// "std::function", "std::unique_ptr" with no <...> arguments). The code below
// preserves the text exactly as found; the template arguments must be restored
// from the original sources before this file can build.

// Reads a single partition by key and returns its detached partition data,
// or a null pointer when the partition does not exist.
future table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
    return do_with(dht::partition_range::make_singular(key), [s = std::move(s), permit = std::move(permit), this] (auto& range) mutable {
        // with_closeable guarantees the reader is closed even on exception.
        return with_closeable(this->make_mutation_reader(std::move(s), std::move(permit), range), [] (mutation_reader& reader) {
            return read_mutation_from_mutation_reader(reader).then([] (mutation_opt&& mo) -> std::unique_ptr {
                if (!mo) {
                    return {};
                }
                return std::make_unique(std::move(mo->partition()));
            });
        });
    });
}

// Looks up a single clustering row within a partition; resolves to null when
// either the partition or the row is absent.
future table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
    return find_partition(s, std::move(permit), partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
        if (!p) {
            return make_ready_future();
        }
        auto r = p->find_row(*s, clustering_key);
        if (r) {
            // FIXME: remove copy if only one data source
            return make_ready_future(std::make_unique(*s, column_kind::regular_column, *r));
        } else {
            return make_ready_future();
        }
    });
}

// Appends one reader per memtable that may cover `range` to `readers`.
// `reserve_fn` is invoked with the memtable count so the caller can reserve
// capacity (typically memtable_count + 1 for an extra sstable/cache reader).
void table::add_memtables_to_reader_list(std::vector& readers,
        const schema_ptr& s,
        const reader_permit& permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        const tracing::trace_state_ptr& trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        std::function reserve_fn) const {
    auto add_memtables_from_cg = [&] (compaction_group& cg) mutable {
        for (auto&& mt: *cg.memtables()) {
            if (auto reader_opt = mt->make_mutation_reader_opt(s, permit, range, slice, trace_state, fwd, fwd_mr)) {
                readers.emplace_back(std::move(*reader_opt));
            }
        }
    };
    // point queries can be optimized as they span a single compaction group.
    if (range.is_singular() && range.start()->value().has_key()) {
        const dht::ring_position& pos = range.start()->value();
        auto& sg = storage_group_for_token(pos.token());
        reserve_fn(sg.memtable_count());
        sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
            add_memtables_from_cg(*cg);
        });
        return;
    }
    // Range query: visit every storage group overlapping the token range.
    auto token_range = range.transform(std::mem_fn(&dht::ring_position::token));
    auto sgs = storage_groups_for_token_range(token_range);
    reserve_fn(std::ranges::fold_left(sgs | std::views::transform(std::mem_fn(&storage_group::memtable_count)), uint64_t(0), std::plus{}));
    for (auto& sg : sgs) {
        for (auto& cg : sg->compaction_groups()) {
            add_memtables_from_cg(*cg);
        }
    }
}

// Builds the table's main read path: combines memtable readers with either
// the row cache (default) or a direct sstable reader (cache disabled or
// BYPASS CACHE requested), optionally wrapped by data listeners.
mutation_reader table::make_mutation_reader(schema_ptr s,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr) const {
    if (_virtual_reader) [[unlikely]] {
        return (*_virtual_reader).make_mutation_reader(s, std::move(permit), range, slice, trace_state, fwd, fwd_mr);
    }

    std::vector readers;

    // We're assuming that cache and memtables are both read atomically
    // for single-key queries, so we don't need to special case memtable
    // undergoing a move to cache. At any given point in time between
    // deferring points the sum of data in memtable and cache is coherent. If
    // single-key queries for each data source were performed across deferring
    // points, it would be possible that partitions which are ahead of the
    // memtable cursor would be placed behind the cache cursor, resulting in
    // those partitions being missing in the combined reader.
    //
    // We need to handle this in range queries though, as they are always
    // deferring. scanning_reader from memtable.cc is falling back to reading
    // the sstable when memtable is flushed. After memtable is moved to cache,
    // new readers will no longer use the old memtable, but until then
    // performance may suffer. We should fix this when we add support for
    // range queries in cache, so that scans can always be satisfied from
    // memtable and cache only, as long as data is not evicted.
    //
    // https://github.com/scylladb/scylla/issues/309
    // https://github.com/scylladb/scylla/issues/185
    add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
        readers.reserve(memtable_count + 1);
    });
    const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
    if (cache_enabled() && !bypass_cache) {
        if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
            readers.emplace_back(std::move(*reader_opt));
        }
    } else {
        readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr));
    }
    auto rd = make_combined_reader(s, permit, std::move(readers), fwd, fwd_mr);
    if (_config.data_listeners && !_config.data_listeners->empty()) {
        rd = _config.data_listeners->on_read(s, range, slice, std::move(rd));
    }
    return rd;
}

sstables::shared_sstable table::make_streaming_sstable_for_write() {
    auto newtab = make_sstable(sstables::sstable_state::normal);
    tlogger.debug("Created sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
    return newtab;
}

sstables::shared_sstable table::make_streaming_staging_sstable() {
    auto newtab = make_sstable(sstables::sstable_state::staging);
    tlogger.debug("Created staging sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
    return newtab;
}

// Optionally wraps a streaming/repair reader in a compacting reader.
// The injector parameters merely expose the effective flags to tests.
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, const compaction::compaction_manager& cm,
        gc_clock::time_point compaction_time, bool compaction_enabled, bool compaction_can_gc) {
    utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_enabled",
            fmt::to_string(compaction_enabled));
    utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_can_gc",
            fmt::to_string(compaction_can_gc));
    if (!compaction_enabled) {
        return underlying;
    }
    return make_compacting_reader(
            std::move(underlying),
            compaction_time,
            // Tombstone GC during streaming is all-or-nothing here.
            compaction_can_gc ? can_always_purge : can_never_purge,
            cm.get_tombstone_gc_state(),
            streamed_mutation::forwarding::no);
}

// Streaming reader over multiple partition ranges (memtables + sstables,
// with full integrity checking on the sstable side).
mutation_reader table::make_streaming_reader(schema_ptr s, reader_permit permit, const dht::partition_range_vector& ranges,
        gc_clock::time_point compaction_time) const {
    auto& slice = s->full_slice();

    auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range,
            const query::partition_slice& slice, tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
        std::vector readers;
        add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
            readers.reserve(memtable_count + 1);
        });
        readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr,
                sstables::default_sstable_predicate(), sstables::integrity_check::yes));
        return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
    });

    return maybe_compact_for_streaming(
            make_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, nullptr, mutation_reader::forwarding::no),
            get_compaction_manager(), compaction_time,
            _config.enable_compacting_data_for_streaming_and_repair(),
            _config.enable_tombstone_gc_for_streaming_and_repair());
}

// Streaming reader over a single partition range with an explicit slice.
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
        const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, gc_clock::time_point compaction_time) const {
    auto trace_state = tracing::trace_state_ptr();
    const auto fwd = streamed_mutation::forwarding::no;
    std::vector readers;
    add_memtables_to_reader_list(readers, schema, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
        readers.reserve(memtable_count + 1);
    });
    readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr,
            sstables::default_sstable_predicate(), sstables::integrity_check::yes));
    return maybe_compact_for_streaming(
            make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
            get_compaction_manager(), compaction_time,
            _config.enable_compacting_data_for_streaming_and_repair(),
            _config.enable_tombstone_gc_for_streaming_and_repair());
}

// Streaming reader over an explicit sstable set only (no memtables/cache).
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
        lw_shared_ptr sstables, gc_clock::time_point compaction_time) const {
    auto& slice = schema->full_slice();
    auto trace_state = tracing::trace_state_ptr();
    const auto fwd = streamed_mutation::forwarding::no;
    const auto fwd_mr = mutation_reader::forwarding::no;
    return maybe_compact_for_streaming(
            sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice, std::move(trace_state),
                    fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
            get_compaction_manager(), compaction_time,
            _config.enable_compacting_data_for_streaming_and_repair(),
            _config.enable_tombstone_gc_for_streaming_and_repair());
}

// Cache-only reader that never populates the cache; singular ranges only.
mutation_reader table::make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit,
        const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr ts) {
    if (!range.is_singular()) {
        // NOTE(review): the message names make_cache_reader(), but this
        // function is make_nonpopulating_cache_reader() — consider updating.
        throw std::runtime_error("table::make_cache_reader(): only singular ranges are supported");
    }
    return _cache.make_nonpopulating_reader(std::move(schema), std::move(permit), range, slice, std::move(ts));
}

// Acquires locks for all counter cells touched by mutation `m`.
future> table::lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout) {
    SCYLLA_ASSERT(m.schema() == _counter_cell_locks->schema());
    return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
}

// Invokes `action` on the active (writable) memtable of every compaction group.
void table::for_each_active_memtable(noncopyable_function action) {
    for_each_compaction_group([&] (compaction_group& cg) {
        action(cg.memtables()->active_memtable());
    });
}

// The three min_memtable_* accessors below return api::max_timestamp when
// there are no memtables, so they are neutral under std::ranges::min folds.
api::timestamp_type compaction_group::min_memtable_timestamp() const {
    if (_memtables->empty()) {
        return api::max_timestamp;
    }
    return std::ranges::min(
        *_memtables
        | std::views::transform(
            [](const shared_memtable& m) { return m->get_min_timestamp(); }
        ));
}

api::timestamp_type compaction_group::min_memtable_live_timestamp() const {
    if (_memtables->empty()) {
        return api::max_timestamp;
    }
    return std::ranges::min(
        *_memtables
        | std::views::transform(
            [](const shared_memtable& m) { return m->get_min_live_timestamp(); }
        ));
}

api::timestamp_type compaction_group::min_memtable_live_row_marker_timestamp() const {
    if (_memtables->empty()) {
        return api::max_timestamp;
    }
    return std::ranges::min(
        *_memtables
        | std::views::transform(
            [](const shared_memtable& m) { return m->get_min_live_row_marker_timestamp(); }
        ));
}

// True when any memtable of this group contains the given partition key.
bool compaction_group::memtable_has_key(const dht::decorated_key& key) const {
    if (_memtables->empty()) {
        return false;
    }
    return std::ranges::any_of(*_memtables, std::bind(&memtable::contains_partition, std::placeholders::_1, std::ref(key)));
}

api::timestamp_type storage_group::min_memtable_timestamp() const {
    return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_timestamp)));
}

api::timestamp_type storage_group::min_memtable_live_timestamp() const {
    return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_live_timestamp)));
}

api::timestamp_type storage_group::min_memtable_live_row_marker_timestamp() const {
    return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_live_row_marker_timestamp)));
}

api::timestamp_type table::min_memtable_timestamp() const {
    return std::ranges::min(storage_groups() | std::views::values | std::views::transform(std::mem_fn(&storage_group::min_memtable_timestamp)));
}

api::timestamp_type table::min_memtable_live_timestamp() const {
    return std::ranges::min(storage_groups() | std::views::values | std::views::transform(std::mem_fn(&storage_group::min_memtable_live_timestamp)));
}

api::timestamp_type table::min_memtable_live_row_marker_timestamp() const {
    return std::ranges::min(storage_groups() | std::views::values | std::views::transform(std::mem_fn(&storage_group::min_memtable_live_row_marker_timestamp)));
}

static bool belongs_to_current_shard(const std::vector& shards) {
    return std::ranges::contains(shards, this_shard_id());
}

// An sstable belongs to another shard iff its owner list contains any shard
// beyond (possibly) the current one: the size differs from 1-if-ours-else-0.
static bool belongs_to_other_shard(const std::vector& shards) {
    return shards.size() != size_t(belongs_to_current_shard(shards));
}

sstables::shared_sstable table::make_sstable(sstables::sstable_state state) {
    auto& sstm = get_sstables_manager();
    return sstm.make_sstable(_schema, *_storage_opts, calculate_generation_for_new_table(), state, sstm.get_preferred_sstable_version(), sstables::sstable::format_types::big);
}

sstables::shared_sstable table::make_sstable() {
    return make_sstable(sstables::sstable_state::normal);
}

// Returns the recorded truncation time; internal error if never truncated.
db_clock::time_point table::get_truncation_time() const {
    if (!_truncated_at) [[unlikely]] {
        on_internal_error(dblog, ::format("truncation time is not set, table {}.{}", _schema->ks_name(), _schema->cf_name()));
    }
    return *_truncated_at;
}

void table::notify_bootstrap_or_replace_start() {
    _is_bootstrap_or_replace = true;
}

void table::notify_bootstrap_or_replace_end() {
    _is_bootstrap_or_replace = false;
    // Streamed sstables accumulated during bootstrap/replace can now be
    // integrated via off-strategy compaction.
    trigger_offstrategy_compaction();
}

inline void table::add_sstable_to_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
    tracker.replace_sstables({}, {std::move(sstable)});
}

inline void table::remove_sstable_from_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
    tracker.replace_sstables({std::move(sstable)}, {});
}

void compaction_group::backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables) {
    // If group was closed / is being closed, it's ok to ignore request to adjust backlog tracker,
    // since that might result in an exception due to the group being deregistered from compaction
    // manager already. And the group is being removed anyway, so that won't have any practical
    // impact.
    if (_async_gate.is_closed()) {
        return;
    }
    auto& tracker = get_backlog_tracker();
    tracker.replace_sstables(old_sstables, new_sstables);
}

// Copy-on-write insertion into an sstable set: returns a new set so that
// in-flight readers keep using the old one. Also updates the group's
// max-seen timestamp and (optionally) the compaction backlog tracker.
lw_shared_ptr compaction_group::do_add_sstable(lw_shared_ptr sstables, sstables::shared_sstable sstable,
        enable_backlog_tracker backlog_tracker) {
    if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
        on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
    }
    // allow in-progress reads to continue using old list
    auto new_sstables = make_lw_shared(*sstables);
    new_sstables->insert(sstable);
    if (backlog_tracker) {
        table::add_sstable_to_backlog_tracker(get_backlog_tracker(), sstable);
    }
    _max_seen_timestamp = std::max(_max_seen_timestamp, sstable->get_stats_metadata().max_timestamp);
    return new_sstables;
}

void compaction_group::add_sstable(sstables::shared_sstable sstable) {
    _main_sstables = do_add_sstable(_main_sstables, std::move(sstable), enable_backlog_tracker::yes);
}

const lw_shared_ptr& compaction_group::main_sstables() const noexcept {
    return _main_sstables;
}

sstables::sstable_set compaction_group::make_main_sstable_set() const {
    // Uses view for unrepaired data, but doesn't really matter since this is only building an empty set,
    // and will get through the view info like group id and token range.
    return _t._compaction_strategy.make_sstable_set(view_for_unrepaired_data());
}

void compaction_group::set_main_sstables(lw_shared_ptr new_main_sstables) {
    _main_sstables = std::move(new_main_sstables);
}

void compaction_group::add_maintenance_sstable(sstables::shared_sstable sst) {
    // Maintenance sstables never feed the backlog tracker.
    _maintenance_sstables = do_add_sstable(_maintenance_sstables, std::move(sst), enable_backlog_tracker::no);
}

const lw_shared_ptr& compaction_group::maintenance_sstables() const noexcept {
    return _maintenance_sstables;
}

void compaction_group::set_maintenance_sstables(lw_shared_ptr new_maintenance_sstables) {
    _maintenance_sstables = std::move(new_maintenance_sstables);
}

void table::add_sstable(compaction_group& cg, sstables::shared_sstable sstable) {
    cg.add_sstable(std::move(sstable));
    refresh_compound_sstable_set();
}

void table::add_maintenance_sstable(compaction_group& cg, sstables::shared_sstable sst) {
    cg.add_maintenance_sstable(std::move(sst));
    refresh_compound_sstable_set();
}

void table::do_update_off_strategy_trigger() {
    _off_strategy_trigger.rearm(timer<>::clock::now() + std::chrono::minutes(5));
}

// If there are more sstables to be added to the off-strategy sstable set, call
// update_off_strategy_trigger to update the timer and delay to trigger
// off-strategy compaction. The off-strategy compaction will be triggered when
// the timer is expired.
void table::update_off_strategy_trigger() {
    if (_off_strategy_trigger.armed()) {
        do_update_off_strategy_trigger();
    }
}

// Call enable_off_strategy_trigger to enable the automatic off-strategy
// compaction trigger.
void table::enable_off_strategy_trigger() {
    do_update_off_strategy_trigger();
}

storage_group_manager::~storage_group_manager() = default;

// exception-less attempt to hold gate.
// TODO: move it to seastar.
static std::optional try_hold_gate(gate& g) noexcept {
    return g.is_closed() ? std::nullopt : std::make_optional(g.hold());
}

future<> storage_group_manager::parallel_foreach_storage_group(std::function(storage_group&)> f) {
    co_await coroutine::parallel_for_each(_storage_groups | std::views::values, [&] (const storage_group_ptr sg) -> future<> {
        // Table-wide ops, like 'nodetool compact', are inherently racy with migrations, so it's okay to skip
        // storage of tablets being migrated away.
        if (auto holder = try_hold_gate(sg->async_gate())) {
            co_await f(*sg.get());
        }
    });
}

future<> storage_group_manager::for_each_storage_group_gently(std::function(storage_group&)> f) {
    // Snapshot the group pointers first: _storage_groups may mutate across
    // the co_awaits below.
    auto storage_groups = _storage_groups | std::views::values | std::ranges::to();
    for (auto& sg: storage_groups) {
        if (auto holder = try_hold_gate(sg->async_gate())) {
            co_await f(*sg.get());
        }
    }
}

void storage_group_manager::for_each_storage_group(std::function f) const {
    for (auto& [id, sg]: _storage_groups) {
        if (auto holder = try_hold_gate(sg->async_gate())) {
            f(id, *sg);
        }
    }
}

const storage_group_map& storage_group_manager::storage_groups() const {
    return _storage_groups;
}

future<> storage_group_manager::stop_storage_groups() noexcept {
    co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) {
        return sg->stop("table removal");
    });
    co_await stop();
}

void storage_group_manager::clear_storage_groups() {
    for (auto& [id, sg]: _storage_groups) {
        sg->clear_sstables();
    }
}

void storage_group_manager::remove_storage_group(size_t id) {
    if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
        _storage_groups.erase(it);
    } else {
        throw std::out_of_range(format("remove_storage_group: storage group with id={} not found", id));
    }
}

// Throwing lookup; see maybe_storage_group_for_id() for the non-throwing one.
storage_group& storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const {
    auto it = _storage_groups.find(i);
    if (it == _storage_groups.end()) [[unlikely]] {
        throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name()));
    }
    return *it->second.get();
}
// Non-throwing lookup counterpart of storage_group_for_id().
storage_group* storage_group_manager::maybe_storage_group_for_id(const schema_ptr& s, size_t i) const {
    auto it = _storage_groups.find(i);
    return it != _storage_groups.end() ? &*it->second.get() : nullptr;
}

// storage_group_manager used for vnode-based (non-tablet) tables: the entire
// token ring maps to a single storage group holding a single compaction group.
// NOTE(review): template argument lists in this class appear to have been
// stripped by an angle-bracket sanitizer (e.g. "std::function", "make_lw_shared",
// "std::numeric_limits::min()"); the text is preserved as found — restore the
// arguments from the original sources before building.
class single_storage_group_manager final : public storage_group_manager {
    replica::table& _t;
    storage_group_ptr _single_sg;
    // Raw back-pointer into the group owned by _single_sg; valid for the
    // manager's lifetime because _single_sg keeps it alive.
    compaction_group* _single_cg;

    compaction_group& get_compaction_group() const noexcept {
        return *_single_cg;
    }
public:
    single_storage_group_manager(replica::table& t)
        : _t(t)
    {
        storage_group_map r;
        // Incremental repair is not supported with vnodes, so all sstables will be considered unrepaired.
        auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&, int64_t sstables_repaired_at) {
            return repair_sstable_classification::unrepaired;
        };
        // this might not reflect real vnode range for this node, but with 256 tokens, the actual
        // first and last tokens are likely to be ~0.5% of the edges, so any measurement against
        // this accurate enough token range will be likely up to ~1% off.
        // TODO: we could feed actual vnode range here, but we might bump into a chicken and egg
        // problem if e.g. a system table is created before tokens were allocated.
        auto full_token_range = dht::token_range::make(dht::first_token(), dht::last_token());
        auto cg = make_lw_shared(_t, size_t(0), std::move(full_token_range), noop_repair_sstable_classifier);
        _single_cg = cg.get();
        auto sg = make_lw_shared(std::move(cg));
        _single_sg = sg;
        r[0] = std::move(sg);
        _storage_groups = std::move(r);
    }

    future<> stop() override {
        return make_ready_future<>();
    }
    // Topology changes never remap storage for single-group (vnode) tables.
    void update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override {}
    compaction_group& compaction_group_for_token(dht::token token) const override {
        return get_compaction_group();
    }
    utils::chunked_vector storage_groups_for_token_range(dht::token_range tr) const override {
        utils::chunked_vector ret;
        ret.push_back(_single_sg);
        return ret;
    }
    compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override {
        return get_compaction_group();
    }
    compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override {
        return get_compaction_group();
    }
    size_t log2_storage_groups() const override {
        return 0;
    }
    storage_group& storage_group_for_token(dht::token token) const override {
        return *_single_sg;
    }
    locator::combined_load_stats table_load_stats(std::function) const override {
        // Minimum seq number marks "no split decision applicable".
        return locator::combined_load_stats{
            .table_ls = locator::table_load_stats{
                .size_in_bytes = _single_sg->live_disk_space_used(),
                .split_ready_seq_number = std::numeric_limits::min()},
            .tablet_ls = locator::tablet_load_stats{}
        };
    }
    // Splitting is a tablet-only concept; all split operations are no-ops.
    bool all_storage_groups_split() override {
        return true;
    }
    future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override {
        return make_ready_future();
    }
    future<> maybe_split_compaction_group_of(size_t idx) override {
        return make_ready_future();
    }
    future> maybe_split_sstable(const sstables::shared_sstable& sst) override {
        return make_ready_future>(std::vector{sst});
    }
    dht::token_range get_token_range_after_split(const dht::token&) const
noexcept override { return dht::token_range(); } lw_shared_ptr make_sstable_set() const override { return get_compaction_group().make_sstable_set(); } }; class tablet_storage_group_manager final : public storage_group_manager { replica::table& _t; locator::host_id _my_host_id; const locator::tablet_map* _tablet_map; future<> _stop_fut = make_ready_future(); // Every table replica that completes split work will load the seq number from tablet metadata into its local // state. So when coordinator pull the local state of a table, it will know whether the table is ready for the // current split, and not a previously revoked (stale) decision. // The minimum value, which is a negative number, is not used by coordinator for first decision. locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); future<> _merge_completion_fiber; condition_variable _merge_completion_event; // Holds compaction reenabler which disables compaction temporarily during tablet merge std::vector _compaction_reenablers_for_merging; private: const schema_ptr& schema() const { return _t.schema(); } const locator::tablet_map& tablet_map() const noexcept { return *_tablet_map; } size_t tablet_count() const noexcept { return tablet_map().tablet_count(); } future split_compaction_options() const noexcept; // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with // each tablet split into two, so this replica will remap all of its compaction groups // that were previously split. void handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); // Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into // the new tablet id (X >> 1). In practice, that means storage groups for X and X+1 // are merged into a new storage group with id (X >> 1). 
void handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap); // When merge completes, compaction groups of sibling tablets are added to same storage // group, but they're not merged yet into one, since the merge completion handler happens // inside the erm updater which must complete ASAP. Therefore, those groups will be merged // into a single one (main) in background. future<> merge_completion_fiber(); storage_group& storage_group_for_id(size_t i) const { return storage_group_manager::storage_group_for_id(schema(), i); } size_t tablet_id_for_token(dht::token t) const noexcept { return tablet_map().get_tablet_id(t).value(); } std::pair storage_group_of(dht::token t) const { auto [id, side] = tablet_map().get_tablet_id_and_range_side(t); auto idx = id.value(); #ifndef SCYLLA_BUILD_MODE_RELEASE if (idx >= tablet_count()) { on_fatal_internal_error(tlogger, format("storage_group_of: index out of range: idx={} size_log2={} size={} token={}", idx, log2_storage_groups(), tablet_count(), t)); } auto& sg = storage_group_for_id(idx); if (!t.is_minimum() && !t.is_maximum() && !sg.token_range().contains(t, dht::token_comparator())) { on_fatal_internal_error(tlogger, format("storage_group_of: storage_group idx={} range={} does not contain token={}", idx, sg.token_range(), t)); } #endif return { idx, side }; } repair_classifier_func make_repair_sstable_classifier_func() const { // FIXME: implement it for incremental repair! 
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) { bool is_repaired = repair::is_repaired(sstables_repaired_at, sst); if (is_repaired) { return repair_sstable_classification::repaired; } else { if (!sst->being_repaired.uuid().is_null()) { return repair_sstable_classification::repairing; } else { return repair_sstable_classification::unrepaired; } } }; } storage_group_ptr allocate_storage_group(const locator::tablet_map& tmap, locator::tablet_id tid, dht::token_range range) const { auto cg = make_lw_shared(_t, tid.value(), std::move(range), make_repair_sstable_classifier_func()); auto sg = make_lw_shared(std::move(cg)); if (tmap.needs_split()) { sg->set_split_mode(); } return sg; } public: tablet_storage_group_manager(table& t, const locator::effective_replication_map& erm) : _t(t) , _my_host_id(erm.get_token_metadata().get_my_id()) , _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id())) , _merge_completion_fiber(merge_completion_fiber()) { storage_group_map ret; auto& tmap = tablet_map(); auto local_replica = locator::tablet_replica{_my_host_id, this_shard_id()}; for (auto tid : tmap.tablet_ids()) { if (!tmap.has_replica(tid, local_replica)) { continue; } // if the tablet was cleaned up already on this replica, don't allocate a storage group for it. 
        auto trinfo = tmap.get_tablet_transition_info(tid);
        if (trinfo && locator::is_post_cleanup(local_replica, tmap.get_tablet_info(tid), *trinfo)) {
            continue;
        }
        auto range = tmap.get_token_range(tid);
        tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
        ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
    }
    _storage_groups = std::move(ret);
}
// Signals the merge-completion fiber to exit and waits for both background
// futures (merge fiber and any pending stop work) to resolve.
future<> stop() override {
    _merge_completion_event.signal();
    return when_all(std::exchange(_merge_completion_fiber, make_ready_future<>()), std::exchange(_stop_fut, make_ready_future())).discard_result();
}
void update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) override;
compaction_group& compaction_group_for_token(dht::token token) const override;
utils::chunked_vector storage_groups_for_token_range(dht::token_range tr) const override;
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override;
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override;
size_t log2_storage_groups() const override {
    return log2ceil(tablet_map().tablet_count());
}
storage_group& storage_group_for_token(dht::token token) const override {
    return storage_group_for_id(storage_group_of(token).first);
}
locator::combined_load_stats table_load_stats(std::function tablet_filter) const override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future> maybe_split_sstable(const sstables::shared_sstable& sst) override;
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
    return tablet_map().get_token_range_after_split(token);
}
lw_shared_ptr make_sstable_set() const override {
    // FIXME: avoid recreation of compound_set for groups which
    // had no change. usually, only one group will be changed at a time.
    return make_tablet_sstable_set(schema(), *this, *_tablet_map);
}
};

// A table uses tablets iff its replication strategy says so.
bool table::uses_tablets() const {
    return _erm && _erm->get_replication_strategy().uses_tablets();
}

storage_group::storage_group(compaction_group_ptr cg)
    : _main_cg(cg)
    , _async_gate(format("[storage_group {}.{} {}]", cg->schema()->ks_name(), cg->schema()->cf_name(), cg->group_id())) {
}

const dht::token_range& storage_group::token_range() const noexcept {
    return _main_cg->token_range();
}

const compaction_group_ptr& storage_group::main_compaction_group() const noexcept {
    return _main_cg;
}

const std::vector& storage_group::split_ready_compaction_groups() const {
    return _split_ready_groups;
}

// Maps a range side (left/right) to its slot in _split_ready_groups.
size_t storage_group::to_idx(locator::tablet_range_side side) const {
    return size_t(side);
}

// While splitting, writes are routed to the side-specific split-ready group;
// otherwise everything goes to the main group.
compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_range_side side) noexcept {
    if (splitting_mode()) {
        return _split_ready_groups[to_idx(side)];
    }
    return _main_cg;
}

// Applies the action to every group: main first, then merging, then split-ready.
void storage_group::for_each_compaction_group(std::function action) const {
    action(_main_cg);
    for (auto& cg : _merging_groups) {
        action(cg);
    }
    for (auto& cg : _split_ready_groups) {
        action(cg);
    }
}

utils::small_vector storage_group::compaction_groups() {
    utils::small_vector cgs;
    for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
        cgs.push_back(cg);
    });
    return cgs;
}

utils::small_vector storage_group::compaction_groups() const {
    utils::small_vector cgs;
    for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
        cgs.push_back(cg);
    });
    return cgs;
}

// Groups whose data has not yet been redistributed for a split: main + merging.
utils::small_vector storage_group::split_unready_groups() const {
    utils::small_vector cgs;
    cgs.push_back(_main_cg);
    std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs));
    return cgs;
}

bool storage_group::split_unready_groups_are_empty() const {
    return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty));
}

bool
storage_group::set_split_mode() {
    // A group being stopped (e.g. during migration cleanup) cannot satisfy split mode.
    // Also, a race can happen if new groups are added while old ones are being stopped,
    // so the new ones can be left unstopped, potentially resulting in use-after-free.
    if (_async_gate.is_closed()) {
        return false;
    }
    if (!splitting_mode()) {
        auto create_cg = [this] () -> compaction_group_ptr {
            // TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
            return compaction_group::make_empty_group(*_main_cg);
        };
        tlogger.debug("storage_group::set_split_mode: Set sstables_repaired_at={} for split old_group={} old_range={}", _main_cg->get_sstables_repaired_at(), _main_cg->group_id(), _main_cg->token_range());
        // One empty target group per side of the future split.
        std::vector split_ready_groups(2);
        split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg();
        split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg();
        _split_ready_groups = std::move(split_ready_groups);
    }
    // The storage group is considered "split ready" if all split unready groups (main + merging) are empty.
    return split_unready_groups_are_empty();
}

void storage_group::add_merging_group(compaction_group_ptr cg) {
    _merging_groups.push_back(std::move(cg));
}

const std::vector& storage_group::merging_groups() const {
    return _merging_groups;
}

// Stops and removes merging groups that have become empty. No-op once the
// group's gate is closed (shutdown / migration cleanup).
future<> storage_group::remove_empty_merging_groups() {
    if (_async_gate.is_closed()) {
        co_return;
    }
    for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) {
        co_await group->stop("tablet merge");
    }
    std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty));
}

// Runs a split compaction over every view of this compaction group, first
// draining off-strategy sstables under the incremental-repair read lock.
future<> compaction_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
    auto& cm = get_compaction_manager();
    for (auto view : all_views()) {
        auto lock_holder = co_await cm.get_incremental_repair_read_lock(*view, "storage_group_split");
        // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
        co_await cm.perform_offstrategy(*view, tablet_split_task_info);
        co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info);
    }
}

// Drives this storage group towards "split ready": if set_split_mode() already
// reports ready, nothing to do; otherwise flush and split every unready group.
future<> storage_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
    if (set_split_mode()) {
        co_return;
    }
    co_await utils::get_local_injector().inject("delay_split_compaction", 5s);
    if (split_unready_groups_are_empty()) {
        co_return;
    }
    for (auto cg : split_unready_groups()) {
        if (cg->async_gate().is_closed()) {
            continue;
        }
        auto holder = cg->async_gate().hold();
        co_await cg->flush();
        co_await cg->split(opt, tablet_split_task_info);
    }
}

// Returns the group's sstable set: the main group's set alone when there are
// no merging / split-ready groups, otherwise a compound set over all of them.
lw_shared_ptr storage_group::make_sstable_set() const {
    if (_split_ready_groups.empty() && _merging_groups.empty()) {
        return _main_cg->make_sstable_set();
    }
    const auto& schema = _main_cg->_t.schema();
    std::vector> underlying;
    underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size());
    underlying.emplace_back(_main_cg->make_sstable_set());
    for (const auto& cg : _merging_groups) {
        if
        (!cg->empty()) {
            underlying.emplace_back(cg->make_sstable_set());
        }
    }
    for (const auto& cg : _split_ready_groups) {
        underlying.emplace_back(cg->make_sstable_set());
    }
    return make_lw_shared(sstables::make_compound_sstable_set(schema, std::move(underlying)));
}

// Tombstone GC must consider the whole storage group's sstables, not only the
// compaction group being compacted.
lw_shared_ptr table::sstable_set_for_tombstone_gc(const compaction_group& cg) const {
    auto& sg = storage_group_for_id(cg.group_id());
    return sg.make_sstable_set();
}

// Returns true when every storage group reports split readiness; records the
// tablet map's resize sequence number so the check short-circuits next time.
bool tablet_storage_group_manager::all_storage_groups_split() {
    auto& tmap = tablet_map();
    if (_split_ready_seq_number == tmap.resize_decision().sequence_number) {
        return true;
    }
    bool split_ready = true;
    for (const storage_group_ptr& sg : _storage_groups | std::views::values) {
        split_ready &= sg->set_split_mode();
    }
    // The table replica will say to coordinator that its split status is ready by
    // mirroring the sequence number from tablet metadata into its local state,
    // which is pulled periodically by coordinator.
    if (split_ready) {
        _split_ready_seq_number = tmap.resize_decision().sequence_number;
        tlogger.info0("Setting split ready sequence number to {} for table {}.{}", _split_ready_seq_number, schema()->ks_name(), schema()->cf_name());
    }
    return split_ready;
}

bool table::all_storage_groups_split() {
    return _sg_manager->all_storage_groups_split();
}

// Builds the split-compaction classifier over a pinned snapshot of the tablet map.
future tablet_storage_group_manager::split_compaction_options() const noexcept {
    // Split must work with a snapshot of tablet map, since it expects stability
    // throughout its execution.
    auto erm = _t.get_effective_replication_map();
    auto tablet_map_ptr = co_await erm->get_token_metadata().tablets().get_tablet_map_ptr(schema()->id());
    co_return compaction::compaction_type_options::split([tablet_map_ptr = make_lw_shared(std::move(tablet_map_ptr))] (dht::token t) {
        // Classifies the input stream into either left or right side.
        auto [_, side] = (*tablet_map_ptr)->get_tablet_id_and_range_side(t);
        return mutation_writer::token_group_id(side);
    });
}

// Splits every storage group on this shard (used when tablets need resizing).
future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
    compaction::compaction_type_options::split opt = co_await split_compaction_options();
    co_await utils::get_local_injector().inject("split_storage_groups_wait", [] (auto& handler) -> future<> {
        dblog.info("split_storage_groups_wait: waiting");
        co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
        dblog.info("split_storage_groups_wait: done");
    }, false);
    co_await for_each_storage_group_gently([opt, tablet_split_task_info] (storage_group& storage_group) {
        return storage_group.split(opt, tablet_split_task_info);
    });
}

future<> table::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
    auto holder = async_gate().hold();
    co_await _sg_manager->split_all_storage_groups(tablet_split_task_info);
}

// Splits a single tablet's storage group, if the tablet map requests a split.
future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t idx) {
    if (!tablet_map().needs_split()) {
        co_return;
    }
    tasks::task_info tablet_split_task_info{tasks::task_id{tablet_map().resize_task_info().tablet_task_id.uuid()}, 0};
    // NOTE(review): if storage_group_map has map semantics, operator[] may
    // default-insert a null entry for a missing idx; the check below catches
    // the null but the entry would remain — confirm intended.
    auto sg = _storage_groups[idx];
    if (!sg) {
        on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard", idx, schema()->ks_name(), schema()->cf_name()));
    }
    co_return co_await sg->split(co_await split_compaction_options(), tablet_split_task_info);
}

// Splits one sstable into per-side outputs when a tablet split is pending;
// otherwise returns the sstable unchanged.
future> tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
    if (!tablet_map().needs_split()) {
        co_return std::vector{sst};
    }
    auto& cg = compaction_group_for_sstable(sst);
    auto holder = cg.async_gate().hold();
    auto& view = cg.view_for_sstable(sst);
    auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
    co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, co_await
    split_compaction_options());
}

future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
    auto holder = async_gate().hold();
    co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value());
}

future> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
    auto holder = async_gate().hold();
    co_return co_await _sg_manager->maybe_split_sstable(sst);
}

dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
    return _sg_manager->get_token_range_after_split(token);
}

// Chooses the tablet-aware manager when the table uses tablets, otherwise the
// vnode (single-group) manager.
std::unique_ptr table::make_storage_group_manager() {
    std::unique_ptr ret;
    if (uses_tablets()) {
        ret = std::make_unique(*this, *_erm);
    } else {
        ret = std::make_unique(*this);
    }
    return ret;
}

compaction_group* table::get_compaction_group(size_t id) const {
    return storage_group_for_id(id).main_compaction_group().get();
}

storage_group& table::storage_group_for_token(dht::token token) const {
    return _sg_manager->storage_group_for_token(token);
}

storage_group& table::storage_group_for_id(size_t i) const {
    return _sg_manager->storage_group_for_id(_schema, i);
}

// Routes a token to the compaction group currently responsible for it,
// accounting for the left/right side during a split.
compaction_group& tablet_storage_group_manager::compaction_group_for_token(dht::token token) const {
    auto [idx, range_side] = storage_group_of(token);
    auto& sg = storage_group_for_id(idx);
    return *sg.select_compaction_group(range_side);
}

compaction_group& table::compaction_group_for_token(dht::token token) const {
    return _sg_manager->compaction_group_for_token(token);
}

// Collects every allocated storage group overlapping the token range; absent
// (unallocated) tablet ids inside the candidate range are simply skipped.
utils::chunked_vector tablet_storage_group_manager::storage_groups_for_token_range(dht::token_range tr) const {
    utils::chunked_vector ret;
    auto cmp = dht::token_comparator();
    size_t candidate_start = tr.start() ? tablet_id_for_token(tr.start()->value()) : size_t(0);
    size_t candidate_end = tr.end() ? tablet_id_for_token(tr.end()->value()) : (tablet_count() - 1);
    while (candidate_start <= candidate_end) {
        auto it = _storage_groups.find(candidate_start++);
        if (it == _storage_groups.end()) {
            continue;
        }
        auto& sg = it->second;
        if (sg && tr.overlaps(sg->token_range(), cmp)) {
            ret.push_back(sg);
        }
    }
    return ret;
}

utils::chunked_vector table::storage_groups_for_token_range(dht::token_range tr) const {
    return _sg_manager->storage_groups_for_token_range(tr);
}

compaction_group& tablet_storage_group_manager::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
    return compaction_group_for_token(dht::get_token(*s, key));
}

compaction_group& table::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
    return _sg_manager->compaction_group_for_key(key, s);
}

// Routes an sstable to a compaction group by its key range. An sstable must
// not span two tablets; one that straddles both sides of a splitting tablet
// belongs to the main group.
compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
    auto [first_id, first_range_side] = storage_group_of(sst->get_first_decorated_key().token());
    auto [last_id, last_range_side] = storage_group_of(sst->get_last_decorated_key().token());
    if (first_id != last_id) {
        on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}", sst->get_filename(), first_id, last_id));
    }
    try {
        auto& sg = storage_group_for_id(first_id);
        if (first_range_side != last_range_side) {
            return *sg.main_compaction_group();
        }
        return *sg.select_compaction_group(first_range_side);
    } catch (std::out_of_range& e) {
        on_internal_error(tlogger, format("Unable to load SSTable {} : {}", sst->get_filename(), e.what()));
    }
}

compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
    return _sg_manager->compaction_group_for_sstable(sst);
}

// Runs the action on all live compaction groups, groups within a storage
// group in parallel; closed gates are skipped rather than waited on.
future<> table::parallel_foreach_compaction_group(std::function(compaction_group&)> action) {
    co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> {
        co_await
        utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& handler) -> future<> {
            tlogger.info("foreach_compaction_group_wait: waiting");
            while (!handler.poll_for_message() && !_async_gate.is_closed() && !sg.async_gate().is_closed()) {
                co_await sleep(std::chrono::milliseconds(5));
            }
            tlogger.info("foreach_compaction_group_wait: released");
        });
        co_await coroutine::parallel_for_each(sg.compaction_groups(), [&] (compaction_group_ptr cg) -> future<> {
            if (auto holder = try_hold_gate(cg->async_gate())) {
                co_await action(*cg);
            }
        });
    });
}

// Synchronous iteration over live compaction groups (gate held per group).
void table::for_each_compaction_group(std::function action) {
    _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
        sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
            if (auto holder = try_hold_gate(cg->async_gate())) {
                action(*cg);
            }
        });
    });
}

// Const overload of the above.
void table::for_each_compaction_group(std::function action) const {
    _sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
        sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
            if (auto holder = try_hold_gate(cg->async_gate())) {
                action(*cg);
            }
        });
    });
}

const storage_group_map& table::storage_groups() const {
    return _sg_manager->storage_groups();
}

// Flushes each overlapping storage group and captures its sstables (with
// readable file handles) so the data survives later unlink by compaction.
future> table::take_storage_snapshot(dht::token_range tr) {
    utils::chunked_vector ret;
    for (auto& sg : storage_groups_for_token_range(tr)) {
        co_await utils::get_local_injector().inject("take_storage_snapshot", utils::wait_for_message(60s));
        // We don't care about sstables in snapshot being unlinked, as the file
        // descriptors remain opened until last reference to them are gone.
        // Also, we should be careful with taking a deletion lock here as a
        // deadlock might occur due to memtable flush backpressure waiting on
        // compaction to reduce the backlog.
        co_await sg->flush();
        // The sstable set must be obtained *after* the deletion lock is taken,
        // otherwise components of sstables in the set might be unlinked from the filesystem
        // by compaction while we are waiting for the lock.
        auto deletion_guard = co_await get_sstable_list_permit();
        // It's vital that we build a set on the storage group level, since sstables
        // might move across compaction groups in the background.
        co_await sg->make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
            ret.push_back({
                .sst = sst,
                .files = co_await sst->readable_file_for_all_components(),
            });
        });
    }
    co_return std::move(ret);
}

// Snapshots the table's current sstable set under the deletion permit.
future> table::take_sstable_set_snapshot() {
    auto deletion_guard = co_await get_sstable_list_permit();
    utils::chunked_vector result;
    co_await get_sstable_set().for_each_sstable_gently([&] (sstables::shared_sstable sst) {
        result.push_back(sst);
    });
    co_return result;
}

// Clones every sstable of one tablet's storage group under fresh generations
// (used for tablet cloning/migration).
future> table::clone_tablet_storage(locator::tablet_id tid) {
    utils::chunked_vector ret;
    auto holder = async_gate().hold();
    auto& sg = storage_group_for_id(tid.value());
    auto sg_holder = sg.async_gate().hold();
    co_await sg.flush();
    // The sstable set must be obtained *after* the deletion lock is taken,
    // otherwise components of sstables in the set might be unlinked from the filesystem
    // by compaction while we are waiting for the lock.
    auto deletion_guard = co_await get_sstable_list_permit();
    co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
        ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
    });
    co_return ret;
}

// Accounts a newly added sstable in the table's disk-space/count statistics.
void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept {
    _stats.live_disk_space_used += sst->get_file_size_stats();
    _stats.total_disk_space_used += sst->get_file_size_stats();
    _stats.live_sstable_count++;
}

// Adds one sstable to a specific compaction group and invalidates the row
// cache over the sstable's key range, filtered by its bloom filter.
future<> table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
    auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
    co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
        // FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
        // atomically load all opened sstables into column family.
        if (!offstrategy) {
            add_sstable(cg, sst);
        } else {
            add_maintenance_sstable(cg, sst);
        }
        update_stats_for_new_sstable(sst);
        if (trigger_compaction) {
            try_trigger_compaction(cg);
        }
    }), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
        return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
    });
}

// Overload that first splits the sstable if a tablet split is pending, then
// adds each resulting sstable to its owning compaction group.
future<> table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
    for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
        auto& cg = compaction_group_for_sstable(sst);
        // Hold gate to make sure compaction group is alive.
        auto holder = cg.async_gate().hold();
        co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
    }
}

// Off-strategy additions do not trigger regular compaction immediately.
future<> table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
    bool do_trigger_compaction = offstrategy == sstables::offstrategy::no;
    co_await do_add_sstable_and_update_cache(std::move(sst), offstrategy, do_trigger_compaction);
}

// Bulk addition: compaction is triggered once at the end, not per sstable.
future<> table::add_sstables_and_update_cache(const std::vector& ssts) {
    constexpr bool do_not_trigger_compaction = false;
    for (auto& sst : ssts) {
        co_await do_add_sstable_and_update_cache(sst, sstables::offstrategy::no, do_not_trigger_compaction);
    }
    trigger_compaction();
}

// After a memtable flush: publishes the new sstables, marks the memtable as
// flushed, and either updates or invalidates the row cache.
future<> table::update_cache(compaction_group& cg, lw_shared_ptr m, std::vector ssts) {
    auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
    mutation_source_opt ms_opt;
    if (ssts.size() == 1) {
        ms_opt = ssts.front()->as_mutation_source();
    } else {
        std::vector sources;
        sources.reserve(ssts.size());
        for (auto& sst : ssts) {
            sources.push_back(sst->as_mutation_source());
        }
        ms_opt = make_combined_mutation_source(std::move(sources));
    }
    auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(*ms_opt), &cg] () mutable {
        // FIXME: the following isn't exception safe.
        for (auto& sst : ssts) {
            add_sstable(cg, sst);
            update_stats_for_new_sstable(sst);
        }
        m->mark_flushed(std::move(new_ssts_ms));
        try_trigger_compaction(cg);
    });
    if (cache_enabled()) {
        co_return co_await _cache.update(std::move(adder), *m);
    } else {
        co_return co_await _cache.invalidate(std::move(adder)).then([m] {
            return m->clear_gently();
        });
    }
}

// Handles permit management only, used for situations where we don't want to inform
// the compaction manager about backlogs (i.e., tests)
class permit_monitor : public sstables::write_monitor {
    lw_shared_ptr _permit;
public:
    permit_monitor(lw_shared_ptr permit)
        : _permit(std::move(permit)) {
    }
    virtual void on_write_started(const sstables::writer_offset_tracker& t) override { }
    virtual void on_data_write_completed() override {
        // We need to start a flush before the current one finishes, otherwise
        // we'll have a period without significant disk activity when the current
        // SSTable is being sealed, the caches are being updated, etc. To do that,
        // we ensure the permit doesn't outlive this continuation.
        *_permit = sstable_write_permit::unconditional();
    }
};

// Handles all tasks related to sstable writing: permit management, compaction backlog updates, etc
class database_sstable_write_monitor : public permit_monitor, public compaction::backlog_write_progress_manager {
    sstables::shared_sstable _sst;
    compaction_group& _cg;
    // Non-null only between on_write_started() and on_data_write_completed().
    const sstables::writer_offset_tracker* _tracker = nullptr;
    uint64_t _progress_seen = 0;
    api::timestamp_type _maximum_timestamp;
public:
    database_sstable_write_monitor(lw_shared_ptr permit, sstables::shared_sstable sst, compaction_group& cg, api::timestamp_type max_timestamp)
        : permit_monitor(std::move(permit))
        , _sst(std::move(sst))
        , _cg(cg)
        , _maximum_timestamp(max_timestamp)
    {}

    database_sstable_write_monitor(const database_sstable_write_monitor&) = delete;
    database_sstable_write_monitor(database_sstable_write_monitor&& x) = default;

    ~database_sstable_write_monitor() {
        // We failed to finish handling this SSTable, so we have to update the backlog_tracker
        // about it.
        if (_sst) {
            _cg.get_backlog_tracker().revert_charges(_sst);
        }
    }

    virtual void on_write_started(const sstables::writer_offset_tracker& t) override {
        _tracker = &t;
        _cg.get_backlog_tracker().register_partially_written_sstable(_sst, *this);
    }

    virtual void on_data_write_completed() override {
        permit_monitor::on_data_write_completed();
        _progress_seen = _tracker->offset;
        _tracker = nullptr;
    }

    // Bytes written so far: live tracker offset while writing, else last seen.
    virtual uint64_t written() const override {
        if (_tracker) {
            return _tracker->offset;
        }
        return _progress_seen;
    }

    api::timestamp_type maximum_timestamp() const override {
        return _maximum_timestamp;
    }

    unsigned level() const override {
        return 0;
    }
};

// The function never fails.
// It either succeeds eventually after retrying or aborts.
future<> table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) noexcept {
    auto old = cg.memtables()->back();
    tlogger.debug("Sealing active memtable of {}.{}, partitions: {}, occupancy: {}", _schema->ks_name(), _schema->cf_name(), old->partition_count(), old->occupancy());

    if (old->empty()) {
        tlogger.debug("Memtable is empty");
        co_return co_await _flush_barrier.advance_and_await();
    }

    auto permit = std::move(flush_permit);
    auto r = exponential_backoff_retry(100ms, 10s);
    // Try flushing for around half an hour (30 minutes every 10 seconds)
    int default_retries = 30 * 60 / 10;
    int allowed_retries = default_retries;
    std::optional op;
    size_t memtable_size;
    future<> previous_flush = make_ready_future<>();

    // Retry wrapper: classifies the thrown exception to decide whether the
    // step may be retried (allocation failure, out-of-space/permission errors)
    // or the process must abort and rely on commitlog replay.
    auto with_retry = [&] (std::function()> func) -> future<> {
        for (;;) {
            std::exception_ptr ex;
            try {
                co_return co_await func();
            } catch (...) {
                ex = std::current_exception();
                _config.cf_stats->failed_memtables_flushes_count++;

                auto should_retry = [](auto* ep) {
                    int ec = ep->code().value();
                    return ec == ENOSPC || ec == EDQUOT || ec == EPERM || ec == EACCES;
                };
                if (try_catch(ex)) {
                    // There is a chance something else will free the memory, so we can try again
                    allowed_retries--;
                } else if (auto ep = try_catch(ex)) {
                    allowed_retries = should_retry(ep) ? default_retries : 0;
                } else if (auto ep = try_catch(ex)) {
                    allowed_retries = should_retry(ep) ? default_retries : 0;
                } else if (try_catch(ex)) {
                    allowed_retries = default_retries;
                } else {
                    allowed_retries = 0;
                }

                if (allowed_retries <= 0) {
                    // At this point we don't know what has happened and it's better to potentially
                    // take the node down and rely on commitlog to replay.
                    //
                    // FIXME: enter maintenance mode when available.
                    // since replaying the commitlog with a corrupt mutation
                    // may end up in an infinite crash loop.
                    tlogger.error("Memtable flush failed due to: {}. Aborting, at {}", ex, current_backtrace());
                    std::abort();
                }
            }
            if (_async_gate.is_closed()) {
                tlogger.warn("Memtable flush failed due to: {}. Dropped due to shutdown", ex);
                co_await std::move(previous_flush);
                co_await coroutine::return_exception_ptr(std::move(ex));
            }
            tlogger.warn("Memtable flush failed due to: {}. Will retry in {}ms", ex, r.sleep_time().count());
            co_await r.retry();
        }
    };

    // Step 1: install a fresh active memtable; the old one becomes immutable.
    co_await with_retry([&] {
        tlogger.debug("seal_active_memtable: adding memtable");
        utils::get_local_injector().inject("table_seal_active_memtable_add_memtable", []() {
            throw std::bad_alloc();
        });

        cg.memtables()->add_memtable();

        _highest_flushed_rp = std::max(_highest_flushed_rp, old->replay_position());

        // no exceptions allowed (nor expected) from this point on
        _stats.memtable_switch_count++;
        [&] () noexcept {
            // This will set evictable occupancy of the old memtable region to zero, so that
            // this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
            // If we don't do that, the flusher may keep picking up this memtable list for flushing after
            // the permit is released even though there is not much to flush in the active memtable of this list.
            old->region().ground_evictable_occupancy();
            memtable_size = old->occupancy().total_space();
        }();
        return make_ready_future<>();
    });

    // Step 2: enter the flush barrier and account the pending flush.
    co_await with_retry([&] {
        previous_flush = _flush_barrier.advance_and_await();
        utils::get_local_injector().inject("table_seal_active_memtable_start_op", []() {
            throw std::bad_alloc();
        });
        op = _flush_barrier.start();

        // no exceptions allowed (nor expected) from this point on
        _stats.pending_flushes++;
        _config.cf_stats->pending_memtables_flushes_count++;
        _config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
        return make_ready_future<>();
    });

    auto undo_stats = std::make_optional(deferred_action([this, memtable_size] () noexcept {
        _stats.pending_flushes--;
        _config.cf_stats->pending_memtables_flushes_count--;
        _config.cf_stats->pending_memtables_flushes_bytes -= memtable_size;
    }));

    // Step 3: write the old memtable out to sstable(s).
    co_await with_retry([&] () -> future<> {
        // Reacquiring the write permit might be needed if retrying flush
        if (!permit.has_sstable_write_permit()) {
            tlogger.debug("seal_active_memtable: reacquiring write permit");
            utils::get_local_injector().inject("table_seal_active_memtable_reacquire_write_permit", []() {
                throw std::bad_alloc();
            });
            permit = co_await std::move(permit).reacquire_sstable_write_permit();
        }
        auto write_permit = permit.release_sstable_write_permit();

        utils::get_local_injector().inject("table_seal_active_memtable_try_flush", []() {
            throw std::system_error(ENOSPC, std::system_category(), "Injected error");
        });

        co_return co_await this->try_flush_memtable_to_sstable(cg, old, std::move(write_permit));
    });

    undo_stats.reset();

    if (_commitlog) {
        _commitlog->discard_completed_segments(_schema->id(), old->get_and_discard_rp_set());
    }
    co_await std::move(previous_flush);
    // keep `op` alive until after previous_flush resolves

    // FIXME: release commit log
    // FIXME: provide back-pressure to upper layers
}

future<> table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr old, sstable_write_permit&& permit) {
    auto try_flush = [this, old =
std::move(old), permit = make_lw_shared(std::move(permit)), &cg] () mutable -> future<> { // Note that due to our sharded architecture, it is possible that // in the face of a value change some shards will backup sstables // while others won't. // // This is, in theory, possible to mitigate through a rwlock. // However, this doesn't differ from the situation where all tables // are coming from a single shard and the toggle happens in the // middle of them. // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. auto newtabs = std::vector(); auto metadata = mutation_source_metadata{}; metadata.min_timestamp = old->get_min_timestamp(); metadata.max_timestamp = old->get_max_timestamp(); auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema); if (!cg.async_gate().is_closed()) { co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data()); } auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> { std::exception_ptr ex; try { sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable"); cfg.backup = incremental_backups_enabled(); auto newtab = make_sstable(); newtabs.push_back(newtab); tlogger.debug("Flushing to {}", newtab->get_filename()); auto monitor = database_sstable_write_monitor(permit, newtab, cg, old->get_max_timestamp()); co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg); } catch (...) 
{ ex = std::current_exception(); } co_await reader.close(); co_await coroutine::return_exception_ptr(std::move(ex)); }); auto f = consumer(old->make_flush_reader( old->schema(), compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {}))); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to // priority inversion. auto post_flush = [this, old = std::move(old), &newtabs, f = std::move(f), &cg] () mutable -> future<> { try { co_await std::move(f); co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> { co_await newtab->open_data(); tlogger.debug("Flushing to {} done", newtab->get_filename()); }); co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] { return update_cache(cg, old, newtabs); }); co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> { const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name()); if (this_table_name == handler.get("table_name")) { tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name); handler.set("suspended", true); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name); } }); cg.memtables()->erase(old); tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size()); co_return; } catch (const std::exception& e) { for (auto& newtab : newtabs) { newtab->mark_for_deletion(); tlogger.error("failed to write sstable {}: {}", 
newtab->get_filename(), e);
            }
            _config.cf_stats->failed_memtables_flushes_count++;
            // If we failed this write we will try the write again and that will create a new flush reader
            // that will decrease dirty memory again. So we need to reset the accounting.
            old->revert_flushed_memory();
            throw;
        }
    };
    // Post-flush work runs in the default scheduling group so it is not
    // starved by the memtable flush controller.
    co_return co_await with_scheduling_group(default_scheduling_group(), std::ref(post_flush));
    };
    co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush));
}

// Starts the table's background machinery: compaction, and the periodic
// memtable flush timer when the schema configures a memtable_flush_period.
void table::start() {
    start_compaction();
    if (_schema->memtable_flush_period() > 0) {
        _flush_timer.arm(std::chrono::milliseconds(_schema->memtable_flush_period()));
    }
}

// Stops the table: closes the async gate and the read/write/stream/flush
// phasers so new work is rejected, waits for in-flight work, stops the
// storage groups, and finally drops all sstable references under a row-cache
// invalidation so ongoing reads never see a partially-cleared set.
// Idempotent: returns immediately if the gate is already closed.
future<> table::stop() {
    if (_async_gate.is_closed()) {
        co_return;
    }
    _flush_timer.cancel();
    // Allow `compaction_group::stop` to stop ongoing compactions
    // while they may still hold the table _async_gate
    auto gate_closed_fut = _async_gate.close();
    co_await when_all(
        _pending_reads_phaser.close(),
        _pending_writes_phaser.close(),
        _pending_streams_phaser.close());
    // Allow parallel flushes from the commitlog path
    // to synchronize with table::stop
    {
        auto op = _pending_flushes_phaser.start();
        co_await _sg_manager->stop_storage_groups();
    }
    co_await _pending_flushes_phaser.close();
    co_await _sstable_deletion_gate.close();
    co_await std::move(gate_closed_fut);
    co_await get_row_cache().invalidate(row_cache::external_updater([this] {
        _sg_manager->clear_storage_groups();
        _sstables = make_compound_sstable_set();
    }));
    _cache.refresh_snapshot();
}

// Label distinguishing the node-aggregated per-table metrics from the
// per-shard ones.
static seastar::metrics::label_instance node_table_metrics("__per_table", "node");

// Registers this table's metrics. Full per-table metrics are emitted when
// enable_metrics_reporting is set; otherwise only node-aggregated metrics
// (when enabled) are registered.
void table::set_metrics() {
    auto cf = column_family_label(_schema->cf_name());
    auto ks = keyspace_label(_schema->ks_name());
    namespace ms = seastar::metrics;
    if (_config.enable_metrics_reporting) {
        _metrics.add_group("column_family", {
                ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"),
_stats.memtable_switch_count)(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_row_hits", _stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_rows_dropped_by_tombstones", _stats.memtable_app_stats.rows_dropped_by_tombstones, ms::description("Number of rows dropped in memtables by a tombstone write"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_rows_compacted_with_tombstones", _stats.memtable_app_stats.rows_compacted_with_tombstones, ms::description("Number of rows scanned during write of a tombstone for the purpose of compaction in memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_range_tombstone_reads", _stats.memtable_range_tombstone_reads, ms::description("Number of range tombstones read from memtables"))(cf)(ks).set_skip_when_empty(), ms::make_counter("memtable_row_tombstone_reads", _stats.memtable_row_tombstone_reads, ms::description("Number of row tombstones read from memtables"))(cf)(ks), ms::make_gauge("pending_tasks", ms::description("Estimated number of tasks pending for this column family"), _stats.pending_flushes)(cf)(ks), ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks), 
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks), ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks), ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks), ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks), ms::make_gauge("pending_sstable_deletions", ms::description("Number of tasks waiting to delete sstables from a table"), [this] { return _stats.pending_sstable_deletions; })(cf)(ks) }); // Metrics related to row locking auto add_row_lock_metrics = [this, ks, cf] (row_locker::single_lock_stats& stats, sstring stat_name) { _metrics.add_group("column_family", { ms::make_total_operations(format("row_lock_{}_acquisitions", stat_name), stats.lock_acquisitions, ms::description(format("Row lock acquisitions for {} lock", stat_name)))(cf)(ks).set_skip_when_empty(), ms::make_queue_length(format("row_lock_{}_operations_currently_waiting_for_lock", stat_name), stats.operations_currently_waiting_for_lock, ms::description(format("Operations currently waiting for {} lock", stat_name)))(cf)(ks), ms::make_histogram(format("row_lock_{}_waiting_time", stat_name), ms::description(format("Histogram representing time that operations spent on waiting for {} lock", stat_name)), [&stats] {return to_metrics_histogram(stats.estimated_waiting_for_lock);})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty() }); }; add_row_lock_metrics(_row_locker_stats.exclusive_row, "exclusive_row"); add_row_lock_metrics(_row_locker_stats.shared_row, "shared_row"); add_row_lock_metrics(_row_locker_stats.exclusive_partition, "exclusive_partition"); add_row_lock_metrics(_row_locker_stats.shared_partition, 
"shared_partition"); // View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views) if (!_schema->is_view()) { _view_stats.register_stats(); } if (uses_tablets()) { _metrics.add_group("column_family", { ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks) }); } if (!is_internal_keyspace(_schema->ks_name())) { _metrics.add_group("column_family", { ms::make_summary("read_latency_summary", ms::description("Read latency summary"), [this] {return to_metrics_summary(_stats.reads.summary());})(cf)(ks).set_skip_when_empty(), ms::make_summary("write_latency_summary", ms::description("Write latency summary"), [this] {return to_metrics_summary(_stats.writes.summary());})(cf)(ks).set_skip_when_empty(), ms::make_summary("cas_prepare_latency_summary", ms::description("CAS prepare round latency summary"), [this] {return to_metrics_summary(_stats.cas_prepare.summary());})(cf)(ks).set_skip_when_empty(), ms::make_summary("cas_propose_latency_summary", ms::description("CAS accept round latency summary"), [this] {return to_metrics_summary(_stats.cas_accept.summary());})(cf)(ks).set_skip_when_empty(), ms::make_summary("cas_commit_latency_summary", ms::description("CAS learn round latency summary"), [this] {return to_metrics_summary(_stats.cas_learn.summary());})(cf)(ks).set_skip_when_empty(), ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_histogram("cas_prepare_latency", ms::description("CAS prepare round latency histogram"), [this] {return 
to_metrics_histogram(_stats.cas_prepare.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_histogram("cas_propose_latency", ms::description("CAS accept round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_accept.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_histogram("cas_commit_latency", ms::description("CAS learn round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_learn.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks) }); } } else { if (_config.enable_node_aggregated_table_metrics && !is_internal_keyspace(_schema->ks_name())) { _metrics.add_group("column_family", { ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"), _stats.memtable_switch_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_counter("memtable_row_hits", 
_stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}), ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}), ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty() }); if (uses_tablets()) { _metrics.add_group("column_family", { ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({seastar::metrics::shard_label}) }); } if (this_shard_id() == 0) { _metrics.add_group("column_family", { ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)(ms::shard_label("")) }); } } } if (uses_tablets()) { 
_metrics.add_group("tablets", {
                ms::make_gauge("count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({column_family_label, keyspace_label})
        });
    }
}

// Unregisters all metrics for this table, including the view-update stats.
void table::deregister_metrics() {
    _metrics.clear();
    _view_stats._metrics.clear();
}

// Number of sstables in use by this compaction group (main + maintenance sets).
size_t compaction_group::live_sstable_count() const noexcept {
    return _main_sstables->size() + _maintenance_sstables->size();
}

// On-disk bytes of all live sstables in this compaction group.
uint64_t compaction_group::live_disk_space_used() const noexcept {
    return _main_sstables->bytes_on_disk() + _maintenance_sstables->bytes_on_disk();
}

// Full file-size stats (on-disk and before-compression) of the live sstables.
sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() const noexcept {
    return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}

// Sum of live disk space across all compaction groups in this storage group.
uint64_t storage_group::live_disk_space_used() const {
    auto cgs = const_cast(*this).compaction_groups();
    return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
}

// Live disk space plus the space still held by sstables that were compacted
// away but whose files have not been deleted yet.
uint64_t compaction_group::total_disk_space_used() const noexcept {
    return live_disk_space_used() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0), std::plus{});
}

// Full-stats variant of total_disk_space_used().
sstables::file_size_stats compaction_group::total_disk_space_used_full_stats() const noexcept {
    return live_disk_space_used_full_stats() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::get_file_size_stats)), sstables::file_size_stats{}, std::plus{});
}

// Recomputes the table-level disk/sstable statistics from scratch by summing
// over all compaction groups. Called whenever sstable sets change.
void table::rebuild_statistics() {
    _stats.live_disk_space_used = {};
    _stats.live_sstable_count = 0;
    _stats.total_disk_space_used = {};

    for_each_compaction_group([this] (const compaction_group& cg) {
        _stats.live_disk_space_used += cg.live_disk_space_used_full_stats();
        _stats.total_disk_space_used += cg.total_disk_space_used_full_stats();
        _stats.live_sstable_count += cg.live_sstable_count();
    });
}

void
table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept {
    // Incremental inverse of rebuild_statistics() for a single group, used
    // when a compaction group is removed from the table.
    _stats.live_disk_space_used -= cg.live_disk_space_used_full_stats();
    _stats.total_disk_space_used -= cg.total_disk_space_used_full_stats();
    _stats.live_sstable_count -= cg.live_sstable_count();
}

// Acquires the permit that serializes mutations of the table's sstable sets.
future table::get_sstable_list_permit() {
    co_return sstable_list_permit(co_await seastar::get_units(_sstable_set_mutation_sem, 1));
}

// Builds a new sstable set from `current_sstables` with `old_sstables`
// removed and `new_sstables` added. Returns the new set together with the
// sstables that were actually found in (and removed from) the current set.
future table::sstable_list_builder::build_new_list(const sstables::sstable_set& current_sstables,
        sstables::sstable_set new_sstable_list,
        const std::vector& new_sstables,
        const std::vector& old_sstables) {
    std::unordered_set s(old_sstables.begin(), old_sstables.end());
    co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));

    // add sstables from the current list into the new list except the ones that are in the old list
    std::vector removed_sstables;
    co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
        if (s.contains(tab)) {
            removed_sstables.push_back(tab);
        } else {
            new_sstable_list.insert(tab);
        }
    });

    // add new sstables into the new list
    for (auto& tab : new_sstables) {
        new_sstable_list.insert(tab);
        co_await coroutine::maybe_yield();
    }
    co_return table::sstable_list_builder::result {
        make_lw_shared(std::move(new_sstable_list)),
        std::move(removed_sstables)};
}

// Deletes the given sstables atomically (all-or-nothing). Failures are
// logged and swallowed on purpose: leftover sstables will eventually be
// re-compacted and re-deleted.
future<> table::delete_sstables_atomically(const sstable_list_permit&, std::vector sstables_to_remove) {
    try {
        auto gh = _sstable_deletion_gate.hold();
        co_await get_sstables_manager().delete_atomically(std::move(sstables_to_remove));
    } catch (...) {
        // There is nothing more we can do here.
        // Any remaining SSTables will eventually be re-compacted and re-deleted.
        tlogger.error("Compacted SSTables deletion failed: {}. Ignored.", std::current_exception());
    }
}

// Forwarding helper: deletes sstables under the builder's permit.
future<> table::sstable_list_builder::delete_sstables_atomically(std::vector sstables_to_remove) {
    return _t->delete_sstables_atomically(_permit, std::move(sstables_to_remove));
}

// Returns the old (input) sstables of a completed compaction that are not
// also part of its output, i.e. the ones whose files can now be deleted.
std::vector compaction_group::unused_sstables_for_deletion(compaction::compaction_completion_desc desc) const {
    std::unordered_set output(desc.new_sstables.begin(), desc.new_sstables.end());
    return std::ranges::to>(desc.old_sstables | std::views::filter([&output] (const sstables::shared_sstable& input_sst) {
        return !output.contains(input_sst);
    }));
}

// All sstables of this compaction group: main set followed by maintenance set.
std::vector compaction_group::all_sstables() const {
    std::vector all;
    auto main_sstables = _main_sstables->all();
    auto maintenance_sstables = _maintenance_sstables->all();
    all.reserve(main_sstables->size() + maintenance_sstables->size());
    std::ranges::copy(*main_sstables, std::back_inserter(all));
    std::ranges::copy(*maintenance_sstables, std::back_inserter(all));
    return all;
}

// For tablet merge: resets repaired_at to 0 on every sstable whose
// repaired_at exceeds the group's sstables_repaired_at — presumably so a
// merged group does not advertise repair progress the other group has not
// reached (TODO confirm rationale against the tablet-merge caller).
// Runs in a seastar::thread because update_repaired_at() is called
// synchronously; thread::maybe_yield() keeps the loop preemptible.
future<> compaction_group::update_repaired_at_for_merge() {
    auto sstables = all_sstables();
    auto sstables_repaired_at = get_sstables_repaired_at();
    co_await seastar::async([&] {
        for (auto& sst : sstables) {
            thread::maybe_yield();
            auto& stats = sst->get_stats_metadata();
            if (stats.repaired_at > sstables_repaired_at) {
                auto neww = 0;
                auto old = sst->update_repaired_at(neww);
                tlogger.info("Finished repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
                        sst->get_filename(), old, neww, sstables_repaired_at, group_id(), token_range());
            } else {
                auto old = stats.repaired_at;
                tlogger.debug("Skipped repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
                        sst->get_filename(), old, old, sstables_repaired_at, group_id(), token_range());
            }
        }
    });
}

// Collects the unrepaired-data compaction group views covering `range`;
// used by incremental repair.
future> table::get_compaction_group_views_for_repair(dht::token_range range) {
    std::vector ret;
    auto sgs = storage_groups_for_token_range(range);
    for (auto& sg : sgs) {
        co_await
coroutine::maybe_yield();
        auto cgs = sg->compaction_groups();
        for (auto& cg : cgs) {
            ret.push_back(&cg->view_for_unrepaired_data());
        }
    }
    co_return ret;
}

// For each compaction group view covering `range`: disables compaction and
// takes the incremental-repair write lock. Returns the reenablers and lock
// holders so the caller keeps both held for the duration of the repair.
future table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db, const service::frozen_topology_guard& guard, dht::token_range range) {
    auto ret = compaction_reenablers_and_lock_holders();
    auto views = co_await get_compaction_group_views_for_repair(range);
    for (auto view : views) {
        auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(*view);
        tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
        ret.cres.push_back(std::make_unique(std::move(cre)));
        // This lock prevents the unrepaired compaction started by major compaction to run in parallel with repair.
        // The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
        // sstables being repaired, so it can run in parallel with repair.
        auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(*view, "row_level_repair");
        tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
        ret.lock_holders.push_back(std::move(lock_holder));
    }
    co_return ret;
}

// Clears the being_repaired marker (resets it to a null session id) on every
// sstable in the compaction groups covering `range`.
future<> table::clear_being_repaired_for_range(dht::token_range range) {
    auto sgs = storage_groups_for_token_range(range);
    for (auto& sg : sgs) {
        auto cgs = sg->compaction_groups();
        for (auto& cg : cgs) {
            auto sstables = cg->all_sstables();
            co_await coroutine::maybe_yield();
            for (auto& sst : sstables) {
                co_await coroutine::maybe_yield();
                if (!sst->being_repaired.uuid().is_null()) {
                    sst->being_repaired = service::session_id(utils::UUID());
                }
            }
        }
    }
}

// Moves all sstables of `group` into this compaction group's main set and
// clears `group` (used when merging compaction groups).
future<> compaction_group::merge_sstables_from(compaction_group& group) {
    auto permit = co_await _t.get_sstable_list_permit();
    table::sstable_list_builder builder(_t, std::move(permit));
    auto sstables_to_merge = group.all_sstables();
    // re-build new list for this group with sstables of the group being merged.
    auto res = co_await builder.build_new_list(*main_sstables(), make_main_sstable_set(), sstables_to_merge, {});
    // execute:
    std::invoke([&] noexcept {
        set_main_sstables(std::move(res.new_sstable_set));
        group.clear_sstables();
        // FIXME: backlog adjustment is not exception safe.
        backlog_tracker_adjust_charges({}, sstables_to_merge);
    });
    _t.rebuild_statistics();
}

future<> compaction_group::update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc) {
    // Build a new list of _sstables: We remove from the existing list the
    // tables we compacted (by now, there might be more sstables flushed
    // later), and we add the new tables generated by the compaction.
    // We create a new list rather than modifying it in-place, so that
    // on-going reads can continue to use the old list.
    //
    // We only remove old sstables after they are successfully deleted,
    // to avoid a new compaction from ignoring data in the old sstables
    // if the deletion fails (note deletion of shared sstables can take
    // unbounded time, because all shards must agree on the deletion).

    // make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
    for (auto& sst : desc.old_sstables) {
        auto shards = sst->get_shards_for_this_sstable();
        auto& schema = _t.schema();
        if (shards.size() > 1) {
            throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
                schema->ks_name(), schema->cf_name(), sst->toc_filename()));
        }
        if (!belongs_to_current_shard(shards)) {
            throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used sstable {} which doesn't belong to this shard!",
                schema->ks_name(), schema->cf_name(), sst->toc_filename()));
        }
    }

    // Precompute before so undo_compacted_but_not_deleted can be sure not to throw
    std::unordered_set s(
           desc.old_sstables.begin(), desc.old_sstables.end());

    auto& sstables_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
    sstables_compacted_but_not_deleted.insert(sstables_compacted_but_not_deleted.end(), desc.old_sstables.begin(), desc.old_sstables.end());
    // After we are done, unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
    // or they could stay forever in the set, resulting in deleted files remaining
    // opened and disk space not being released until shutdown.
    auto undo_compacted_but_not_deleted = defer([&] {
        std::erase_if(sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) {
            return s.contains(sst);
        });
        _t.rebuild_statistics();
    });

    _t.get_stats().pending_sstable_deletions++;
    auto undo_stats = defer([this] { _t.get_stats().pending_sstable_deletions--; });

    // Cache updater that swaps the per-group sstable sets together with the
    // row-cache invalidation: prepare() builds the new sets (may yield /
    // throw), execute() publishes them.
    class sstable_list_updater : public row_cache::external_updater_impl {
        table& _t;
        compaction_group& _cg;
        table::sstable_list_builder& _builder;
        const compaction::compaction_completion_desc& _desc;
        struct replacement_desc {
            compaction::compaction_completion_desc desc;
            table::sstable_list_builder::result main_sstable_set_builder_result;
            std::optional> new_maintenance_sstables;
        };
        std::unordered_map _cg_desc;
    public:
        explicit sstable_list_updater(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d)
            : _t(cg._t), _cg(cg), _builder(builder), _desc(d) {}
        virtual future<> prepare() override {
            // Segregate output sstables according to their owner compaction group.
            // If not in splitting mode, then all output sstables will belong to the same group.
            for (auto& sst : _desc.new_sstables) {
                auto& cg = _t.compaction_group_for_sstable(sst);
                _cg_desc[&cg].desc.new_sstables.push_back(sst);
            }
            // The group that triggered compaction is the only one to have sstables removed from it.
            _cg_desc[&_cg].desc.old_sstables = _desc.old_sstables;

            for (auto& [cg, d] : _cg_desc) {
                d.main_sstable_set_builder_result = co_await _builder.build_new_list(*cg->main_sstables(), cg->make_main_sstable_set(), d.desc.new_sstables, d.desc.old_sstables);
                if (!d.desc.old_sstables.empty() && d.main_sstable_set_builder_result.removed_sstables.size() != d.desc.old_sstables.size()) {
                    // Not all old_sstables were removed from the main sstable set, which implies that
                    // they don't exist there. This can happen if the input sstables were picked up from
                    // the maintenance set during an offstrategy or scrub compaction. So, remove the old
                    // sstables from the maintenance set. No need to add any new sstables to the maintenance
                    // set though, as they are always added to the main set.
                    auto builder_result = co_await _builder.build_new_list(
                            *cg->maintenance_sstables(), std::move(*cg->make_maintenance_sstable_set()), {}, d.desc.old_sstables);
                    d.new_maintenance_sstables = std::move(builder_result.new_sstable_set);
                }
            }
        }
        virtual void execute() override {
            for (auto&& [cg, d] : _cg_desc) {
                cg->set_main_sstables(std::move(d.main_sstable_set_builder_result.new_sstable_set));
                if (d.new_maintenance_sstables) {
                    // offstrategy or scrub compaction - replace the maintenance set
                    cg->set_maintenance_sstables(std::move(d.new_maintenance_sstables.value()));
                }
            }
            // FIXME: the following is not exception safe
            _t.refresh_compound_sstable_set();
            for (auto& [cg, d] : _cg_desc) {
                cg->backlog_tracker_adjust_charges(d.main_sstable_set_builder_result.removed_sstables, d.desc.new_sstables);
            }
        }
        static std::unique_ptr make(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d) {
            return std::make_unique(cg, builder, d);
        }
    };

    table::sstable_list_builder builder(_t, co_await _t.get_sstable_list_permit());
    auto updater = row_cache::external_updater(sstable_list_updater::make(*this, builder, desc));
    auto& cache = _t.get_row_cache();
    co_await cache.invalidate(std::move(updater), std::move(desc.ranges_for_cache_invalidation));
    // refresh underlying data source in row cache to prevent it from holding reference
    // to sstables files that are about to be deleted.
    cache.refresh_snapshot();
    _t.rebuild_statistics();
    co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc)));
}

// Major ("compact all") compaction: optionally flushes first, forces
// off-strategy so maintenance-set sstables participate, then runs a major
// compaction on every compaction group view.
future<> table::compact_all_sstables(tasks::task_info info, do_flush do_flush, bool consider_only_existing_data) {
    if (do_flush) {
        co_await flush();
    }
    // Forces off-strategy before major, so sstables previously sitting on maintenance set will be included
    // in the compaction's input set, to provide same semantics as before maintenance set came into existence.
co_await perform_offstrategy_compaction(info);
    co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) -> future<> {
        // Major compaction must not overlap incremental repair on this view.
        auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
        co_await _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data);
    });
}

// Installs the schema's compaction strategy, which also arms compaction.
void table::start_compaction() {
    set_compaction_strategy(_schema->compaction_strategy());
}

// Submits every compaction group for compaction consideration.
void table::trigger_compaction() {
    for_each_compaction_group([] (compaction_group& cg) {
        cg.trigger_compaction();
    });
}

// Best-effort trigger for a single group: failures are logged, not thrown.
void table::try_trigger_compaction(compaction_group& cg) noexcept {
    try {
        cg.trigger_compaction();
    } catch (...) {
        tlogger.error("Failed to trigger compaction: {}", std::current_exception());
    }
}

void compaction_group::trigger_compaction() {
    // But not if we're locked out or stopping
    if (!_async_gate.is_closed()) {
        for (auto view : all_views()) {
            _t._compaction_manager.submit(*view);
        }
    }
}

void table::trigger_offstrategy_compaction() {
    // Run in background.
    // This is safe since the compaction task is tracked
    // by the compaction_manager until stop()
    (void)perform_offstrategy_compaction(tasks::task_info{}).then_wrapped([this] (future f) {
        if (f.failed()) {
            auto ex = f.get_exception();
            tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex);
        }
    });
}

// Runs off-strategy compaction on every compaction group view; returns
// whether any view actually performed work.
future table::perform_offstrategy_compaction(tasks::task_info info) {
    // If the user calls trigger_offstrategy_compaction() to trigger
    // off-strategy explicitly, cancel the timeout based automatic trigger.
_off_strategy_trigger.cancel();
    bool performed = false;
    co_await parallel_foreach_compaction_group_view([this, &performed, info] (compaction::compaction_group_view& view) -> future<> {
        // NOTE(review): the lock label string says "compact_all_sstables" even
        // though this is the offstrategy path — presumably inherited from the
        // major-compaction caller; confirm whether it is intentional.
        auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
        performed |= co_await _compaction_manager.perform_offstrategy(view, info);
    });
    co_return performed;
}

// Cleanup compaction: drops data outside the owned ranges. Only runs for
// tables with static sharding (no-op otherwise); optionally flushes first so
// memtable data participates.
future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges,
                                                tasks::task_info info,
                                                do_flush do_flush) {
    auto* cg = try_get_compaction_group_with_static_sharding();
    if (!cg) {
        co_return;
    }
    if (do_flush) {
        co_await flush();
    }
    auto lock_holder = co_await get_compaction_manager().get_incremental_repair_read_lock(cg->as_view_for_static_sharding(), "perform_cleanup_compaction");
    co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg->as_view_for_static_sharding(), info);
}

// Estimated number of pending compactions across all views of this group.
future compaction_group::estimate_pending_compactions() const {
    unsigned ret = 0;
    for (auto& view : all_views()) {
        ret += co_await _t.get_compaction_strategy().estimated_pending_compactions(*view);
    }
    co_return ret;
}

// Table-wide estimate, summed over all compaction groups.
future table::estimate_pending_compactions() const {
    unsigned ret = 0;
    co_await const_cast(this)->parallel_foreach_compaction_group([&ret] (const compaction_group& cg) -> future<> {
        ret += co_await cg.estimate_pending_compactions();
    });
    co_return ret;
}

void compaction_group::set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept {
    _compaction_strategy_state = std::move(compaction_strategy_state);
}

// Replaces the table's compaction strategy. Work is split into a per-group
// prepare phase (which may throw) and a noexcept execute phase so the switch
// is applied to all compaction groups, or to none, on failure.
void table::set_compaction_strategy(compaction::compaction_strategy_type strategy) {
    tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), compaction::compaction_strategy::name(strategy));
    auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());

    struct compaction_group_strategy_updater {
        table& t;
        compaction_group& cg;
        compaction::compaction_backlog_tracker new_bt;
        compaction::compaction_strategy_state new_cs_state;

        compaction_group_strategy_updater(table& t, compaction_group& cg, compaction::compaction_strategy& new_cs)
            : t(t)
            , cg(cg)
            , new_bt(new_cs.make_backlog_tracker())
            , new_cs_state(compaction::compaction_strategy_state::make(new_cs)) {
        }
        // Seeds the new backlog tracker with the group's current main
        // sstables; may allocate and therefore may throw.
        void prepare(compaction::compaction_strategy& new_cs) {
            auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
            cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);

            std::vector new_sstables_for_backlog_tracker;
            new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
            cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
                new_sstables_for_backlog_tracker.push_back(s);
            });
            new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
        }
        void execute() noexcept {
            // Update strategy state and backlog tracker according to new strategy. SSTable set update
            // is delayed until new compaction, which is triggered on strategy change. SSTable set
            // cannot be updated here since it must happen under the set update lock.
cg.register_backlog_tracker(std::move(new_bt)); cg.set_compaction_strategy_state(std::move(new_cs_state)); } }; std::vector cg_sstable_set_updaters; for_each_compaction_group([&] (compaction_group& cg) { compaction_group_strategy_updater updater(*this, cg, new_cs); updater.prepare(new_cs); cg_sstable_set_updaters.push_back(std::move(updater)); }); // now exception safe: _compaction_strategy = std::move(new_cs); for (auto& updater : cg_sstable_set_updaters) { updater.execute(); } } size_t table::sstables_count() const { return _sstables->size(); } std::vector table::sstable_count_per_level() const { std::vector count_per_level; _sstables->for_each_sstable([&] (const sstables::shared_sstable& sst) { auto level = sst->get_sstable_level(); if (level + 1 > count_per_level.size()) { count_per_level.resize(level + 1, 0UL); } count_per_level[level]++; }); return count_per_level; } int64_t table::get_unleveled_sstables() const { // TODO: when we support leveled compaction, we should return the number of // SSTables in L0. If leveled compaction is enabled in this column family, // then we should return zero, as we currently do. 
return 0;
}

// Returns the sstables that may contain the partition identified by `key`
// (a nodetool-style key string), confirming candidates via has_partition_key()
// rather than relying on the selector alone.
future> table::get_sstables_by_partition_key(const sstring& key) const {
    auto pk = partition_key::from_nodetool_style_string(_schema, key);
    auto dk = dht::decorate_key(*_schema, pk);
    auto hk = sstables::sstable::make_hashed_key(*_schema, dk.key());
    auto sel = std::make_unique(get_sstable_set().make_incremental_selector());
    const auto& sst = sel->select(dk).sstables;
    std::unordered_set ssts;
    for (auto s : sst) {
        if (co_await s->has_partition_key(hk, dk)) {
            ssts.insert(s);
        }
    }
    co_return ssts;
}

const sstables::sstable_set& table::get_sstable_set() const {
    return *_sstables;
}

lw_shared_ptr table::get_sstables() const {
    return _sstables->all();
}

// Sstables whose ranges intersect `range`, per the compound set's selector.
std::vector table::select_sstables(const dht::partition_range& range) const {
    return _sstables->select(range);
}

// Removes all quarantined sstables from every compaction group's main and
// maintenance sets, refreshes the cache snapshot, and then deletes the removed
// sstables atomically.
future<> table::drop_quarantined_sstables() {
    // Cache updater that rebuilds each group's sstable sets without the
    // quarantined sstables: prepare() computes the new sets (may allocate),
    // execute() swaps them in.
    class quarantine_removal_updater : public row_cache::external_updater_impl {
        table& _t;
        // Accumulates every quarantined sstable, for deletion by the caller.
        std::vector& _removed;
        struct compaction_group_update {
            lw_shared_ptr new_main_sstables;
            lw_shared_ptr new_maintenance_sstables;
            std::vector removed_main_sstables;
        };
        std::unordered_map _cg_updates;
    public:
        explicit quarantine_removal_updater(table& t, std::vector& removed) : _t(t), _removed(removed) {}
        virtual future<> prepare() override {
            _t.for_each_compaction_group([&] (compaction_group& cg) {
                auto new_main = make_lw_shared(cg.make_main_sstable_set());
                auto new_maintenance = cg.make_maintenance_sstable_set();
                std::vector removed_main;
                cg.main_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
                    if (sst->is_quarantined()) {
                        _removed.emplace_back(sst);
                        removed_main.emplace_back(sst);
                    } else {
                        new_main->insert(sst);
                    }
                });
                cg.maintenance_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
                    if (sst->is_quarantined()) {
                        _removed.emplace_back(sst);
                    } else {
                        new_maintenance->insert(sst);
                    }
                });
                _cg_updates[&cg] = compaction_group_update{
                    .new_main_sstables = std::move(new_main),
                    .new_maintenance_sstables = std::move(new_maintenance),
                    .removed_main_sstables = std::move(removed_main)
                };
            });
            co_return;
        }
        virtual void execute() override {
            for (auto& [cg, update] : _cg_updates) {
                cg->set_main_sstables(std::move(update.new_main_sstables));
                cg->set_maintenance_sstables(std::move(update.new_maintenance_sstables));
            }
            _t.refresh_compound_sstable_set();
            // Only main-set removals are reported to each group's backlog tracker.
            for (auto& [cg, d] : _cg_updates) {
                cg->get_backlog_tracker().replace_sstables(d.removed_main_sstables, {});
            }
        }
        static std::unique_ptr make(table& t, std::vector& removed) {
            return std::make_unique(t, removed);
        }
    };

    _stats.pending_sstable_deletions++;
    auto undo_stats = defer([this] { _stats.pending_sstable_deletions--; });
    auto permit = co_await get_sstable_list_permit();
    std::vector removed;
    auto updater = row_cache::external_updater(quarantine_removal_updater::make(*this, removed));
    co_await _cache.invalidate(std::move(updater));
    _cache.refresh_snapshot();
    rebuild_statistics();
    co_await delete_sstables_atomically(permit, std::move(removed));
}

bool storage_group::no_compacted_sstable_undeleted() const {
    return std::ranges::all_of(compaction_groups(), [] (const_compaction_group_ptr& cg) {
        return cg->compacted_undeleted_sstables().empty();
    });
}

// Gets the list of all sstables in the column family, including ones that are
// not used for active queries because they have already been compacted, but are
// waiting for delete_atomically() to return.
//
// As long as we haven't deleted them, compaction needs to ensure it doesn't
// garbage-collect a tombstone that covers data in an sstable that may not be
// successfully deleted.
lw_shared_ptr table::get_sstables_including_compacted_undeleted() const {
    // Fast path: no group holds compacted-but-undeleted sstables.
    bool no_compacted_undeleted_sstable = std::ranges::all_of(storage_groups() | std::views::values,
            std::mem_fn(&storage_group::no_compacted_sstable_undeleted));
    if (no_compacted_undeleted_sstable) {
        return get_sstables();
    }
    // Copy the live list and append each group's compacted-but-undeleted sstables.
    auto ret = make_lw_shared(*_sstables->all());
    for_each_compaction_group([&ret] (const compaction_group& cg) {
        for (auto&& s: cg.compacted_undeleted_sstables()) {
            ret->insert(s);
        }
    });
    return ret;
}

const std::vector& compaction_group::compacted_undeleted_sstables() const noexcept {
    return _sstables_compacted_but_not_deleted;
}

// Memtable list with no seal function — used when disk writes are disabled.
lw_shared_ptr table::make_memory_only_memtable_list() {
    auto get_schema = [this] { return schema(); };
    return make_lw_shared(std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats,
            _config.memory_compaction_scheduling_group, &get_compaction_manager().get_shared_tombstone_gc_state());
}

// Memtable list whose seal function flushes the active memtable of `cg`,
// holding the group's flush gate for the duration of the flush.
lw_shared_ptr table::make_memtable_list(compaction_group& cg) {
    auto seal = [this, &cg] (flush_permit&& permit) -> future<> {
        gate::holder holder = cg.flush_gate().hold();
        co_await seal_active_memtable(cg, std::move(permit));
    };
    auto get_schema = [this] { return schema(); };
    return make_lw_shared(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats,
            _config.memory_compaction_scheduling_group, &get_compaction_manager().get_shared_tombstone_gc_state());
}

// Adapter exposing one compaction group of this table to the compaction
// manager through the compaction::compaction_group_view interface.
class compaction_group::compaction_group_view : public compaction::compaction_group_view {
    table& _t;
    compaction_group& _cg;
    // When engaged, compaction is disabled altogether on this view.
std::optional _compaction_reenabler;
private:
    // True when `sst` is classified into this view by the owning group.
    bool belongs_to_this_view(const sstables::shared_sstable& sst) const {
        return &_cg.view_for_sstable(sst) == this;
    }
    // Builds a fresh set (via make_sstable_set_func) holding only the sstables
    // of `sstables` that belong to this view, yielding periodically.
    future> make_sstable_set_for_this_view(lw_shared_ptr sstables, auto make_sstable_set_func) const {
        sstables::sstable_set ret = make_sstable_set_func();
        auto all_sstables = sstables->all();
        auto belongs_to_this_view_func = [this] (const sstables::shared_sstable& sst) {
            return belongs_to_this_view(sst);
        };
        for (auto sst : *all_sstables | std::views::filter(belongs_to_this_view_func)) {
            ret.insert(sst);
            co_await coroutine::maybe_yield();
        }
        co_return make_lw_shared(std::move(ret));
    }
public:
    explicit compaction_group_view(table& t, compaction_group& cg) : _t(t), _cg(cg) {}
    dht::token_range token_range() const noexcept override {
        return _cg.token_range();
    }
    const schema_ptr& schema() const noexcept override {
        return _t.schema();
    }
    unsigned min_compaction_threshold() const noexcept override {
        // During receiving stream operations, the less we compact the faster streaming is. For
        // bootstrap and replace there are no readers so it is fine to be less aggressive with
        // compactions as long as we don't ignore them completely (this could create a problem
        // for when streaming ends).
        if (_t._is_bootstrap_or_replace) {
            auto target = std::min(_t.schema()->max_compaction_threshold(), 16);
            return std::max(_t.schema()->min_compaction_threshold(), target);
        } else {
            return _t.schema()->min_compaction_threshold();
        }
    }
    bool compaction_enforce_min_threshold() const noexcept override {
        return _t.get_config().compaction_enforce_min_threshold || _t._is_bootstrap_or_replace;
    }
    future> main_sstable_set() const override {
        return make_sstable_set_for_this_view(_cg.main_sstables(), [this] { return _cg.make_main_sstable_set(); });
    }
    future> maintenance_sstable_set() const override {
        return make_sstable_set_for_this_view(_cg.maintenance_sstables(), [this] { return *_cg.make_maintenance_sstable_set(); });
    }
    lw_shared_ptr sstable_set_for_tombstone_gc() const override {
        return _t.sstable_set_for_tombstone_gc(_cg);
    }
    std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override {
        return compaction::get_fully_expired_sstables(*this, sstables, query_time);
    }
    const std::vector& compacted_undeleted_sstables() const noexcept override {
        return _cg.compacted_undeleted_sstables();
    }
    compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
        return _t.get_compaction_strategy();
    }
    compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
        return _cg._compaction_strategy_state;
    }
    reader_permit make_compaction_reader_permit() const override {
        // Tracking-only permit: accounts for the read but does not wait on admission.
        return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {});
    }
    sstables::sstables_manager& get_sstables_manager() noexcept override {
        return _t.get_sstables_manager();
    }
    sstables::shared_sstable make_sstable() const override {
        return _t.make_sstable();
    }
    sstables::sstable_writer_config configure_writer(sstring origin) const override {
        auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
        return cfg;
    }
    api::timestamp_type min_memtable_timestamp() const override {
        return _cg.min_memtable_timestamp();
    }
    api::timestamp_type min_memtable_live_timestamp() const override {
        return _cg.min_memtable_live_timestamp();
    }
    api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
        return _cg.min_memtable_live_row_marker_timestamp();
    }
    bool memtable_has_key(const dht::decorated_key& key) const override {
        return _cg.memtable_has_key(key);
    }
    future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
        co_await _cg.update_sstable_sets_on_compaction_completion(std::move(desc));
        if (offstrategy) {
            _cg.trigger_compaction();
        }
    }
    bool is_auto_compaction_disabled_by_user() const noexcept override {
        return _t.is_auto_compaction_disabled_by_user();
    }
    bool tombstone_gc_enabled() const noexcept override {
        // Requires both the table-level and the group-level flags.
        return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
    }
    const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
        return _t.get_compaction_manager().get_tombstone_gc_state();
    }
    compaction::compaction_backlog_tracker& get_backlog_tracker() override {
        return _cg.get_backlog_tracker();
    }
    const std::string get_group_id() const noexcept override {
        return fmt::format("{}", _cg.group_id());
    }
    seastar::condition_variable& get_staging_done_condition() noexcept override {
        return _cg.get_staging_done_condition();
    }
    dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
        return _t.get_token_range_after_split(t);
    }
    int64_t get_sstables_repaired_at() const noexcept override {
        return _cg.get_sstables_repaired_at();
    }
};

// View registered with the compaction manager with compaction enabled.
std::unique_ptr compaction_group::make_compacting_view() {
    auto view = std::make_unique(_t, *this);
    _t._compaction_manager.add(*view);
    return view;
}

std::unique_ptr
compaction_group::make_non_compacting_view() {
    auto view = std::make_unique(_t, *this);
    auto reenabler = _t._compaction_manager.add_with_compaction_disabled(*view);
    // Attaches compaction reenabler, so this non compacting view will not be compacted.
    _compaction_disabler_for_views.push_back(std::move(reenabler));
    return view;
}

compaction_group::compaction_group(table& t, size_t group_id, dht::token_range token_range, repair_classifier_func repair_classifier)
    : _t(t)
    , _unrepaired_view(make_compacting_view())
    , _repairing_view(make_non_compacting_view())
    , _repaired_view(make_compacting_view())
    , _group_id(group_id)
    , _token_range(std::move(token_range))
    , _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy))
    , _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list())
    , _main_sstables(make_lw_shared(make_main_sstable_set()))
    , _maintenance_sstables(make_maintenance_sstable_set())
    , _async_gate(format("[compaction_group {}.{} {}]", t.schema()->ks_name(), t.schema()->cf_name(), group_id))
    , _backlog_tracker(t.get_compaction_strategy().make_backlog_tracker())
    , _repair_sstable_classifier(std::move(repair_classifier)) {
}

// New group sharing base's table, id, range and classifier, but none of its data.
compaction_group_ptr compaction_group::make_empty_group(const compaction_group& base) {
    return make_lw_shared(base._t, base._group_id, base._token_range, base._repair_sstable_classifier);
}

bool compaction_group::stopped() const noexcept {
    return _async_gate.is_closed();
}

bool compaction_group::compaction_disabled() const {
    return std::ranges::all_of(all_views(), [this] (compaction::compaction_group_view* view) {
        return _t._compaction_manager.compaction_disabled(*view);
    });
}

compaction_group::~compaction_group() {
    // Unclosed group is not tolerated since it might result in an use-after-free.
    if (!compaction_disabled()) {
        on_fatal_internal_error(tlogger, format("Compaction group of id {} that belongs to {}.{} was not disabled.",
                _group_id, _t.schema()->ks_name(), _t.schema()->cf_name()));
    }
}

// Stops the group: halts ongoing compactions, closes the gates, flushes, and
// removes all views from the compaction manager. A flush failure is re-thrown
// only after the views were removed.
future<> compaction_group::stop(sstring reason) noexcept {
    if (_async_gate.is_closed()) {
        co_return;
    }
    // FIXME: indentation
    for (auto view : all_views()) {
        co_await _t._compaction_manager.stop_ongoing_compactions(reason, view);
    }
    co_await _async_gate.close();
    auto flush_future = co_await seastar::coroutine::as_future(flush());
    co_await _flush_gate.close();
    // FIXME: indentation
    _compaction_disabler_for_views.clear();
    co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
    for (auto view : all_views()) {
        co_await _t._compaction_manager.remove(*view, reason);
    }
    if (flush_future.failed()) {
        co_await seastar::coroutine::return_exception_ptr(flush_future.get_exception());
    }
}

bool compaction_group::empty() const noexcept {
    return _memtables->empty() && live_sstable_count() == 0;
}

const schema_ptr& compaction_group::schema() const {
    return _t.schema();
}

// Replaces both sstable sets with fresh empty ones.
void compaction_group::clear_sstables() {
    _main_sstables = make_lw_shared(make_main_sstable_set());
    _maintenance_sstables = make_maintenance_sstable_set();
}

void storage_group::clear_sstables() {
    for (auto cg : compaction_groups()) {
        cg->clear_sstables();
    }
}

table::table(schema_ptr schema, config config, lw_shared_ptr sopts, compaction::compaction_manager& compaction_manager,
        sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker,
        locator::effective_replication_map_ptr erm)
    : _schema(std::move(schema))
    , _config(std::move(config))
    , _erm(std::move(erm))
    , _storage_opts(std::move(sopts))
    , _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()),
            keyspace_label(_schema->ks_name()),
            column_family_label(_schema->cf_name())
            )
    , _compaction_manager(compaction_manager)
    ,
_compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
    , _sg_manager(make_storage_group_manager())
    , _sstables(make_compound_sstable_set())
    , _sstable_deletion_gate(format("[table {}.{}] sstable_deletion_gate", _schema->ks_name(), _schema->cf_name()))
    , _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
    , _commitlog(nullptr)
    , _readonly(true)
    , _durable_writes(true)
    , _sstables_manager(sst_manager)
    , _index_manager(this->as_data_dictionary())
    , _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name()))
    // Counter tables need per-cell locking; others carry no locker at all.
    , _counter_cell_locks(_schema->is_counter() ? std::make_unique(_schema, cl_stats) : nullptr)
    , _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name()))
    , _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name()))
    , _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name()))
    , _pending_streams_phaser(format("[table {}.{}] pending_streams", _schema->ks_name(), _schema->cf_name()))
    , _pending_flushes_phaser(format("[table {}.{}] pending_flushes", _schema->ks_name(), _schema->cf_name()))
    , _row_locker(_schema)
    , _flush_timer([this]{ on_flush_timer(); })
    , _off_strategy_trigger([this] { trigger_offstrategy_compaction(); })
{
    if (!_config.enable_disk_writes) {
        tlogger.warn("Writes disabled, column family no durable.");
    }
    recalculate_tablet_count_stats();
    set_metrics();
}

// Periodic flush driven by the schema's memtable_flush_period; re-arms itself
// after each flush while the period stays > 0.
void table::on_flush_timer() {
    tlogger.debug("on table {}.{} flush timer, period {}ms", _schema->ks_name(), _schema->cf_name(), _schema->memtable_flush_period());
    // Fire-and-forget under the async gate; the timer is re-armed in finally().
    (void)with_gate(_async_gate, [this] {
        return flush().finally([this] {
            if (_schema->memtable_flush_period() > 0) {
                _flush_timer.rearm(timer::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period()));
            }
        });
    });
}

locator::combined_load_stats
tablet_storage_group_manager::table_load_stats(std::function tablet_filter) const {
    // Aggregate table-level size stats plus per-tablet sizes, restricted to
    // the tablets accepted by tablet_filter.
    locator::table_load_stats table_stats;
    table_stats.split_ready_seq_number = _split_ready_seq_number;
    locator::tablet_load_stats tablet_stats;

    for_each_storage_group([&] (size_t id, storage_group& sg) {
        locator::global_tablet_id gid { _t.schema()->id(), locator::tablet_id(id) };
        if (tablet_filter(*_tablet_map, gid)) {
            const uint64_t tablet_size = sg.live_disk_space_used();
            table_stats.size_in_bytes += tablet_size;
            const dht::token_range trange = _tablet_map->get_token_range(gid.tablet);
            // Make sure the token range is in the form (a, b]
            SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());
            tablet_stats.tablet_sizes[gid.table][trange] = tablet_size;
        }
    });
    return locator::combined_load_stats{ .table_ls = std::move(table_stats), .tablet_ls = std::move(tablet_stats) };
}

locator::combined_load_stats table::table_load_stats(std::function tablet_filter) const {
    return _sg_manager->table_load_stats(std::move(tablet_filter));
}

// Remaps storage groups after a tablet split: each old group of id `i` is
// replaced by its split-ready children, whose new ids start at i << growth_factor.
void tablet_storage_group_manager::handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
    auto table_id = schema()->id();
    size_t old_tablet_count = old_tmap.tablet_count();
    size_t new_tablet_count = new_tmap.tablet_count();
    storage_group_map new_storage_groups;

    if (!old_tablet_count) {
        on_internal_error(tlogger, format("Table {} had zero tablets, it should never happen when splitting.", table_id));
    }

    // NOTE: exception when applying replica changes to reflect token metadata will abort for obvious reasons,
    // so exception safety is not required here.
unsigned growth_factor = log2ceil(new_tablet_count / old_tablet_count);
    unsigned split_size = 1 << growth_factor;

    tlogger.debug("Growth factor: {}, split size {}", growth_factor, split_size);
    // The split must be an exact power-of-two multiplication of the tablet count.
    if (old_tablet_count * split_size != new_tablet_count) {
        on_internal_error(tlogger, format("New tablet count for table {} is unexpected, actual: {}, expected {}.",
                table_id, new_tablet_count, old_tablet_count * split_size));
    }

    // Stop the released main compaction groups asynchronously
    for (auto& [id, sg] : _storage_groups) {
        if (!sg->split_unready_groups_are_empty()) {
            on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \
                    "therefore groups cannot be remapped with the new tablet count.", id, table_id));
        }

        // Remove old empty groups, they're unused, but they need to be deregistered properly
        // FIXME: indent.
        for (auto cg_ptr : sg->split_unready_groups()) {
            auto f = cg_ptr->stop("tablet split");
            if (!f.available() || f.failed()) [[unlikely]] {
                // Chain the pending stop onto _stop_fut so shutdown can wait for it.
                _stop_fut = _stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
                    return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) {
                        tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex));
                    });
                });
            }
        }

        unsigned first_new_id = id << growth_factor;
        auto split_ready_groups = sg->split_ready_compaction_groups();
        if (split_ready_groups.size() != split_size) {
            on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.",
                    split_ready_groups.size(), split_size));
        }
        // Re-tag each split-ready child with its new tablet id and range.
        for (unsigned i = 0; i < split_size; i++) {
            auto group_id = first_new_id + i;
            auto old_range = old_tmap.get_token_range(locator::tablet_id(id));
            auto new_range = new_tmap.get_token_range(locator::tablet_id(group_id));
            auto sstables_repaired_at = new_tmap.get_tablet_info(locator::tablet_id(group_id)).sstables_repaired_at;
            tlogger.debug("Setting sstables_repaired_at={} for split tablet_id={} old_tid={} new_tid={} old_range={} new_range={} idx={}",
                    sstables_repaired_at, table_id, id, group_id, old_range, new_range, i);
            split_ready_groups[i]->update_id_and_range(group_id, new_range);
            new_storage_groups[group_id] = make_lw_shared(std::move(split_ready_groups[i]));
        }
        tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].",
                id, table_id, fmt::join(std::views::iota(first_new_id, first_new_id+split_size), ", "));
    }

    _storage_groups = std::move(new_storage_groups);
}

// Background fiber that, when signalled, folds each storage group's merging
// groups into its main compaction group. Loops until the table's gate closes.
future<> tablet_storage_group_manager::merge_completion_fiber() {
    co_await coroutine::switch_to(_t.get_config().streaming_scheduling_group);
    while (!_t.async_gate().is_closed()) {
        try {
            co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
            auto ks_name = schema()->ks_name();
            auto cf_name = schema()->cf_name();
            // Enable compaction after merge is done.
auto cres = std::exchange(_compaction_reenablers_for_merging, {});
            co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> {
                auto main_group = sg.main_compaction_group();
                tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started",
                        ks_name, cf_name, main_group->group_id(), main_group->token_range());
                int nr = 0;
                int sz = sg.merging_groups().size();
                for (auto& group : sg.merging_groups()) {
                    tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} merging {} out of {} groups",
                            ks_name, cf_name, main_group->group_id(), main_group->token_range(), ++nr, sz);
                    // Synchronize with ongoing writes that might be blocked waiting for memory.
                    // Also, disabling compaction provides stability on the sstable set.
                    // Flushes memtable, so all the data can be moved.
                    co_await group->stop("tablet merge");
                    if (utils::get_local_injector().enter("merge_completion_fiber_error")) {
                        tlogger.info("Got merge_completion_fiber_error");
                        co_await sleep(std::chrono::seconds(60));
                    }
                    co_await main_group->merge_sstables_from(*group);
                }
                co_await sg.remove_empty_merging_groups();
                tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished",
                        ks_name, cf_name, main_group->group_id(), main_group->token_range());
            });
        } catch (...) {
            // Best-effort: log and keep the fiber alive for the next signal.
            tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name());
        }
        utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
            tlogger.info("Merge completion fiber finished, about to sleep");
        });
        co_await _merge_completion_event.wait();
        tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
    }
}

// Remaps storage groups after a tablet merge: each sibling pair becomes one
// new group; the old groups are attached as "merging groups" to be folded in
// later by merge_completion_fiber().
void tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
    auto table_id = schema()->id();
    size_t old_tablet_count = old_tmap.tablet_count();
    size_t new_tablet_count = new_tmap.tablet_count();
    storage_group_map new_storage_groups;

    unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count);
    unsigned merge_size = 1 << log2_reduce_factor;
    // Only a 2:1 merge is supported.
    if (merge_size != 2) {
        throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}",
                old_tablet_count, new_tablet_count, table_id));
    }

    for (auto& [id, sg] : _storage_groups) {
        // Pick first (even) tablet of each sibling pair.
if (id % merge_size != 0) { continue; } auto new_tid = id >> log2_reduce_factor; auto new_range = new_tmap.get_token_range(locator::tablet_id(new_tid)); auto new_cg = make_lw_shared(_t, new_tid, new_range, make_repair_sstable_classifier_func()); for (auto& view : new_cg->all_views()) { auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging"); _compaction_reenablers_for_merging.push_back(std::move(cre)); } auto new_sg = make_lw_shared(std::move(new_cg)); for (unsigned i = 0; i < merge_size; i++) { auto group_id = id + i; auto it = _storage_groups.find(group_id); if (it == _storage_groups.end()) { throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id)); } auto& sg = it->second; sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) { cg->update_id(new_tid); tlogger.debug("Adding merging_group: sstables_repaired_at={} old_range={} new_range={} old_tid={} new_tid={} old_group_id={}", cg->get_sstables_repaired_at(), cg->token_range(), new_range, cg->group_id(), new_tid, group_id); new_sg->add_merging_group(cg); }); // Cannot wait for group to be closed, since it can only return after some long-running operation // is done with it, and old erm is still held at this point. (void) with_gate(_t.async_gate(), [sg] { return sg->close().handle_exception([sg] (std::exception_ptr ex) { tlogger.warn("Failed to close storage group: {}. 
Ignored", std::move(ex)); }); }); } new_storage_groups[new_tid] = std::move(new_sg); } _storage_groups = std::move(new_storage_groups); _merge_completion_event.signal(); } void tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function refresh_mutation_source) { auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id()); auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map); size_t old_tablet_count = old_tablet_map->tablet_count(); size_t new_tablet_count = new_tablet_map->tablet_count(); if (new_tablet_count > old_tablet_count) { tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); } else if (new_tablet_count < old_tablet_count) { tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets", schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map); } // Allocate storage group if tablet is migrating in, or deallocate if it's migrating out. 
auto this_replica = locator::tablet_replica{ .host = erm.get_token_metadata().get_my_id(), .shard = this_shard_id() };
    // A tablet is migrating into this replica once its transition reaches
    // allow_write_both_read_old with us as the pending replica.
    auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) {
        return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old
                && transition_info.pending_replica == this_replica;
    };
    bool tablet_migrating_in = false;
    for (auto& transition : new_tablet_map->transitions()) {
        auto tid = transition.first;
        auto transition_info = transition.second;
        if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
            auto range = new_tablet_map->get_token_range(tid);
            _storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
            tablet_migrating_in = true;
        } else if (_storage_groups.contains(tid.value()) && locator::is_post_cleanup(this_replica, new_tablet_map->get_tablet_info(tid), transition_info)) {
            // The storage group should be cleaned up and stopped at this point usually by the tablet cleanup stage,
            // unless the storage group was allocated after tablet cleanup was completed for this node. This could
            // happen if the node was restarted after tablet cleanup was run but before moving to the next stage. To
            // handle this case we stop the storage group here if it's not stopped already.
            auto sg = _storage_groups[tid.value()];
            remove_storage_group(tid.value());
            // Fire-and-forget stop under the table's async gate.
            (void) with_gate(_t.async_gate(), [sg] {
                return sg->stop("tablet post-cleanup").then([sg] {});
            });
        }
    }

    // update the per compaction group tombstone GC enabled flag
    for_each_storage_group([&] (size_t group_id, storage_group& sg) {
        const locator::tablet_id tid = static_cast(group_id);
        const locator::tablet_info& tinfo = new_tablet_map->get_tablet_info(tid);
        // Tombstone GC only for tablets whose replica set still includes us.
        const bool tombstone_gc_enabled = std::ranges::contains(tinfo.replicas, this_replica);
        sg.for_each_compaction_group([tombstone_gc_enabled] (const compaction_group_ptr& cg_ptr) {
            cg_ptr->set_tombstone_gc_enabled(tombstone_gc_enabled);
        });
    });

    // TODO: possibly use row_cache::invalidate(external_updater) instead on all ranges of new replicas,
    // as underlying source will be refreshed and external_updater::execute can refresh the sstable set.
    // Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
    // problem as fresh node won't have any data in new range and migration cleanup invalidates the
    // range being moved away.
    if (tablet_migrating_in || old_tablet_count != new_tablet_count) {
        refresh_mutation_source();
    }
}

// This function is called in the topology::transition_state::tablet_resize_finalization transition which
// guarantees there is no tablet repair. The cm.stop_and_disable_compaction() ensures no compaction is running.
// Walks every compaction group of every storage group and updates its
// "repaired at" state for a tablet merge. Compaction is stopped and disabled
// per-group for the duration of the update (the returned handle `cre` keeps
// it disabled until it goes out of scope at the end of the iteration).
// No-op for non-tablet tables, and skippable via error injection in tests.
future<> table::update_repaired_at_for_merge() {
    if (!uses_tablets()) {
        co_return;
    }
    if (utils::get_local_injector().enter("skip_update_repaired_at_for_merge")) {
        co_return;
    }
    auto sgs = storage_groups();
    for (auto& x : sgs) {
        auto sg = x.second;
        if (sg) {
            auto cgs = sg->compaction_groups();
            for (auto& cg : cgs) {
                auto cre = co_await cg->get_compaction_manager().stop_and_disable_compaction("update_repaired_at_for_merge", cg->view_for_unrepaired_data());
                co_await cg->update_repaired_at_for_merge();
            }
        }
    }
}

// Installs a new effective replication map. For tablet tables this forwards
// to the storage-group manager, which may refresh the compound sstable set
// and the cache snapshot. The previous map is invalidated afterwards so its
// users can detect staleness.
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
    auto old_erm = std::exchange(_erm, std::move(erm));
    auto refresh_mutation_source = [this] {
        refresh_compound_sstable_set();
        _cache.refresh_snapshot();
    };
    if (uses_tablets()) {
        _sg_manager->update_effective_replication_map(*_erm, refresh_mutation_source);
    }
    if (old_erm) {
        old_erm->invalidate();
    }
    recalculate_tablet_count_stats();
}

// Refreshes the cached tablet-count metric.
void table::recalculate_tablet_count_stats() {
    _stats.tablet_count = calculate_tablet_count();
}

// Number of tablets (== storage groups) owned locally; 0 for vnode tables.
int64_t table::calculate_tablet_count() const {
    if (!uses_tablets()) {
        return 0;
    }
    return _sg_manager->storage_groups().size();
}

// Builds a predicate that answers "might this partition exist in the given
// sstable set?" using the sstables' bloom filters. Used to short-circuit
// cache misses. The returned checker shares ownership of the set and of an
// incremental selector so repeated calls are cheap.
partition_presence_checker table::make_partition_presence_checker(lw_shared_ptr sstables) {
    auto sel = make_lw_shared(sstables->make_incremental_selector());
    return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
        auto& sst = sel->select(key).sstables;
        if (sst.empty()) {
            return partition_presence_checker_result::definitely_doesnt_exist;
        }
        auto hk = sstables::sstable::make_hashed_key(*_schema, key.key());
        for (auto&& s : sst) {
            if (s->filter_has_key(hk)) {
                return partition_presence_checker_result::maybe_exists;
            }
        }
        // No bloom filter matched: the partition cannot be in this set.
        return partition_presence_checker_result::definitely_doesnt_exist;
    };
}

// Returns a function computing the max timestamp that may be purged for a
// given key, by combining the answers of all compaction groups' memtables in
// the key's storage group. Used by the compacting reader under the cache.
max_purgeable_fn table::get_max_purgeable_fn_for_cache_underlying_reader() const {
    return [this](const dht::decorated_key& dk, ::is_shadowable is_shadowable) -> max_purgeable {
        auto& sg = storage_group_for_token(dk.token());
        max_purgeable mp;
        sg.for_each_compaction_group([&dk, is_shadowable, &mp] (const compaction_group_ptr& cg) {
            mp.combine(cg->memtables()->get_max_purgeable(dk, is_shadowable, cg->max_seen_timestamp()));
        });
        return mp;
    };
}

// Exposes the current sstable set as a cache snapshot source. The returned
// readers compact away expired tombstones (commitlog check disabled) so the
// cache is populated with already-GC'd data.
snapshot_source table::sstables_as_snapshot_source() {
    return snapshot_source([this] () {
        // Capture the sstable set at snapshot time; later additions are
        // invisible to this snapshot.
        auto sst_set = _sstables;
        return mutation_source([this, sst_set] (schema_ptr s, reader_permit permit, const dht::partition_range& r, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
            auto reader = make_sstable_reader(std::move(s), std::move(permit), sst_set, r, slice, std::move(trace_state), fwd, fwd_mr);
            return make_compacting_reader(
                std::move(reader),
                gc_clock::now(),
                get_max_purgeable_fn_for_cache_underlying_reader(),
                _compaction_manager.get_tombstone_gc_state().with_commitlog_check_disabled(),
                fwd);
        }, [this, sst_set] {
            return make_partition_presence_checker(sst_set);
        });
    });
}

// define in .cc, since sstable is forward-declared in .hh
table::~table() {
}

// Aggregate LSA occupancy of all memtables across all compaction groups.
logalloc::occupancy_stats table::occupancy() const {
    logalloc::occupancy_stats res;
    for_each_compaction_group([&] (const compaction_group& cg) {
        // const_cast: memtables() is non-const but occupancy reading does not
        // mutate the group.
        for (auto& m : *const_cast(cg).memtables()) {
            res += m->region().occupancy();
        }
    });
    return res;
}

db::replay_position table::highest_flushed_replay_position() const {
    return _highest_flushed_rp;
}

// JSON shape of a snapshot manifest: {"files": [...]}.
struct manifest_json : public json::json_base {
    json::json_chunked_list files;
    manifest_json() {
        register_params();
    }
    manifest_json(manifest_json&& e) {
        register_params();
        files = std::move(e.files);
    }
    manifest_json& operator=(manifest_json&& e) {
        files = std::move(e.files);
        return *this;
    }
private:
    void register_params() {
        add(&files, "files");
    }
};

// Writes <jsondir>/manifest.json listing every snapshotted file from all
// shards, then fsyncs the directory so the manifest is durable.
future<> table::seal_snapshot(sstring jsondir, std::vector file_sets) {
    manifest_json manifest;
    for (const auto& fsp : file_sets) {
        for (auto& rf : *fsp) {
            manifest.files.push(std::move(rf));
        }
    }
    auto streamer = json::stream_object(std::move(manifest));
    auto jsonfile = jsondir + "/manifest.json";
    tlogger.debug("Storing manifest {}", jsonfile);
    co_await io_check([jsondir] { return recursive_touch_directory(jsondir); });
    auto f = co_await open_checked_file_dma(general_disk_error_handler, jsonfile, open_flags::wo | open_flags::create | open_flags::truncate);
    auto out = co_await make_file_output_stream(std::move(f));
    std::exception_ptr ex;
    try {
        // NOTE(review): the streamer is assumed to close `out` itself; there
        // is no explicit out.close() here — confirm against json::stream_object.
        co_await streamer(std::move(out));
    } catch (...) {
        ex = std::current_exception();
    }
    if (ex) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
    co_await io_check(sync_directory, std::move(jsondir));
}

// Writes the table's CREATE statement(s) to <dir>/schema.cql so a snapshot
// can be restored with a matching schema.
future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstring dir) const {
    auto schema_desc = schema()->describe( replica::make_schema_describe_helper(table_shards), cql3::describe_option::STMTS);
    auto schema_description = std::move(*schema_desc.create_statement);
    auto schema_file_name = dir + "/schema.cql";
    auto f = co_await open_checked_file_dma(general_disk_error_handler, schema_file_name, open_flags::wo | open_flags::create | open_flags::truncate);
    auto out = co_await make_file_output_stream(std::move(f));
    std::exception_ptr ex;
    auto view = managed_bytes_view(schema_description.as_managed_bytes());
    try {
        // The description is fragmented managed memory; write it piecewise.
        for (auto&& fragment : fragment_range(view)) {
            auto sv = to_string_view(fragment);
            co_await out.write(sv.data(), sv.size());
        }
        co_await out.flush();
    } catch (...) {
        ex = std::current_exception();
    }
    co_await out.close();
    if (ex) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}

// Runs the orchestration code on an arbitrary shard to balance the load.
// Takes a consistent snapshot of this table across all shards. One shard
// (chosen by hashing the snapshot directory, to spread load) orchestrates:
// it creates the directory, asks every shard to hard-link its sstables into
// it, fsyncs, and then finalizes (schema.cql + manifest.json).
future<> table::snapshot_on_all_shards(sharded& sharded_db, const global_table_ptr& table_shards, sstring name) {
    auto* so = std::get_if(&table_shards->get_storage_options().value);
    if (so == nullptr) {
        // Only local storage supports snapshots.
        throw std::runtime_error("Snapshotting non-local tables is not implemented");
    }
    if (so->dir.empty()) {
        // virtual tables don't have initialized local storage
        co_return;
    }
    auto jsondir = (so->dir / sstables::snapshots_dir / name).native();
    auto orchestrator = std::hash()(jsondir) % smp::count;
    co_await smp::submit_to(orchestrator, [&] () -> future<> {
        auto& t = *table_shards;
        auto s = t.schema();
        tlogger.debug("Taking snapshot of {}.{}: directory={}", s->ks_name(), s->cf_name(), jsondir);
        std::vector file_sets;
        file_sets.reserve(smp::count);
        co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
        co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
            file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
                return table_shards->take_snapshot(jsondir);
            }));
        });
        co_await io_check(sync_directory, jsondir);
        co_await t.finalize_snapshot(table_shards, std::move(jsondir), std::move(file_sets));
    });
}

// Shard-local part of a snapshot: hard-links every live sstable into
// `jsondir` and returns the set of snapshotted Data-component basenames for
// the manifest. The sstable-list permit blocks concurrent deletion while the
// list is being walked.
future table::take_snapshot(sstring jsondir) {
    tlogger.trace("take_snapshot {}", jsondir);
    auto sstable_deletion_guard = co_await get_sstable_list_permit();
    auto tables = *_sstables->all() | std::ranges::to>();
    auto table_names = std::make_unique>();
    co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
        table_names->insert(sstable->component_basename(sstables::component_type::Data));
        return io_check([sstable, &dir = jsondir] { return sstable->snapshot(dir); });
    });
    co_return make_foreign(std::move(table_names));
}

// Completes a snapshot by writing schema.cql and sealing the manifest.
// Both steps are attempted even if the first fails; the first failure (or
// the later one, which overwrites it) is rethrown at the end.
future<> table::finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector file_sets) {
    std::exception_ptr ex;
    tlogger.debug("snapshot {}: writing schema.cql", jsondir);
    co_await write_schema_as_cql(table_shards, jsondir).handle_exception([&] (std::exception_ptr ptr) {
        tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
        ex = std::move(ptr);
    });
    tlogger.debug("snapshot {}: seal_snapshot", jsondir);
    co_await seal_snapshot(jsondir, std::move(file_sets)).handle_exception([&] (std::exception_ptr ptr) {
        tlogger.error("Failed to seal snapshot in {}: {}.", jsondir, ptr);
        ex = std::move(ptr);
    });
    if (ex) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}

// True iff a snapshot directory named `tag` exists for this table's local
// storage. ENOENT is treated as "no snapshot"; other stat errors propagate.
future table::snapshot_exists(sstring tag) {
    auto* so = std::get_if(&_storage_opts->value);
    if (so == nullptr || so->dir.empty()) {
        co_return false; // Technically it doesn't as snapshots only work for local storage
    }
    sstring jsondir = (so->dir / sstables::snapshots_dir / tag).native();
    bool exists = false;
    try {
        auto sd = co_await io_check(file_stat, jsondir, follow_symlink::no);
        if (sd.type != directory_entry_type::directory) {
            // The path exists but is not a directory — report it loudly.
            throw std::error_code(ENOTDIR, std::system_category());
        }
        exists = true;
    } catch (std::system_error& e) {
        if (e.code() != std::error_code(ENOENT, std::system_category())) {
            throw;
        }
    }
    co_return exists;
}

// Collects per-snapshot total/live sizes for every snapshot of this table,
// across all local data directories. Runs in a seastar::async thread because
// it uses blocking .get() on futures throughout.
future> table::get_snapshot_details() {
    return seastar::async([this] {
        std::unordered_map all_snapshots;
        auto* so = std::get_if(&_storage_opts->value);
        if (so == nullptr || so->dir.empty()) {
            return all_snapshots;
        }
        auto datadirs = _sstables_manager.get_local_directories(*so);
        for (auto& datadir : datadirs) {
            fs::path snapshots_dir = datadir / sstables::snapshots_dir;
            auto file_exists = io_check([&snapshots_dir] { return seastar::file_exists(snapshots_dir.native()); }).get();
            if (!file_exists) {
                continue;
            }
            auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of());
            while (auto de = lister.get().get()) {
                auto snapshot_name = de->name;
                // Same snapshot name can appear under several datadirs; sizes
                // are accumulated into one entry.
                all_snapshots.emplace(snapshot_name, snapshot_details());
                auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
                auto& sd =
all_snapshots.at(snapshot_name);
                sd.total += details.total;
                sd.live += details.live;
            }
        }
        return all_snapshots;
    });
}

// Computes total and "live" (not shared with the main data dir) byte sizes
// of one snapshot directory. A file counts as live when it exists only in
// the snapshot, i.e. its data would be lost if the snapshot were deleted.
future table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
    table::snapshot_details details{};
    std::optional staging_dir = snapshot_dir / sstables::staging_dir;
    if (!co_await file_exists(staging_dir->native())) {
        staging_dir.reset();
    }
    auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of());
    while (auto de = co_await lister.get()) {
        const auto& name = de->name;
        // FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
        // See https://github.com/scylladb/seastar/pull/3163
        auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
        auto size = sd.allocated_size;
        // The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
        //
        // All the others should just generate an exception: there is something wrong, so don't blindly
        // add it to the size.
        if (name != "manifest.json" && name != "schema.cql") {
            details.total += size;
            if (sd.number_of_links == 1) {
                // File exists only in the snapshot directory.
                details.live += size;
                continue;
            }
            // If the number of links is greater than 1, it is still possible that the file is linked to another snapshot
            // So check the datadir for the file too.
        } else {
            continue;
        }
        auto exists_in_dir = [&] (fs::path path) -> future {
            try {
                // File exists in the main SSTable directory. Snapshots are not contributing to size
                auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
                // File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
                if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
                    // NOTE(review): this uses dblog while the rest of this file logs
                    // via tlogger — presumably intentional, verify.
                    dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]", (datadir / name).native(), psd.device_id, psd.inode_number, psd.size, (snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
                    co_return false;
                }
                co_return true;
            } catch (std::system_error& e) {
                if (e.code() != std::error_code(ENOENT, std::system_category())) {
                    throw;
                }
                co_return false;
            }
        };
        // Check staging dir first, as files might be moved from there to the datadir concurrently to this check
        if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) && !co_await exists_in_dir(datadir / name)) {
            details.live += size;
        }
    }
    co_return details;
}

// Flushes this group's memtables. noexcept wrapper: a synchronous throw is
// converted into a failed future.
future<> compaction_group::flush() noexcept {
    try {
        return _memtables->flush();
    } catch (...) {
        return current_exception_as_future<>();
    }
}

// Flushes all compaction groups of the storage group, sequentially.
future<> storage_group::flush() noexcept {
    for (auto& cg : compaction_groups()) {
        co_await cg->flush();
    }
}

bool compaction_group::can_flush() const {
    return _memtables->can_flush();
}

lw_shared_ptr& compaction_group::memtables() noexcept {
    return _memtables;
}

size_t compaction_group::memtable_count() const noexcept {
    return _memtables->size();
}

// Sum of memtable counts over all compaction groups in this storage group.
size_t storage_group::memtable_count() const {
    return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}

// Flushes all memtables of the table. If `pos` is given and already covered
// by a previous flush, this is a no-op.
future<> table::flush(std::optional pos) {
    if (pos && *pos < _flush_rp) {
        co_return;
    }
    // There is nothing to flush if the table was stopped.
if (_pending_flushes_phaser.is_closed()) { co_return; } auto op = _pending_flushes_phaser.start(); auto fp = _highest_rp; co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::flush)); _flush_rp = std::max(_flush_rp, fp); } bool storage_group::can_flush() const { return std::ranges::any_of(compaction_groups(), std::mem_fn(&compaction_group::can_flush)); } bool table::can_flush() const { return std::ranges::any_of(storage_groups() | std::views::values, std::mem_fn(&storage_group::can_flush)); } future<> compaction_group::clear_memtables() { if (_t.commitlog()) { for (auto& t : *_memtables) { _t.commitlog()->discard_completed_segments(_t.schema()->id(), t->get_and_discard_rp_set()); } } auto old_memtables = _memtables->clear_and_add(); for (auto& smt : old_memtables) { co_await smt->clear_gently(); } } future<> table::clear() { auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits(); co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::clear_memtables)); co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ })); } bool storage_group::compaction_disabled() const { // Compaction group that has been stopped will be excluded, since the group will not be available for a caller // to disable compaction explicitly on it, e.g. on truncate, and the caller might want to perform a check // that compaction was disabled on all groups. Stopping a group is equivalent to disabling compaction on it. return std::ranges::all_of(compaction_groups() | std::views::filter(std::not_fn(&compaction_group::stopped)), [] (const_compaction_group_ptr& cg) { return cg->compaction_disabled(); }); } // NOTE: does not need to be futurized, but might eventually, depending on // if we implement notifications, whatnot. 
// Removes from the table's sstable sets every sstable whose data is entirely
// older than `truncated_at` (used by TRUNCATE). Returns the highest replay
// position found among the locally-originated discarded sstables, so the
// commitlog can be trimmed accordingly. The sstable files themselves are
// deleted atomically at the end.
future table::discard_sstables(db_clock::time_point truncated_at) {
    // truncate_table_on_all_shards() disables compaction for the truncated
    // tables and views, so we normally expect compaction to be disabled on
    // this table. But as shown in issue #17543, it is possible that a new
    // materialized view was created right after truncation started, and it
    // would not have compaction disabled when this function is called on it.
    if (!schema()->is_view()) {
        // Check if the storage groups have compaction disabled, but also check if they have been stopped.
        // This is to avoid races with tablet cleanup which stops the storage group, and then stops the
        // compaction groups. We could have a situation where compaction couldn't have been disabled by
        // truncate because the storage group has been stopped, but the compaction groups have not yet been stopped.
        auto compaction_disabled = std::ranges::all_of(storage_groups() | std::views::values, [] (const storage_group_ptr& sgp) {
            return sgp->async_gate().is_closed() || sgp->compaction_disabled();
        });
        if (!compaction_disabled) {
            utils::on_internal_error(fmt::format("compaction not disabled on table {}.{} during TRUNCATE", schema()->ks_name(), schema()->cf_name()));
        }
    }
    db::replay_position rp;
    struct removed_sstable {
        compaction_group& cg;
        sstables::shared_sstable sst;
        replica::enable_backlog_tracker enable_backlog_tracker;
    };
    std::vector remove;
    _stats.pending_sstable_deletions++;
    auto undo_stats = defer([this] {
        _stats.pending_sstable_deletions--;
    });
    auto permit = co_await get_sstable_list_permit();
    co_await _cache.invalidate(row_cache::external_updater([this, &rp, &remove, truncated_at] {
        // FIXME: the following isn't exception safe.
        for_each_compaction_group([&] (compaction_group& cg) {
            // Rebuild the main and maintenance sets keeping only sstables
            // newer than the truncation point.
            auto pruned = make_lw_shared(cg.make_main_sstable_set());
            auto maintenance_pruned = cg.make_maintenance_sstable_set();
            auto prune = [&] (lw_shared_ptr& pruned, const lw_shared_ptr& pruning, replica::enable_backlog_tracker enable_backlog_tracker) mutable {
                pruning->for_each_sstable([&] (const sstables::shared_sstable& p) mutable {
                    if (p->max_data_age() <= truncated_at) {
                        // Only sstables written by this shard of this node
                        // contribute to the returned replay position.
                        if (p->originated_on_this_node().value_or(false) && p->get_stats_metadata().position.shard_id() == this_shard_id()) {
                            rp = std::max(p->get_stats_metadata().position, rp);
                        }
                        remove.emplace_back(removed_sstable{cg, p, enable_backlog_tracker});
                        return;
                    }
                    pruned->insert(p);
                });
            };
            prune(pruned, cg.main_sstables(), enable_backlog_tracker::yes);
            prune(maintenance_pruned, cg.maintenance_sstables(), enable_backlog_tracker::no);
            cg.set_main_sstables(std::move(pruned));
            cg.set_maintenance_sstables(std::move(maintenance_pruned));
        });
        refresh_compound_sstable_set();
        tlogger.debug("cleaning out row cache");
    }));
    rebuild_statistics();
    std::vector del;
    del.reserve(remove.size());
    for (auto& r : remove) {
        if (r.enable_backlog_tracker) {
            remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst);
        }
        erase_sstable_cleanup_state(r.sst);
        del.emplace_back(r.sst);
    };
    co_await delete_sstables_atomically(permit, std::move(del));
    co_return rp;
}

// Transitions the table from read-only (startup) to writable, attaching the
// commitlog if commitlog is enabled for this table. Calling it twice is an
// internal error.
void table::mark_ready_for_writes(db::commitlog* cl) {
    if (!_readonly) {
        on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name()));
    }
    if (_config.enable_commitlog) {
        _commitlog = cl;
    }
    _readonly = false;
}

// Accessor for the table's commitlog; must not be called before
// mark_ready_for_writes().
db::commitlog* table::commitlog() const {
    if (_readonly) [[unlikely]] {
        on_internal_error(dblog, ::format("table {}.{} is readonly", _schema->ks_name(), _schema->cf_name()));
    }
    return _commitlog;
}

// Installs a new schema version: propagates it to memtables, cache, counter
// locks and views, then re-applies the compaction strategy and flush timer.
void table::set_schema(schema_ptr s) {
    SCYLLA_ASSERT(s->is_counter() == _schema->is_counter());
    tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
_schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version()); _flush_timer.cancel(); for_each_compaction_group([&] (compaction_group& cg) { for (auto& m: *cg.memtables()) { m->set_schema(s); } }); _cache.set_schema(s); if (_counter_cell_locks) { _counter_cell_locks->set_schema(s); } _schema = std::move(s); for (auto&& v : _views) { v->view_info()->reset_view_info(); if (auto reverse_schema = local_schema_registry().get_or_null(reversed(v->version()))) { reverse_schema->view_info()->reset_view_info(); } } set_compaction_strategy(_schema->compaction_strategy()); trigger_compaction(); if (_schema->memtable_flush_period() > 0) { _flush_timer.rearm(timer::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period())); } } static std::vector::iterator find_view(std::vector& views, const view_ptr& v) { return std::find_if(views.begin(), views.end(), [&v] (auto&& e) { return e->id() == v->id(); }); } void table::add_or_update_view(view_ptr v) { auto existing = find_view(_views, v); if (existing != _views.end()) { *existing = std::move(v); } else { _views.push_back(std::move(v)); } } void table::remove_view(view_ptr v) { auto existing = find_view(_views, v); if (existing != _views.end()) { _views.erase(existing); } } void table::clear_views() { _views.clear(); } const std::vector& table::views() const { return _views; } std::vector table::affected_views(shared_ptr gen, const schema_ptr& base, const mutation& update) const { //FIXME: Avoid allocating a vector here; consider returning the boost iterator. 
    return std::ranges::to>(_views | std::views::filter([&] (auto&& view) {
        return db::view::partition_key_matches(gen->get_db().as_data_dictionary(), *base, *view->view_info(), update.decorated_key());
    }));
}

/**
 * Shard-local locking of clustering rows or entire partitions of the base
 * table during a Materialized-View read-modify-update:
 *
 * Consider that two concurrent base-table updates set column C, a column
 * added to a view's primary key, to two different values - V1 and V2.
 * Say that before the updates, C's value was V0. Both updates may remove
 * from the view the old row with V0, one will add a view row with V1 and the
 * second will add a view row with V2, and we end up with two rows, with the
 * two different values, instead of just one row with the last value.
 *
 * The solution is to lock the base row which we read to ensure atomic read-
 * modify-write to the view table: Under one locked section, the row with V0
 * is deleted and a new one with V1 is created, and then under a second locked
 * section the row with V1 is deleted and a new one with V2 is created.
 * Note that the lock is node-local (and in fact shard-local) and the locked
 * section doesn't include the view table modifications - it includes just the
 * read and the creation of the update commands - commands which will
 * eventually be sent to the view replicas.
 *
 * We need to lock a base-table row even if an update does not modify the
 * view's new key column C: Consider an update that only updates a non-key
 * column (but also in the view) D. We still need to read the current base row
 * to retrieve the view row's current key (column C), and then write the
 * modification to *that* view row. Having several such modifications in
 * parallel is fine. What is not fine is to have in parallel a modification
 * of the value of C. So basically we need a reader-writer lock (a.k.a.
 * shared-exclusive lock) on base rows:
 * 1. Updates which do not modify the view's key column take a reader lock
 *    on the base row.
 * 2. Updates which do modify the view's key column take a writer lock.
 *
 * Further complicating matters is that some operations involve multiple
 * base rows - such as a deletion of an entire partition or a range of rows.
 * In that case, we should lock the entire partition, and forbid parallel
 * work on the same partition or one of its rows. We can do this with a
 * read-writer lock on base partitions:
 * 1. Before we lock a row (as described above), we lock its partition key
 *    with the reader lock.
 * 2. When an operation involves an entire partition (or range of rows),
 *    we lock the partition key with a writer lock.
 *
 * If an operation involves only a range of rows, not an entire partition,
 * we could in theory lock only this range and not an entire partition.
 * However, we expect this case to be rare enough to not care about and we
 * currently just lock the entire partition.
 *
 * If a base table has *multiple* views, we still read the base table row
 * only once, and have to keep a lock around this read and all the view
 * updates generation. This lock needs to be the strictest of the above -
 * i.e., if a column is modified which is not part of one view's key but is
 * part of a second view's key - we should lock the base row with the
 * stricter writer lock, not a reader lock.
 */
future table::local_base_lock(
        const schema_ptr& s,
        const dht::decorated_key& pk,
        const query::clustering_row_ranges& rows,
        db::timeout_clock::time_point timeout) const {
    // FIXME: Optimization:
    // Below we always pass "true" to the lock functions and take an exclusive
    // lock on the affected row or partition. But as explained above, if all
    // the modified columns are not key columns in *any* of the views, a
    // shared lock is enough. We should test for this case and pass false.
    // This will allow more parallelism in concurrent modifications to the
    // same row - probably not a very urgent case.
    _row_locker.upgrade(s);
    if (rows.size() == 1 && rows[0].is_singular() && rows[0].start() && !rows[0].start()->value().is_empty(*s)) {
        // A single clustering row is involved.
        return _row_locker.lock_ck(pk, rows[0].start()->value(), true, timeout, _row_locker_stats);
    } else {
        // More than a single clustering row is involved. Most commonly it's
        // the entire partition, so let's lock the entire partition. We could
        // lock less than the entire partition in more elaborate cases where
        // just a few individual rows are involved, or row ranges, but we
        // don't think this will make a practical difference.
        return _row_locker.lock_pk(pk, true, timeout, _row_locker_stats);
    }
}

// Flat memory cost charged per new reader (16 KiB).
const ssize_t new_reader_base_cost{16 * 1024};

size_t table::estimate_read_memory_cost() const {
    return new_reader_base_cost;
}

// Records the cache hit rate reported by (or computed for) a given host.
void table::set_hit_rate(locator::host_id addr, cache_temperature rate) {
    auto& e = _cluster_cache_hit_rates[addr];
    e.rate = rate;
    e.last_updated = lowres_clock::now();
}

table::cache_hit_rate table::get_my_hit_rate() const {
    return cache_hit_rate { _global_cache_hit_rate, lowres_clock::now()};
}

// Returns the cache hit rate for `addr`. For remote nodes not yet cached
// locally, the value is parsed from the gossiper's CACHE_HITRATES application
// state (0.0 when the state exists but lacks this table; -1 sentinel for old
// nodes without the state, though that path falls through to the 0.0 return).
table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, locator::host_id addr) {
    if (gossiper.my_host_id() == addr) {
        return get_my_hit_rate();
    }
    auto it = _cluster_cache_hit_rates.find(addr);
    if (it == _cluster_cache_hit_rates.end()) {
        // no data yet, get it from the gossiper
        auto eps = gossiper.get_endpoint_state_ptr(addr);
        if (eps) {
            auto* state = eps->get_application_state_ptr(gms::application_state::CACHE_HITRATES);
            float f = -1.0f; // missing state means old node
            if (state) {
                // The state value encodes "<ks>.<cf>:<rate>" pairs; locate our
                // table's entry and parse the float after the separator.
                const auto me = format("{}.{}", _schema->ks_name(), _schema->cf_name());
                const auto& value = state->value();
                const auto i = value.find(me);
                if (i != sstring::npos) {
                    f = strtof(&value[i + me.size() + 1], nullptr);
                } else {
                    f = 0.0f; // empty state means that node has rebooted
                }
                set_hit_rate(addr, cache_temperature(f));
                return cache_hit_rate{cache_temperature(f), lowres_clock::now()};
            }
        }
        // No gossip information: assume a cold cache.
        return cache_hit_rate {cache_temperature(0.0f), lowres_clock::now()};
    } else {
        return it->second;
    }
}

void table::drop_hit_rate(locator::host_id addr) {
    _cluster_cache_hit_rates.erase(addr);
}

// Rejects mutations whose replay position predates the last truncation mark,
// which would reorder a write before a TRUNCATE.
void table::check_valid_rp(const db::replay_position& rp) const {
    if (rp != db::replay_position() && rp < _lowest_allowed_rp) {
        throw mutation_reordered_with_truncate_exception();
    }
}

// Marks the current highest replay position as the lowest acceptable one
// (called on truncate) and returns it.
db::replay_position table::set_low_replay_position_mark() {
    _lowest_allowed_rp = _highest_rp;
    return _lowest_allowed_rp;
}

// Applies a (frozen or live) mutation to the active memtable of the given
// compaction group, updating write latency stats and the highest replay
// position. Throws mutation_reordered_with_truncate_exception for stale
// replay positions.
template
void table::do_apply(compaction_group& cg, db::rp_handle&& h, Args&&... args) {
    utils::latency_counter lc;
    _stats.writes.set_latency(lc);
    db::replay_position rp = h;
    check_valid_rp(rp);
    try {
        cg.memtables()->active_memtable().apply(std::forward(args)..., std::move(h));
        _highest_rp = std::max(_highest_rp, rp);
    } catch (...) {
        _failed_counter_applies_to_memtable++;
        throw;
    }
    _stats.writes.mark(lc);
}

// Applies a live mutation. Virtual tables are routed to the virtual writer;
// otherwise the write waits for dirty-memory availability while holding the
// target compaction group's gate.
future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
    if (_virtual_writer) [[unlikely]] {
        return (*_virtual_writer)(freeze(m));
    }
    auto& cg = compaction_group_for_token(m.token());
    auto holder = cg.async_gate().hold();
    return dirty_memory_region_group().run_when_memory_available([this, &m, h = std::move(h), &cg, holder = std::move(holder)] () mutable {
        do_apply(cg, std::move(h), m);
    }, timeout);
}

template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&);

// Frozen-mutation variant of apply(); same flow as the live-mutation overload.
future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
    if (_virtual_writer) [[unlikely]] {
        return (*_virtual_writer)(m);
    }
    auto& cg = compaction_group_for_key(m.key(), m_schema);
    auto holder = cg.async_gate().hold();
    return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable {
        do_apply(cg, std::move(h), m, m_schema);
    }, timeout);
}

template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&);

// Writes the contents of a memtable (via the given reader) into an sstable,
// stamping the writer config with the memtable's replay position.
future<> write_memtable_to_sstable(mutation_reader reader, memtable& mt, sstables::shared_sstable sst, size_t estimated_partitions, sstables::write_monitor& monitor, sstables::sstable_writer_config& cfg) {
    cfg.replay_position = mt.replay_position();
    cfg.monitor = &monitor;
    cfg.origin = "memtable";
    schema_ptr s = reader.schema();
    return sst->write_components(std::move(reader), estimated_partitions, s, cfg, mt.get_encoding_stats());
}

// Convenience overload: builds an unconditional write permit and an
// unlimited reader-concurrency semaphore, then delegates to the overload
// above. The semaphore is always stopped, even on failure.
future<> write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) {
    auto cfg = sst->manager().configure_writer("memtable");
    auto monitor = replica::permit_monitor(make_lw_shared(sstable_write_permit::unconditional()));
    auto semaphore = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable", reader_concurrency_semaphore::register_metrics::no);
    std::exception_ptr ex;
    try {
        auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {});
        auto reader = mt.make_flush_reader(mt.schema(), std::move(permit));
        co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg);
    } catch (...)
    {
        ex = std::current_exception();
    }
    co_await semaphore.stop();
    if (ex) {
        std::rethrow_exception(std::move(ex));
    }
}

// Executes a data/digest read over the given partition ranges, producing a
// query::result. Supports paging through `saved_querier`: a querier left
// over from the previous page is resumed, and — when limits were reached or
// the page was short — the querier is handed back to the caller for the next
// page instead of being closed.
future> table::query(schema_ptr query_schema,
        reader_permit permit,
        const query::read_command& cmd,
        query::result_options opts,
        const dht::partition_range_vector& partition_ranges,
        tracing::trace_state_ptr trace_state,
        query::result_memory_limiter& memory_limiter,
        db::timeout_clock::time_point timeout,
        std::optional* saved_querier) {
    if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
        co_return make_lw_shared();
    }
    const auto table_async_gate_holder = _async_gate.hold();
    utils::latency_counter lc;
    _stats.reads.set_latency(lc);
    auto finally = defer([&] () noexcept {
        _stats.reads.mark(lc);
    });
    const auto short_read_allowed = query::short_read(cmd.slice.options.contains());
    // Reserve result memory according to the request kind (digest vs data).
    auto accounter = co_await (opts.request == query::result_request::only_digest
            ? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed)
            : memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed));
    query_state qs(query_schema, cmd, opts, partition_ranges, std::move(accounter));
    std::optional querier_opt;
    if (saved_querier) {
        querier_opt = std::move(*saved_querier);
    }
    co_await utils::get_local_injector().inject("replica_query_wait", [&] (auto& handler) -> future<> {
        auto table_name = handler.template get("table");
        if (table_name && *table_name == _schema->cf_name()) {
            tlogger.info("replica_query_wait: waiting");
            co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes(5));
        }
    });
    while (!qs.done()) {
        auto&& range = *qs.current_partition_range++;
        if (!querier_opt) {
            querier_base::querier_config conf(_config.tombstone_warn_threshold);
            querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
        }
        auto& q = *querier_opt;
        std::exception_ptr ex;
        try {
            co_await q.consume_page(query_result_builder(*query_schema, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, trace_state);
        } catch (...) {
            ex = std::current_exception();
        }
        // A querier is only carried across ranges/pages when the current
        // range is fully consumed without error.
        if (ex || !qs.done()) {
            co_await q.close();
            querier_opt = {};
        }
        if (ex) {
            co_return coroutine::exception(std::move(ex));
        }
    }
    std::optional last_pos;
    if (querier_opt && querier_opt->current_position()) {
        last_pos.emplace(*querier_opt->current_position());
    }
    if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
        // NOTE(review): if saved_querier is null and the loop never created a
        // querier (qs.done() from the start), querier_opt is disengaged here
        // and this dereference would be invalid — confirm qs.done() cannot be
        // true before the first iteration in that case.
        co_await querier_opt->close();
        querier_opt = {};
    }
    if (saved_querier) {
        *saved_querier = std::move(querier_opt);
    }
    co_return make_lw_shared(qs.builder.build(std::move(last_pos)));
}

// Executes a mutation-level (reconcilable) read over a single partition
// range, used by read repair and digest mismatch resolution. Same
// saved-querier paging protocol as query() above. Tombstone GC during the
// read can be switched off via `tombstone_gc_enabled`.
future table::mutation_query(schema_ptr query_schema,
        reader_permit permit,
        const query::read_command& cmd,
        const dht::partition_range& range,
        tracing::trace_state_ptr trace_state,
        query::result_memory_accounter accounter,
        db::timeout_clock::time_point timeout,
        bool tombstone_gc_enabled,
        std::optional* saved_querier) {
    if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
        co_return reconcilable_result();
    }
    const auto table_async_gate_holder = _async_gate.hold();
    std::optional querier_opt;
    if (saved_querier) {
        querier_opt = std::move(*saved_querier);
    }
    if (!querier_opt) {
        auto tombstone_gc_state = tombstone_gc_enabled ? get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
        querier_base::querier_config conf(_config.tombstone_warn_threshold);
        querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, tombstone_gc_state, conf);
    }
    auto& q = *querier_opt;
    std::exception_ptr ex;
    try {
        auto rrb = reconcilable_result_builder(*query_schema, cmd.slice, std::move(accounter));
        auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, trace_state);
        if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {
            co_await q.close();
            querier_opt = {};
        }
        if (saved_querier) {
            *saved_querier = std::move(querier_opt);
        }
        co_return r;
    } catch (...) {
        ex = std::current_exception();
    }
    co_await q.close();
    co_return coroutine::exception(std::move(ex));
}

// Exposes the whole table (cache + memtables + sstables) as a mutation_source.
mutation_source table::as_mutation_source() const {
    return mutation_source([this] (schema_ptr s,
                                   reader_permit permit,
                                   const dht::partition_range& range,
                                   const query::partition_slice& slice,
                                   tracing::trace_state_ptr trace_state,
                                   streamed_mutation::forwarding fwd,
                                   mutation_reader::forwarding fwd_mr) {
        return this->make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
    });
}

// Feeds a coordinator-side read latency sample into the estimated histogram.
void table::add_coordinator_read_latency(utils::estimated_histogram::duration latency) {
    _stats.estimated_coordinator_read.add(std::chrono::duration_cast(latency).count());
}

// Returns the requested coordinator read-latency percentile, cached for up
// to one second (or until a different percentile is requested). The
// histogram is decayed on each recomputation so newer samples dominate.
std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double percentile) {
    if (_cached_percentile != percentile || lowres_clock::now() - _percentile_cache_timestamp > 1s) {
        _percentile_cache_timestamp = lowres_clock::now();
        _cached_percentile = percentile;
        _percentile_cache_value = std::max(_stats.estimated_coordinator_read.percentile(percentile) / 1000, int64_t(1)) * 1ms;
        _stats.estimated_coordinator_read *= 0.9; // decay values a little to give new data points more weight
    }
    return _percentile_cache_value;
}

void
table::enable_auto_compaction() { // FIXME: unmute backlog. turn table backlog back on. // see table::disable_auto_compaction() notes. _compaction_disabled_by_user = false; trigger_compaction(); } future<> table::disable_auto_compaction() { // FIXME: mute backlog. When we disable background compactions // for the table, we must also disable current backlog of the // table compaction strategy that contributes to the scheduling // group resources prioritization. // // There are 2 possibilities possible: // - there are no ongoing background compaction, and we can freely // mute table backlog. // - there are compactions happening. than we must decide either // we want to allow them to finish not allowing submitting new // compactions tasks, or we may "suspend" them until the bg // compactions will be enabled back. This is not a worst option // because it will allow bg compactions to finish if there are // unused resourced, it will not lose any writers/readers stats. // // Besides that: // - there are major compactions that additionally uses constant // size backlog of shares, // - sstables rewrites tasks that do the same. // // Setting NullCompactionStrategy is not an option due to the // following reasons: // - it will 0 backlog if suspending current compactions is not an // option // - it will break computation of major compaction descriptor // for new submissions _compaction_disabled_by_user = true; return with_gate(_async_gate, [this] { return parallel_foreach_compaction_group_view([this] (compaction::compaction_group_view& view) { return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &view, compaction::compaction_type::Compaction); }); }); } void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept { _tombstone_gc_enabled = tombstone_gc_enabled; tlogger.info0("Tombstone GC was {} for {}.{}", tombstone_gc_enabled ? 
"enabled" : "disabled", _schema->ks_name(), _schema->cf_name()); if (_tombstone_gc_enabled) { trigger_compaction(); } } mutation_reader table::make_mutation_reader_excluding_staging(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { std::vector readers; add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) { readers.reserve(memtable_count + 1); }); static const sstables::sstable_predicate excl_staging_predicate = [] (const sstables::sstable& sst) { return !sst.requires_view_building(); }; readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate)); return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } future<> table::move_sstables_from_staging(std::vector sstables) { auto permit = co_await get_sstable_list_permit(); sstables::delayed_commit_changes delay_commit; std::unordered_set compaction_groups_to_notify; for (auto sst : sstables) { try { // Off-strategy can happen in parallel to view building, so the SSTable may be deleted already if the former // completed first. // The sstable list permit prevents list update on off-strategy completion and move_sstables_from_staging() // from stepping on each other's toe. co_await sst->change_state(sstables::sstable_state::normal, &delay_commit); auto& cg = compaction_group_for_sstable(sst); if (get_compaction_manager().requires_cleanup(cg.view_for_sstable(sst), sst)) { compaction_groups_to_notify.insert(&cg); } // If view building finished faster, SSTable with repair origin still exists. // It can also happen the SSTable is not going through reshape, so it doesn't have a repair origin. // That being said, we'll only add this SSTable to tracker if its origin is other than repair. 
// Otherwise, we can count on off-strategy completion to add it when updating lists. if (sst->get_origin() != sstables::repair_origin) { add_sstable_to_backlog_tracker(cg.get_backlog_tracker(), sst); } } catch (...) { tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception()); throw; } } co_await delay_commit.commit(); for (auto* cg : compaction_groups_to_notify) { cg->get_staging_done_condition().broadcast(); } // Off-strategy timer will be rearmed, so if there's more incoming data through repair / streaming, // the timer can be updated once again. In practice, it allows off-strategy compaction to kick off // at the end of the node operation on behalf of this table, which brings more efficiency in terms // of write amplification. do_update_off_strategy_trigger(); } /** * Given an update for the base table, calculates the set of potentially affected views, * generates the relevant updates, and sends them to the paired view replicas. */ future table::push_view_replica_updates(shared_ptr gen, const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const { //FIXME: Avoid unfreezing here. 
auto m = fm.unfreeze(s); return push_view_replica_updates(std::move(gen), s, std::move(m), timeout, std::move(tr_state), sem); } future table::do_push_view_replica_updates(shared_ptr gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, query::partition_slice::option_set custom_opts) const { schema_ptr base = schema(); m.upgrade(base); gc_clock::time_point now = gc_clock::now(); utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] { now -= 10s; }); if (!db::view::should_generate_view_updates_on_this_shard(base, get_effective_replication_map(), m.token())) { // This could happen if we are a pending replica. // A pending replica may have incomplete data, and building view updates could result // in wrong updates. Therefore we don't send updates from a pending replica. Instead, the // base replicas send the updates to the view replicas, including pending replicas. co_return row_locker::lock_holder(); } auto views = affected_views(gen, base, m); if (views.empty()) { co_return row_locker::lock_holder(); } auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(gen->get_db().as_data_dictionary(), *base, m.decorated_key(), m.partition(), views); const bool need_regular = !cr_ranges.empty(); const bool need_static = db::view::needs_static_row(m.partition(), views); if (!need_regular && !need_static) { tracing::trace(tr_state, "View updates do not require read-before-write"); co_await gen->generate_and_propagate_view_updates(*this, base, sem.make_tracking_only_permit(s, "push-view-updates-no-read-before-write", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now, timeout); // In this case we are not doing a read-before-write, just a // write, so no lock is needed. 
co_return row_locker::lock_holder(); } // We read whole sets of regular and/or static columns in case the update now causes a base row to pass // a view's filters, and a view happens to include columns that have no value in this update. // Also, one of those columns can determine the lifetime of the base row, if it has a TTL. query::column_id_vector static_columns; query::column_id_vector regular_columns; if (need_regular) { std::ranges::copy(base->regular_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(regular_columns)); } if (need_static) { std::ranges::copy(base->static_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(static_columns)); } query::partition_slice::option_set opts; opts.set(query::partition_slice::option::send_partition_key); opts.set_if(need_regular); opts.set_if(need_static && !need_regular); opts.set_if(need_static); opts.set(query::partition_slice::option::send_timestamp); opts.set(query::partition_slice::option::send_ttl); opts.add(custom_opts); auto slice = query::partition_slice( std::move(cr_ranges), std::move(static_columns), std::move(regular_columns), std::move(opts), { }, query::max_rows); // Take the shard-local lock on the base-table row or partition as needed. // We'll return this lock to the caller, which will release it after // writing the base-table update. 
future lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout); co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout); auto lock = co_await std::move(lockf); auto pk = dht::partition_range::make_singular(m.decorated_key()); auto permit = co_await sem.obtain_permit(base, "push-view-updates-read-before-write", estimate_read_memory_cost(), timeout, tr_state); auto reader = source.make_mutation_reader(base, permit, pk, slice, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); co_await gen->generate_and_propagate_view_updates(*this, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now, timeout); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); // return the local partition/row lock we have taken so it // remains locked until the caller is done modifying this // partition/row and destroys the lock object. 
co_return std::move(lock); } future table::push_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const { return do_push_view_replica_updates(std::move(gen), s, std::move(m), timeout, as_mutation_source(), std::move(tr_state), sem, {}); } future table::stream_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, std::vector& excluded_sstables) const { return do_push_view_replica_updates( std::move(gen), s, std::move(m), timeout, as_mutation_source_excluding_staging(), tracing::trace_state_ptr(), *_config.streaming_read_concurrency_semaphore, query::partition_slice::option_set::of()); } mutation_source table::as_mutation_source_excluding_staging() const { return mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { return this->make_mutation_reader_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr); }); } std::vector table::select_memtables_as_mutation_sources(dht::token token) const { auto& sg = storage_group_for_token(token); std::vector mss; mss.reserve(sg.memtable_count()); for (auto& cg : sg.compaction_groups()) { for (auto& mt : *cg->memtables()) { mss.emplace_back(mt->as_data_source()); } } return mss; } compaction::compaction_backlog_tracker& compaction_group::get_backlog_tracker() { return *_backlog_tracker; } void compaction_group::register_backlog_tracker(compaction::compaction_backlog_tracker new_backlog_tracker) { _backlog_tracker.emplace(std::move(new_backlog_tracker)); get_compaction_manager().register_backlog_tracker(*_backlog_tracker); } compaction::compaction_manager& compaction_group::get_compaction_manager() noexcept { return 
_t.get_compaction_manager(); } const compaction::compaction_manager& compaction_group::get_compaction_manager() const noexcept { return _t.get_compaction_manager(); } compaction::compaction_group_view& compaction_group::as_view_for_static_sharding() const { return view_for_unrepaired_data(); } compaction::compaction_group_view& compaction_group::view_for_unrepaired_data() const { return *_unrepaired_view; } compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const { switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) { case repair_sstable_classification::unrepaired: return *_unrepaired_view; case repair_sstable_classification::repairing: return *_repairing_view; case repair_sstable_classification::repaired: return *_repaired_view; } std::unreachable(); } utils::small_vector compaction_group::all_views() const { utils::small_vector ret; ret.push_back(_unrepaired_view.get()); ret.push_back(_repairing_view.get()); ret.push_back(_repaired_view.get()); return ret; } compaction_group* table::try_get_compaction_group_with_static_sharding() const { if (!uses_static_sharding()) { return nullptr; } return get_compaction_group(0); } compaction::compaction_group_view& table::try_get_compaction_group_view_with_static_sharding() const { auto* cg = try_get_compaction_group_with_static_sharding(); if (!cg) { throw std::runtime_error("Getting table state is allowed only with static sharding"); } return cg->as_view_for_static_sharding(); } future<> table::parallel_foreach_compaction_group_view(std::function(compaction::compaction_group_view&)> action) { return parallel_foreach_compaction_group([action = std::move(action)] (compaction_group& cg) -> future<> { for (auto view : cg.all_views()) { co_await action(*view); } }); } compaction::compaction_group_view& table::compaction_group_view_for_sstable(const sstables::shared_sstable& sst) const { auto& cg = compaction_group_for_sstable(sst); return 
cg.view_for_sstable(sst); } data_dictionary::table table::as_data_dictionary() const { static constinit data_dictionary_impl _impl; return _impl.wrap(*this); } bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) { auto& cg = compaction_group_for_sstable(sst); return get_compaction_manager().erase_sstable_cleanup_state(cg.as_view_for_static_sharding(), sst); } bool table::requires_cleanup(const sstables::shared_sstable& sst) const { auto& cg = compaction_group_for_sstable(sst); return get_compaction_manager().requires_cleanup(cg.as_view_for_static_sharding(), sst); } bool table::requires_cleanup(const sstables::sstable_set& set) const { return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) { auto& cg = compaction_group_for_sstable(sst); return stop_iteration(_compaction_manager.requires_cleanup(cg.as_view_for_static_sharding(), sst)); })); } future<> compaction_group::cleanup() { class compaction_group_cleaner : public row_cache::external_updater_impl { table& _t; compaction_group& _cg; const lw_shared_ptr _empty_main_set; const lw_shared_ptr _empty_maintenance_set; private: lw_shared_ptr empty_sstable_set() const { return make_lw_shared(_cg.make_main_sstable_set()); } public: explicit compaction_group_cleaner(compaction_group& cg) : _t(cg._t) , _cg(cg) , _empty_main_set(empty_sstable_set()) , _empty_maintenance_set(empty_sstable_set()) { } virtual future<> prepare() override { // Capture SSTables after flush, and with compaction disabled, to avoid missing any. 
auto set = _cg.make_sstable_set(); auto& sstables_compacted_but_not_deleted = _cg._sstables_compacted_but_not_deleted; sstables_compacted_but_not_deleted.reserve(set->size() + sstables_compacted_but_not_deleted.size()); set->for_each_sstable([&] (const sstables::shared_sstable& sst) mutable { sstables_compacted_but_not_deleted.push_back(sst); }); if (utils::get_local_injector().enter("tablet_cleanup_failure")) { co_await sleep(std::chrono::seconds(1)); tlogger.info("Cleanup failed for tablet {}", _cg.group_id()); throw std::runtime_error("tablet cleanup failure"); } } virtual void execute() override { _t.subtract_compaction_group_from_stats(_cg); _cg.set_main_sstables(std::move(_empty_main_set)); _cg.set_maintenance_sstables(std::move(_empty_maintenance_set)); _t.refresh_compound_sstable_set(); } }; auto permit = co_await _t.get_sstable_list_permit(); auto updater = row_cache::external_updater(std::make_unique(*this)); auto p_range = to_partition_range(token_range()); tlogger.debug("Invalidating range {} for compaction group {} of table {} during cleanup.", p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name()); // Since permit is still held, all actions below will be executed atomically: co_await _t._cache.invalidate(std::move(updater), p_range); _t._cache.refresh_snapshot(); co_await _t.delete_sstables_atomically(permit, _sstables_compacted_but_not_deleted); // Clearing sstables_compacted_but_not_deleted only on success allows a retry caused // by a failure during deletion to still find the sstables, despite they were removed // from the sstable sets. 
_sstables_compacted_but_not_deleted.clear(); if (utils::get_local_injector().enter("tablet_cleanup_failure_post_deletion")) { tlogger.info("Cleanup failed for tablet {}", group_id()); throw std::runtime_error("tablet cleanup failure"); } } future<> table::clear_inactive_reads_for_tablet(database& db, storage_group& sg) { for (auto& cg_ptr : sg.compaction_groups()) { co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range()); } } future<> storage_group::stop(sstring reason) noexcept { if (_async_gate.is_closed()) { co_return; } // Carefully waits for close of gate after stopping compaction groups, since we don't want // to wait on an ongoing compaction, *but* start it earlier to prevent iterations from // picking this group that is being stopped. auto closed_gate_fut = _async_gate.close(); co_await utils::get_local_injector().inject("wait_before_stop_compaction_groups", [] (auto& handler) -> future<> { dblog.info("wait_before_stop_compaction_groups: wait"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); dblog.info("wait_before_stop_compaction_groups: done"); }, false); // Synchronizes with in-flight writes if any, and also takes care of flushing if needed. // The reason we have to stop main cg first, is because an ongoing split always run in main cg // and output will be written to left and right groups. If either left or right are stopped before // main, split completion will add sstable to a closed group, and that might in turn trigger an // exception while running under row_cache::external_updater::execute, resulting in node crash. 
co_await _main_cg->stop(reason); co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) { return cg_ptr->stop(reason); }); co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) { return cg_ptr->stop(reason); }); co_await std::move(closed_gate_fut); } future<> table::stop_compaction_groups(storage_group& sg) { return sg.stop("tablet cleanup"); } future<> table::flush_compaction_groups(storage_group& sg) { for (auto& cg_ptr : sg.compaction_groups()) { co_await cg_ptr->flush(); } } future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group& sg) { for (auto& cg_ptr : sg.compaction_groups()) { co_await cg_ptr->cleanup(); // FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation, // and can cover some mutations which weren't cleaned, causing them to be lost during replay. // // This should be okay, because writes are not supposed to race with cleanups // in the first place, but it would be better to extract the exact replay_position from // the actually flushed/deleted sstables, like discard_sstable() does, so that the mutations // cleaned from sstables are exactly the same as the ones cleaned from commitlog. co_await sys_ks.save_commitlog_cleanup_record(schema()->id(), sg.token_range(), _highest_rp); // This is the only place (outside of reboot) where we delete unneeded commitlog cleanup // records. This isn't ideal -- it would be more natural if the unneeded records // were deleted as soon as they become unneeded -- but this gets the job done with a // minimal amount of code. 
co_await sys_ks.drop_old_commitlog_cleanup_records(db.commitlog()->min_position()); } tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name()); } future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { auto holder = async_gate().hold(); auto sgp = _sg_manager->maybe_storage_group_for_id(_schema, tid.value()); if (!sgp) { tlogger.warn("Storage group for tablet {} is deallocated. Ignore cleanup.", tid); co_return; } auto& sg = *sgp; co_await clear_inactive_reads_for_tablet(db, sg); // compaction_group::stop takes care of flushing. co_await stop_compaction_groups(sg); co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5)); co_await cleanup_compaction_groups(db, sys_ks, tid, sg); co_await utils::get_local_injector().inject("tablet_cleanup_completion_wait", utils::wait_for_message(std::chrono::seconds(5))); } future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { auto holder = async_gate().hold(); auto& sg = storage_group_for_id(tid.value()); co_await clear_inactive_reads_for_tablet(db, sg); co_await flush_compaction_groups(sg); co_await cleanup_compaction_groups(db, sys_ks, tid, sg); } shard_id table::shard_for_reads(dht::token t) const { return _erm ? _erm->shard_for_reads(*_schema, t) : dht::static_shard_of(*_schema, t); // for tests. } dht::shard_replica_set table::shard_for_writes(dht::token t) const { return _erm ? _erm->shard_for_writes(*_schema, t) : dht::shard_replica_set{dht::static_shard_of(*_schema, t)}; // for tests. } future table::estimated_partitions_in_range(dht::token_range tr) const { // FIXME: use a better estimation for the set than a simple sum of individual estimations for each sstable. 
// // If sstables can be grouped by token range, // and tokens within each sstable are uniformly distributed // over the token range, (that's the usual case, and it's always // the case with tablets), then we could use an alternative calculation. // // The result would be the sum of sub-results for each compaction group. // To get an estimate for a compaction group, // we could first use the sstable cardinality sketches (hyperloglog) // to get a decent post-compaction estimate of total cardinality. // And then, we could multiply the total estimate by the fraction of the group's // range which is covered by `tr`. auto sstables = select_sstables(dht::to_partition_range(tr)); uint64_t partition_count = 0; co_await seastar::max_concurrent_for_each(sstables, 10, [&partition_count, &tr] (sstables::shared_sstable sst) -> future<> { partition_count += co_await sst->estimated_keys_for_range(tr); }); co_return partition_count; } } // namespace replica