There's a flaw in table::query() -- calling querier_opt->close() can dereferences a disengaged std::optional. The fix pretty simple. Once fixed, there are two if-s checking for querier_opt being engaged or not that are worth being merged. The problem doesn't really shows itself becase table::query() is not called with null saved_querier, so the de-facto if is always correct. However, better to be on safe-side. The problem doesn't show itself for real, not worth backporting Closes scylladb/scylladb#29142 * github.com:scylladb/scylladb: table: merge adjacent querier_opt checks in query() table: don't close a disengaged querier in query()
5508 lines
246 KiB
C++
5508 lines
246 KiB
C++
/*
|
|
* Copyright (C) 2018-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/core/shard_id.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/with_scheduling_group.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/coroutine/exception.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/coroutine/switch_to.hh>
|
|
#include <seastar/coroutine/as_future.hh>
|
|
#include <seastar/util/closeable.hh>
|
|
#include <seastar/util/defer.hh>
|
|
#include <seastar/json/json_elements.hh>
|
|
|
|
#include "dht/decorated_key.hh"
|
|
#include "replica/database.hh"
|
|
#include "replica/data_dictionary_impl.hh"
|
|
#include "replica/compaction_group.hh"
|
|
#include "replica/query_state.hh"
|
|
#include "sstables/shared_sstable.hh"
|
|
#include "sstables/sstable_set.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/sstables_manager.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "cell_locking.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/logalloc.hh"
|
|
#include "utils/checked-file-impl.hh"
|
|
#include "utils/managed_bytes.hh"
|
|
#include "view_info.hh"
|
|
#include "db/data_listeners.hh"
|
|
#include "memtable-sstable.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "compaction/compaction_group_view.hh"
|
|
#include "sstables/sstable_directory.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "db/extensions.hh"
|
|
#include "query/query-result-writer.hh"
|
|
#include "db/view/view_update_generator.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/histogram_metrics_helper.hh"
|
|
#include "mutation/mutation_source_metadata.hh"
|
|
#include "gms/gossiper.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "db/config.hh"
|
|
#include "db/commitlog/commitlog.hh"
|
|
#include "utils/lister.hh"
|
|
#include "dht/token.hh"
|
|
#include "dht/i_partitioner.hh"
|
|
#include "replica/global_table_ptr.hh"
|
|
#include "locator/tablets.hh"
|
|
|
|
#include "utils/error_injection.hh"
|
|
#include "readers/reversing.hh"
|
|
#include "readers/empty.hh"
|
|
#include "readers/multi_range.hh"
|
|
#include "readers/combined.hh"
|
|
#include "readers/compacting.hh"
|
|
#include "replica/schema_describe_helper.hh"
|
|
#include "repair/incremental.hh"
|
|
|
|
namespace replica {
|
|
|
|
static logging::logger tlogger("table");
|
|
static seastar::metrics::label column_family_label("cf");
|
|
static seastar::metrics::label keyspace_label("ks");
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
table_holder::table_holder(table& t)
|
|
: _holder(t.async_gate())
|
|
, _table_ptr(t.shared_from_this())
|
|
{ }
|
|
|
|
sstables::generation_type table::calculate_generation_for_new_table() {
|
|
auto ret = _sstable_generation_generator();
|
|
tlogger.debug("{}.{} new sstable generation {}", schema()->ks_name(), schema()->cf_name(), ret);
|
|
return ret;
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_sstable_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
lw_shared_ptr<const sstables::sstable_set> sstables,
|
|
const dht::partition_range& pr,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
const sstables::sstable_predicate& predicate,
|
|
sstables::integrity_check integrity) const {
|
|
// CAVEAT: if make_sstable_reader() is called on a single partition
|
|
// we want to optimize and read exactly this partition. As a
|
|
// consequence, fast_forward_to() will *NOT* work on the result,
|
|
// regardless of what the fwd_mr parameter says.
|
|
if (pr.is_singular() && pr.start()->value().has_key()) {
|
|
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
|
|
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
|
|
} else {
|
|
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate, nullptr, integrity);
|
|
}
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> compaction_group::make_sstable_set() const {
|
|
// Bypasses compound set if maintenance one is empty. If a SSTable is added later into maintenance,
|
|
// the sstable set is refreshed to reflect the current state.
|
|
if (!_maintenance_sstables->size()) {
|
|
return _main_sstables;
|
|
}
|
|
return make_lw_shared(sstables::make_compound_sstable_set(_t.schema(), { _main_sstables, _maintenance_sstables }));
|
|
}
|
|
|
|
int64_t compaction_group::get_sstables_repaired_at() const noexcept {
|
|
try {
|
|
auto tid = locator::tablet_id(group_id());
|
|
auto erm = _t.get_effective_replication_map();
|
|
if (!erm) {
|
|
return 0;
|
|
}
|
|
if (!erm->get_replication_strategy().uses_tablets()) {
|
|
return 0;
|
|
}
|
|
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_t.schema()->id());
|
|
auto& tinfo = tmap.get_tablet_info(tid);
|
|
return tinfo.sstables_repaired_at;
|
|
} catch (locator::no_such_tablet_map) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> table::make_compound_sstable_set() const {
|
|
return _sg_manager->make_sstable_set();
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> compaction_group::make_maintenance_sstable_set() const {
|
|
return make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_t.schema(), token_range()));
|
|
}
|
|
|
|
void table::refresh_compound_sstable_set() {
|
|
_sstables = make_compound_sstable_set();
|
|
}
|
|
|
|
// Exposed for testing, not performance critical.
|
|
future<table::const_mutation_partition_ptr>
|
|
table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
|
|
return do_with(dht::partition_range::make_singular(key), [s = std::move(s), permit = std::move(permit), this] (auto& range) mutable {
|
|
return with_closeable(this->make_mutation_reader(std::move(s), std::move(permit), range), [] (mutation_reader& reader) {
|
|
return read_mutation_from_mutation_reader(reader).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
|
|
if (!mo) {
|
|
return {};
|
|
}
|
|
return std::make_unique<const mutation_partition>(std::move(mo->partition()));
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
future<table::const_row_ptr>
|
|
table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
|
|
return find_partition(s, std::move(permit), partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
|
|
if (!p) {
|
|
return make_ready_future<const_row_ptr>();
|
|
}
|
|
auto r = p->find_row(*s, clustering_key);
|
|
if (r) {
|
|
// FIXME: remove copy if only one data source
|
|
return make_ready_future<const_row_ptr>(std::make_unique<row>(*s, column_kind::regular_column, *r));
|
|
} else {
|
|
return make_ready_future<const_row_ptr>();
|
|
}
|
|
});
|
|
}
|
|
|
|
void
|
|
table::add_memtables_to_reader_list(std::vector<mutation_reader>& readers,
|
|
const schema_ptr& s,
|
|
const reader_permit& permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
const tracing::trace_state_ptr& trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
std::function<void(size_t)> reserve_fn) const {
|
|
auto add_memtables_from_cg = [&] (compaction_group& cg) mutable {
|
|
for (auto&& mt: *cg.memtables()) {
|
|
if (auto reader_opt = mt->make_mutation_reader_opt(s, permit, range, slice, trace_state, fwd, fwd_mr)) {
|
|
readers.emplace_back(std::move(*reader_opt));
|
|
}
|
|
}
|
|
};
|
|
|
|
// point queries can be optimized as they span a single compaction group.
|
|
if (range.is_singular() && range.start()->value().has_key()) {
|
|
const dht::ring_position& pos = range.start()->value();
|
|
auto& sg = storage_group_for_token(pos.token());
|
|
reserve_fn(sg.memtable_count());
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
add_memtables_from_cg(*cg);
|
|
});
|
|
return;
|
|
}
|
|
auto token_range = range.transform(std::mem_fn(&dht::ring_position::token));
|
|
auto sgs = storage_groups_for_token_range(token_range);
|
|
reserve_fn(std::ranges::fold_left(sgs | std::views::transform(std::mem_fn(&storage_group::memtable_count)), uint64_t(0), std::plus{}));
|
|
for (auto& sg : sgs) {
|
|
sg->for_each_compaction_group([&] (const compaction_group_ptr &cg) {
|
|
add_memtables_from_cg(*cg);
|
|
});
|
|
}
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_logstor_mutation_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& pr,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) const {
|
|
return _logstor->make_reader(std::move(s), logstor_index(), std::move(permit), pr, slice, std::move(trace_state));
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_mutation_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) const {
|
|
if (_virtual_reader) [[unlikely]] {
|
|
return (*_virtual_reader).make_mutation_reader(s, std::move(permit), range, slice, trace_state, fwd, fwd_mr);
|
|
}
|
|
|
|
if (_logstor) [[unlikely]] {
|
|
return make_logstor_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
|
|
}
|
|
|
|
std::vector<mutation_reader> readers;
|
|
|
|
// We're assuming that cache and memtables are both read atomically
|
|
// for single-key queries, so we don't need to special case memtable
|
|
// undergoing a move to cache. At any given point in time between
|
|
// deferring points the sum of data in memtable and cache is coherent. If
|
|
// single-key queries for each data source were performed across deferring
|
|
// points, it would be possible that partitions which are ahead of the
|
|
// memtable cursor would be placed behind the cache cursor, resulting in
|
|
// those partitions being missing in the combined reader.
|
|
//
|
|
// We need to handle this in range queries though, as they are always
|
|
// deferring. scanning_reader from memtable.cc is falling back to reading
|
|
// the sstable when memtable is flushed. After memtable is moved to cache,
|
|
// new readers will no longer use the old memtable, but until then
|
|
// performance may suffer. We should fix this when we add support for
|
|
// range queries in cache, so that scans can always be satisfied form
|
|
// memtable and cache only, as long as data is not evicted.
|
|
//
|
|
// https://github.com/scylladb/scylla/issues/309
|
|
// https://github.com/scylladb/scylla/issues/185
|
|
|
|
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
|
|
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
|
|
if (cache_enabled() && !bypass_cache) {
|
|
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, get_tombstone_gc_state(),
|
|
get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
|
|
readers.emplace_back(std::move(*reader_opt));
|
|
}
|
|
} else {
|
|
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr));
|
|
}
|
|
|
|
auto rd = make_combined_reader(s, permit, std::move(readers), fwd, fwd_mr);
|
|
|
|
if (_config.data_listeners && !_config.data_listeners->empty()) {
|
|
rd = _config.data_listeners->on_read(s, range, slice, std::move(rd));
|
|
}
|
|
|
|
return rd;
|
|
}
|
|
|
|
sstables::shared_sstable table::make_streaming_sstable_for_write() {
|
|
auto newtab = make_sstable(sstables::sstable_state::normal);
|
|
tlogger.debug("Created sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
|
|
return newtab;
|
|
}
|
|
|
|
sstables::shared_sstable table::make_streaming_staging_sstable() {
|
|
auto newtab = make_sstable(sstables::sstable_state::staging);
|
|
tlogger.debug("Created staging sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
|
|
return newtab;
|
|
}
|
|
|
|
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, tombstone_gc_state gc_state,
|
|
gc_clock::time_point compaction_time, bool compaction_enabled, bool compaction_can_gc) {
|
|
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_enabled", fmt::to_string(compaction_enabled));
|
|
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_can_gc", fmt::to_string(compaction_can_gc));
|
|
if (!compaction_enabled) {
|
|
return underlying;
|
|
}
|
|
return make_compacting_reader(
|
|
std::move(underlying),
|
|
compaction_time,
|
|
compaction_can_gc ? can_always_purge : can_never_purge,
|
|
gc_state,
|
|
streamed_mutation::forwarding::no);
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_streaming_reader(schema_ptr s, reader_permit permit,
|
|
const dht::partition_range_vector& ranges,
|
|
gc_clock::time_point compaction_time) const {
|
|
auto& slice = s->full_slice();
|
|
|
|
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
|
std::vector<mutation_reader> readers;
|
|
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
|
|
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
|
|
});
|
|
|
|
return maybe_compact_for_streaming(
|
|
make_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, nullptr, mutation_reader::forwarding::no),
|
|
get_tombstone_gc_state(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, gc_clock::time_point compaction_time) const {
|
|
auto trace_state = tracing::trace_state_ptr();
|
|
const auto fwd = streamed_mutation::forwarding::no;
|
|
|
|
std::vector<mutation_reader> readers;
|
|
add_memtables_to_reader_list(readers, schema, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
|
|
return maybe_compact_for_streaming(
|
|
make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
|
|
get_tombstone_gc_state(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const {
|
|
auto& slice = schema->full_slice();
|
|
auto trace_state = tracing::trace_state_ptr();
|
|
const auto fwd = streamed_mutation::forwarding::no;
|
|
const auto fwd_mr = mutation_reader::forwarding::no;
|
|
return maybe_compact_for_streaming(
|
|
sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
|
|
get_tombstone_gc_state(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice, tracing::trace_state_ptr ts) {
|
|
if (!range.is_singular()) {
|
|
throw std::runtime_error("table::make_cache_reader(): only singular ranges are supported");
|
|
}
|
|
return _cache.make_nonpopulating_reader(std::move(schema), std::move(permit), range, slice, std::move(ts));
|
|
}
|
|
|
|
future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout) {
|
|
SCYLLA_ASSERT(m.schema() == _counter_cell_locks->schema());
|
|
return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
|
|
}
|
|
|
|
void table::for_each_active_memtable(noncopyable_function<void(memtable&)> action) {
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
action(cg.memtables()->active_memtable());
|
|
});
|
|
}
|
|
|
|
api::timestamp_type compaction_group::min_memtable_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_timestamp(); }
|
|
));
|
|
}
|
|
|
|
api::timestamp_type compaction_group::max_memtable_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::min_timestamp;
|
|
}
|
|
|
|
return std::ranges::max(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_max_timestamp(); }
|
|
));
|
|
}
|
|
|
|
api::timestamp_type compaction_group::min_memtable_live_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_live_timestamp(); }
|
|
));
|
|
}
|
|
|
|
api::timestamp_type compaction_group::min_memtable_live_row_marker_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_live_row_marker_timestamp(); }
|
|
));
|
|
}
|
|
|
|
bool compaction_group::memtable_has_key(const dht::decorated_key& key) const {
|
|
if (_memtables->empty()) {
|
|
return false;
|
|
}
|
|
return std::ranges::any_of(*_memtables,
|
|
std::bind(&memtable::contains_partition, std::placeholders::_1, std::ref(key)));
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_timestamp() const {
|
|
api::timestamp_type min_timestamp = api::max_timestamp;
|
|
for_each_compaction_group([&min_timestamp] (const compaction_group_ptr& cg) {
|
|
min_timestamp = std::min(min_timestamp, cg->min_memtable_timestamp());
|
|
});
|
|
return min_timestamp;
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_live_timestamp() const {
|
|
api::timestamp_type min_timestamp = api::max_timestamp;
|
|
for_each_compaction_group([&min_timestamp] (const compaction_group_ptr& cg) {
|
|
min_timestamp = std::min(min_timestamp, cg->min_memtable_live_timestamp());
|
|
});
|
|
return min_timestamp;
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_live_row_marker_timestamp() const {
|
|
api::timestamp_type min_timestamp = api::max_timestamp;
|
|
for_each_compaction_group([&min_timestamp] (const compaction_group_ptr& cg) {
|
|
min_timestamp = std::min(min_timestamp, cg->min_memtable_live_row_marker_timestamp());
|
|
});
|
|
return min_timestamp;
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_live_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_live_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_live_row_marker_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_live_row_marker_timestamp)));
|
|
}
|
|
|
|
static bool belongs_to_current_shard(const std::vector<shard_id>& shards) {
|
|
return std::ranges::contains(shards, this_shard_id());
|
|
}
|
|
|
|
static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
|
|
return shards.size() != size_t(belongs_to_current_shard(shards));
|
|
}
|
|
|
|
sstables::shared_sstable table::make_sstable(sstables::sstable_state state) {
|
|
auto& sstm = get_sstables_manager();
|
|
return make_sstable(state, sstm.get_preferred_sstable_version());
|
|
}
|
|
|
|
sstables::shared_sstable table::make_sstable(sstables::sstable_state state, sstables::sstable_version_types version) {
|
|
auto& sstm = get_sstables_manager();
|
|
return sstm.make_sstable(_schema, *_storage_opts, calculate_generation_for_new_table(), state, version, sstables::sstable::format_types::big);
|
|
}
|
|
|
|
sstables::shared_sstable table::make_sstable() {
|
|
return make_sstable(sstables::sstable_state::normal);
|
|
}
|
|
|
|
db_clock::time_point table::get_truncation_time() const {
|
|
if (!_truncated_at) [[unlikely]] {
|
|
on_internal_error(dblog, ::format("truncation time is not set, table {}.{}",
|
|
_schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
return *_truncated_at;
|
|
}
|
|
|
|
void table::notify_bootstrap_or_replace_start() {
|
|
_is_bootstrap_or_replace = true;
|
|
}
|
|
|
|
void table::notify_bootstrap_or_replace_end() {
|
|
_is_bootstrap_or_replace = false;
|
|
trigger_offstrategy_compaction();
|
|
}
|
|
|
|
inline void table::add_sstable_to_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
|
tracker.replace_sstables({}, {std::move(sstable)});
|
|
}
|
|
|
|
inline void table::remove_sstable_from_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
|
tracker.replace_sstables({std::move(sstable)}, {});
|
|
}
|
|
|
|
void compaction_group::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
|
// If group was closed / is being closed, it's ok to ignore request to adjust backlog tracker,
|
|
// since that might result in an exception due to the group being deregistered from compaction
|
|
// manager already. And the group is being removed anyway, so that won't have any practical
|
|
// impact.
|
|
if (_async_gate.is_closed()) {
|
|
return;
|
|
}
|
|
|
|
auto& tracker = get_backlog_tracker();
|
|
tracker.replace_sstables(old_sstables, new_sstables);
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set>
|
|
compaction_group::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
|
|
enable_backlog_tracker backlog_tracker) {
|
|
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
|
|
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
|
|
}
|
|
// allow in-progress reads to continue using old list
|
|
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
|
|
new_sstables->insert(sstable);
|
|
if (backlog_tracker) {
|
|
table::add_sstable_to_backlog_tracker(get_backlog_tracker(), sstable);
|
|
}
|
|
_max_seen_timestamp = std::max(_max_seen_timestamp, sstable->get_stats_metadata().max_timestamp);
|
|
return new_sstables;
|
|
}
|
|
|
|
void compaction_group::add_sstable(sstables::shared_sstable sstable) {
|
|
_main_sstables = do_add_sstable(_main_sstables, std::move(sstable), enable_backlog_tracker::yes);
|
|
}
|
|
|
|
const lw_shared_ptr<sstables::sstable_set>& compaction_group::main_sstables() const noexcept {
|
|
return _main_sstables;
|
|
}
|
|
|
|
sstables::sstable_set compaction_group::make_main_sstable_set() const {
|
|
// Uses view for unrepaired data, but doesn't really matter since this is only building an empty set,
|
|
// and will get through the view info like group id and token range.
|
|
return _t._compaction_strategy.make_sstable_set(view_for_unrepaired_data());
|
|
}
|
|
|
|
void compaction_group::set_main_sstables(lw_shared_ptr<sstables::sstable_set> new_main_sstables) {
|
|
_main_sstables = std::move(new_main_sstables);
|
|
}
|
|
|
|
void compaction_group::add_maintenance_sstable(sstables::shared_sstable sst) {
|
|
_maintenance_sstables = do_add_sstable(_maintenance_sstables, std::move(sst), enable_backlog_tracker::no);
|
|
}
|
|
|
|
const lw_shared_ptr<sstables::sstable_set>& compaction_group::maintenance_sstables() const noexcept {
|
|
return _maintenance_sstables;
|
|
}
|
|
|
|
void compaction_group::set_maintenance_sstables(lw_shared_ptr<sstables::sstable_set> new_maintenance_sstables) {
|
|
_maintenance_sstables = std::move(new_maintenance_sstables);
|
|
}
|
|
|
|
void table::add_sstable(compaction_group& cg, sstables::shared_sstable sstable) {
|
|
cg.add_sstable(std::move(sstable));
|
|
refresh_compound_sstable_set();
|
|
}
|
|
|
|
void table::add_maintenance_sstable(compaction_group& cg, sstables::shared_sstable sst) {
|
|
cg.add_maintenance_sstable(std::move(sst));
|
|
refresh_compound_sstable_set();
|
|
}
|
|
|
|
void table::do_update_off_strategy_trigger() {
|
|
_off_strategy_trigger.rearm(timer<>::clock::now() + std::chrono::minutes(5));
|
|
}
|
|
|
|
// If there are more sstables to be added to the off-strategy sstable set, call
|
|
// update_off_strategy_trigger to update the timer and delay to trigger
|
|
// off-strategy compaction. The off-strategy compaction will be triggered when
|
|
// the timer is expired.
|
|
void table::update_off_strategy_trigger() {
|
|
if (_off_strategy_trigger.armed()) {
|
|
do_update_off_strategy_trigger();
|
|
}
|
|
}
|
|
|
|
// Call enable_off_strategy_trigger to enable the automatic off-strategy
|
|
// compaction trigger.
|
|
void table::enable_off_strategy_trigger() {
|
|
do_update_off_strategy_trigger();
|
|
}
|
|
|
|
storage_group_manager::~storage_group_manager() = default;
|
|
|
|
// exception-less attempt to hold gate.
|
|
// TODO: move it to seastar.
|
|
static std::optional<gate::holder> try_hold_gate(gate& g) noexcept {
|
|
return g.is_closed() ? std::nullopt : std::make_optional(g.hold());
|
|
}
|
|
|
|
future<> storage_group_manager::parallel_foreach_storage_group(std::function<future<>(storage_group&)> f) {
|
|
co_await coroutine::parallel_for_each(_storage_groups | std::views::values, [&] (const storage_group_ptr sg) -> future<> {
|
|
// Table-wide ops, like 'nodetool compact', are inherently racy with migrations, so it's okay to skip
|
|
// storage of tablets being migrated away.
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
co_await f(*sg.get());
|
|
}
|
|
});
|
|
}
|
|
|
|
future<> storage_group_manager::for_each_storage_group_gently(std::function<future<>(storage_group&)> f) {
|
|
auto storage_groups = _storage_groups | std::views::values | std::ranges::to<std::vector>();
|
|
for (auto& sg: storage_groups) {
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
co_await f(*sg.get());
|
|
}
|
|
}
|
|
}
|
|
|
|
void storage_group_manager::for_each_storage_group(std::function<void(size_t, storage_group&)> f) const {
|
|
for (auto& [id, sg]: _storage_groups) {
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
f(id, *sg);
|
|
}
|
|
}
|
|
}
|
|
|
|
const storage_group_map& storage_group_manager::storage_groups() const {
|
|
return _storage_groups;
|
|
}
|
|
|
|
future<> storage_group_manager::stop_storage_groups() noexcept {
|
|
co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); });
|
|
co_await stop();
|
|
}
|
|
|
|
void storage_group_manager::clear_storage_groups() {
|
|
for (auto& [id, sg]: _storage_groups) {
|
|
sg->clear_sstables();
|
|
}
|
|
}
|
|
|
|
void storage_group_manager::remove_storage_group(size_t id) {
|
|
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
|
|
_storage_groups.erase(it);
|
|
} else {
|
|
throw std::out_of_range(format("remove_storage_group: storage group with id={} not found", id));
|
|
}
|
|
}
|
|
|
|
storage_group& storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const {
|
|
auto it = _storage_groups.find(i);
|
|
if (it == _storage_groups.end()) [[unlikely]] {
|
|
throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name()));
|
|
}
|
|
return *it->second.get();
|
|
}
|
|
|
|
storage_group* storage_group_manager::maybe_storage_group_for_id(const schema_ptr& s, size_t i) const {
|
|
auto it = _storage_groups.find(i);
|
|
return it != _storage_groups.end() ? &*it->second.get() : nullptr;
|
|
}
|
|
|
|
class single_storage_group_manager final : public storage_group_manager {
|
|
replica::table& _t;
|
|
storage_group_ptr _single_sg;
|
|
compaction_group* _single_cg;
|
|
|
|
compaction_group& get_compaction_group() const noexcept {
|
|
return *_single_cg;
|
|
}
|
|
public:
|
|
single_storage_group_manager(replica::table& t)
|
|
: _t(t)
|
|
{
|
|
storage_group_map r;
|
|
|
|
// Incremental repair is not supported with vnodes, so all sstables will be considered unrepaired.
|
|
auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&, int64_t sstables_repaired_at) { return repair_sstable_classification::unrepaired; };
|
|
|
|
// this might not reflect real vnode range for this node, but with 256 tokens, the actual
|
|
// first and last tokens are likely to be ~0.5% of the edges, so any measurement against
|
|
// this accurate enough token range will be likely up to ~1% off.
|
|
// TODO: we could fed actual vnode range here, but we might bump into a chicken and egg
|
|
// problem if e.g. a system table is created before tokens were allocated.
|
|
auto full_token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
|
auto cg = make_lw_shared<compaction_group>(_t, size_t(0), std::move(full_token_range), noop_repair_sstable_classifier);
|
|
_single_cg = cg.get();
|
|
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
|
_single_sg = sg;
|
|
r[0] = std::move(sg);
|
|
_storage_groups = std::move(r);
|
|
}
|
|
|
|
future<> stop() override {
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
void update_effective_replication_map(const locator::effective_replication_map_ptr& old_erm,
|
|
const locator::effective_replication_map& erm,
|
|
noncopyable_function<void()> refresh_mutation_source) override {}
|
|
|
|
compaction_group& compaction_group_for_token(dht::token token) const override {
|
|
return get_compaction_group();
|
|
}
|
|
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const override {
|
|
utils::chunked_vector<storage_group_ptr> ret;
|
|
ret.push_back(_single_sg);
|
|
return ret;
|
|
}
|
|
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override {
|
|
return get_compaction_group();
|
|
}
|
|
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override {
|
|
return get_compaction_group();
|
|
}
|
|
size_t log2_storage_groups() const override {
|
|
return 0;
|
|
}
|
|
storage_group& storage_group_for_token(dht::token token) const override {
|
|
return *_single_sg;
|
|
}
|
|
|
|
locator::combined_load_stats table_load_stats() const override {
|
|
return locator::combined_load_stats{
|
|
.table_ls = locator::table_load_stats{
|
|
.size_in_bytes = _single_sg->live_disk_space_used(),
|
|
.split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min()},
|
|
.tablet_ls = locator::tablet_load_stats{}
|
|
};
|
|
}
|
|
|
|
bool all_storage_groups_split() override { return true; }
|
|
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
|
|
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override {
|
|
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
|
|
}
|
|
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
|
|
future<> wait_for_background_tablet_resize_work() override { return make_ready_future<>(); }
|
|
|
|
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
|
return get_compaction_group().make_sstable_set();
|
|
}
|
|
};
|
|
|
|
struct background_merge_guard {
|
|
compaction::compaction_reenabler compaction_guard;
|
|
locator::effective_replication_map_ptr erm_guard;
|
|
};
|
|
|
|
class tablet_storage_group_manager final : public storage_group_manager {
|
|
replica::table& _t;
|
|
locator::host_id _my_host_id;
|
|
const locator::tablet_map* _tablet_map;
|
|
future<> _stop_fut = make_ready_future();
|
|
// Every table replica that completes split work will load the seq number from tablet metadata into its local
|
|
// state. So when coordinator pull the local state of a table, it will know whether the table is ready for the
|
|
// current split, and not a previously revoked (stale) decision.
|
|
// The minimum value, which is a negative number, is not used by coordinator for first decision.
|
|
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
|
future<> _merge_completion_fiber;
|
|
condition_variable _merge_completion_event;
|
|
// Ensures that processes such as incremental repair will wait for pending work from
|
|
// merge fiber before proceeding. This guarantees stability on the compaction groups.
|
|
// NOTE: it's important that we don't await on the barrier with any compaction group
|
|
// gate held, since merge fiber will stop groups that in turn await on gate,
|
|
// potentially causing an ABBA deadlock.
|
|
utils::phased_barrier _merge_fiber_barrier;
|
|
std::optional<utils::phased_barrier::operation> _pending_merge_fiber_work;
|
|
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
|
std::vector<background_merge_guard> _compaction_reenablers_for_merging;
|
|
private:
|
|
const schema_ptr& schema() const {
|
|
return _t.schema();
|
|
}
|
|
|
|
const locator::tablet_map& tablet_map() const noexcept {
|
|
return *_tablet_map;
|
|
}
|
|
|
|
size_t tablet_count() const noexcept {
|
|
return tablet_map().tablet_count();
|
|
}
|
|
|
|
future<compaction::compaction_type_options::split> split_compaction_options() const noexcept;
|
|
|
|
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
|
|
// each tablet split into two, so this replica will remap all of its compaction groups
|
|
// that were previously split.
|
|
void handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
|
|
|
|
// Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into
|
|
// the new tablet id (X >> 1). In practice, that means storage groups for X and X+1
|
|
// are merged into a new storage group with id (X >> 1).
|
|
void handle_tablet_merge_completion(locator::effective_replication_map_ptr old_erm,
|
|
const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
|
|
|
|
// When merge completes, compaction groups of sibling tablets are added to same storage
|
|
// group, but they're not merged yet into one, since the merge completion handler happens
|
|
// inside the erm updater which must complete ASAP. Therefore, those groups will be merged
|
|
// into a single one (main) in background.
|
|
future<> merge_completion_fiber();
|
|
|
|
storage_group& storage_group_for_id(size_t i) const {
|
|
return storage_group_manager::storage_group_for_id(schema(), i);
|
|
}
|
|
|
|
size_t tablet_id_for_token(dht::token t) const noexcept {
|
|
return tablet_map().get_tablet_id(t).value();
|
|
}
|
|
|
|
size_t storage_group_of(dht::token t) const {
|
|
auto idx = tablet_id_for_token(t);
|
|
#ifndef SCYLLA_BUILD_MODE_RELEASE
|
|
if (idx >= tablet_count()) {
|
|
on_fatal_internal_error(tlogger, format("storage_group_of: index out of range: idx={} size_log2={} size={} token={}",
|
|
idx, log2_storage_groups(), tablet_count(), t));
|
|
}
|
|
auto& sg = storage_group_for_id(idx);
|
|
if (!t.is_minimum() && !t.is_maximum() && !sg.token_range().contains(t, dht::token_comparator())) {
|
|
on_fatal_internal_error(tlogger, format("storage_group_of: storage_group idx={} range={} does not contain token={}",
|
|
idx, sg.token_range(), t));
|
|
}
|
|
#endif
|
|
return idx;
|
|
}
|
|
|
|
repair_classifier_func make_repair_sstable_classifier_func() const {
|
|
// FIXME: implement it for incremental repair!
|
|
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
|
|
bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
|
|
if (is_repaired) {
|
|
return repair_sstable_classification::repaired;
|
|
} else {
|
|
if (!sst->being_repaired.uuid().is_null()) {
|
|
return repair_sstable_classification::repairing;
|
|
} else {
|
|
return repair_sstable_classification::unrepaired;
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
storage_group_ptr allocate_storage_group(const locator::tablet_map& tmap, locator::tablet_id tid, dht::token_range range) const {
|
|
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range), make_repair_sstable_classifier_func());
|
|
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
|
if (tmap.needs_split()) {
|
|
sg->set_split_mode();
|
|
}
|
|
return sg;
|
|
}
|
|
public:
|
|
tablet_storage_group_manager(table& t, const locator::effective_replication_map& erm)
|
|
: _t(t)
|
|
, _my_host_id(erm.get_token_metadata().get_my_id())
|
|
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
|
|
, _merge_completion_fiber(merge_completion_fiber())
|
|
, _merge_fiber_barrier(format("[table {}.{}] merge_fiber_barrier", _t.schema()->ks_name(), _t.schema()->cf_name()))
|
|
{
|
|
storage_group_map ret;
|
|
|
|
auto& tmap = tablet_map();
|
|
auto local_replica = locator::tablet_replica{_my_host_id, this_shard_id()};
|
|
|
|
for (auto tid : tmap.tablet_ids()) {
|
|
if (!tmap.has_replica(tid, local_replica)) {
|
|
continue;
|
|
}
|
|
|
|
// if the tablet was cleaned up already on this replica, don't allocate a storage group for it.
|
|
auto trinfo = tmap.get_tablet_transition_info(tid);
|
|
if (trinfo && locator::is_post_cleanup(local_replica, tmap.get_tablet_info(tid), *trinfo)) {
|
|
continue;
|
|
}
|
|
|
|
auto range = tmap.get_token_range(tid);
|
|
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
|
|
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
|
|
}
|
|
_storage_groups = std::move(ret);
|
|
}
|
|
|
|
future<> stop() override {
|
|
_merge_completion_event.signal();
|
|
return when_all(std::exchange(_merge_completion_fiber, make_ready_future<>()),
|
|
std::exchange(_stop_fut, make_ready_future())).discard_result();
|
|
}
|
|
|
|
void update_effective_replication_map(const locator::effective_replication_map_ptr& old_erm,
|
|
const locator::effective_replication_map& erm,
|
|
noncopyable_function<void()> refresh_mutation_source) override;
|
|
|
|
compaction_group& compaction_group_for_token(dht::token token) const override;
|
|
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const override;
|
|
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override;
|
|
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override;
|
|
|
|
size_t log2_storage_groups() const override {
|
|
return log2ceil(tablet_map().tablet_count());
|
|
}
|
|
storage_group& storage_group_for_token(dht::token token) const override {
|
|
return storage_group_for_id(storage_group_of(token));
|
|
}
|
|
|
|
locator::combined_load_stats table_load_stats() const override;
|
|
bool all_storage_groups_split() override;
|
|
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
|
|
future<> maybe_split_compaction_group_of(size_t idx) override;
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override;
|
|
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
|
|
return tablet_map().get_token_range_after_split(token);
|
|
}
|
|
future<> wait_for_background_tablet_resize_work() override {
|
|
co_await _merge_fiber_barrier.advance_and_await();
|
|
co_return;
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
|
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
|
|
return make_tablet_sstable_set(schema(), *this, *_tablet_map);
|
|
}
|
|
};
|
|
|
|
bool table::uses_tablets() const {
|
|
return _erm && _erm->get_replication_strategy().uses_tablets();
|
|
}
|
|
|
|
storage_group::storage_group(compaction_group_ptr cg)
|
|
: _main_cg(cg)
|
|
, _async_gate(format("[storage_group {}.{} {}]", cg->schema()->ks_name(), cg->schema()->cf_name(), cg->group_id()))
|
|
{
|
|
}
|
|
|
|
const dht::token_range& storage_group::token_range() const noexcept {
|
|
return _main_cg->token_range();
|
|
}
|
|
|
|
const compaction_group_ptr& storage_group::main_compaction_group() const noexcept {
|
|
return _main_cg;
|
|
}
|
|
|
|
const std::vector<compaction_group_ptr>& storage_group::split_ready_compaction_groups() const {
|
|
return _split_ready_groups;
|
|
}
|
|
|
|
size_t storage_group::to_idx(locator::tablet_range_side side) const {
|
|
return size_t(side);
|
|
}
|
|
|
|
compaction_group_ptr& storage_group::select_compaction_group(dht::token token, const locator::tablet_map& tmap) noexcept {
|
|
if (splitting_mode()) {
|
|
return _split_ready_groups[to_idx(tmap.get_tablet_range_side(token))];
|
|
}
|
|
return _main_cg;
|
|
}
|
|
|
|
compaction_group_ptr& storage_group::select_compaction_group(dht::token first, dht::token last, const locator::tablet_map& tmap) noexcept {
|
|
if (splitting_mode()) {
|
|
auto first_side = tmap.get_tablet_range_side(first);
|
|
auto last_side = tmap.get_tablet_range_side(last);
|
|
if (first_side == last_side) {
|
|
return _split_ready_groups[to_idx(first_side)];
|
|
}
|
|
}
|
|
return _main_cg;
|
|
}
|
|
|
|
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
|
|
action(_main_cg);
|
|
for (auto& cg : _merging_groups) {
|
|
action(cg);
|
|
}
|
|
for (auto& cg : _split_ready_groups) {
|
|
action(cg);
|
|
}
|
|
}
|
|
|
|
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups_immediate() {
|
|
utils::small_vector<compaction_group_ptr, 3> cgs;
|
|
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
|
|
cgs.push_back(cg);
|
|
});
|
|
return cgs;
|
|
}
|
|
|
|
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups_immediate() const {
|
|
utils::small_vector<const_compaction_group_ptr, 3> cgs;
|
|
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
|
|
cgs.push_back(cg);
|
|
});
|
|
return cgs;
|
|
}
|
|
|
|
utils::small_vector<compaction_group_ptr, 3> storage_group::split_unready_groups() const {
|
|
utils::small_vector<compaction_group_ptr, 3> cgs;
|
|
cgs.push_back(_main_cg);
|
|
std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs));
|
|
return cgs;
|
|
}
|
|
|
|
bool storage_group::split_unready_groups_are_empty() const {
|
|
return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty));
|
|
}
|
|
|
|
bool storage_group::set_split_mode() {
|
|
// A group being stopped (e.g. during migration cleanup) cannot satisfy split mode.
|
|
// Also, a race can happen if new groups are added while old ones are being stopped,
|
|
// so the new ones can be left unstopped, potentially resulting in use-after-free.
|
|
if (_async_gate.is_closed()) {
|
|
return false;
|
|
}
|
|
if (!splitting_mode()) {
|
|
auto create_cg = [this] () -> compaction_group_ptr {
|
|
// TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
|
|
return compaction_group::make_empty_group(*_main_cg);
|
|
};
|
|
tlogger.debug("storage_group::set_split_mode: Set sstables_repaired_at={} for split old_group={} old_range={}",
|
|
_main_cg->get_sstables_repaired_at(), _main_cg->group_id(), _main_cg->token_range());
|
|
std::vector<compaction_group_ptr> split_ready_groups(2);
|
|
split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg();
|
|
split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg();
|
|
_split_ready_groups = std::move(split_ready_groups);
|
|
}
|
|
|
|
// The storage group is considered "split ready" if all split unready groups (main + merging) are empty.
|
|
return split_unready_groups_are_empty();
|
|
}
|
|
|
|
void storage_group::add_merging_group(compaction_group_ptr cg) {
|
|
_merging_groups.push_back(std::move(cg));
|
|
}
|
|
|
|
const std::vector<compaction_group_ptr>& storage_group::merging_groups() const {
|
|
return _merging_groups;
|
|
}
|
|
|
|
future<> storage_group::remove_empty_merging_groups() {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) {
|
|
co_await group->stop("tablet merge");
|
|
}
|
|
std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty));
|
|
}
|
|
|
|
future<> compaction_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
|
|
auto& cm = get_compaction_manager();
|
|
|
|
for (auto view : all_views()) {
|
|
auto lock_holder = co_await cm.get_incremental_repair_read_lock(*view, "storage_group_split");
|
|
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
|
|
co_await cm.perform_offstrategy(*view, tablet_split_task_info);
|
|
co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info);
|
|
}
|
|
}
|
|
|
|
future<> compaction_group::discard_logstor_segments() {
|
|
auto& sm = get_logstor_segment_manager();
|
|
co_await sm.discard_segments(*_logstor_segments);
|
|
}
|
|
|
|
future<> compaction_group::flush_separator(std::optional<size_t> seq_num) {
|
|
auto units = co_await get_units(_separator_flush_sem, 1);
|
|
auto pending = std::exchange(_separator_flushes, {});
|
|
if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) {
|
|
auto& cm = get_logstor_compaction_manager();
|
|
auto b = std::move(*_logstor_separator);
|
|
_logstor_separator.reset();
|
|
pending.push_back(cm.flush_separator_buffer(std::move(b), *this));
|
|
}
|
|
co_await when_all(pending.begin(), pending.end());
|
|
}
|
|
|
|
logstor::separator_buffer& compaction_group::get_separator_buffer(size_t write_size) {
|
|
if (!_logstor_separator || !_logstor_separator->can_fit(write_size)) {
|
|
auto& cm = get_logstor_compaction_manager();
|
|
if (_logstor_separator) {
|
|
auto b = std::move(*_logstor_separator);
|
|
_logstor_separator.reset();
|
|
|
|
std::erase_if(_separator_flushes, [](future<>& f) { return f.available(); });
|
|
_separator_flushes.push_back(cm.flush_separator_buffer(std::move(b), *this));
|
|
}
|
|
_logstor_separator.emplace(cm.allocate_separator_buffer());
|
|
}
|
|
return *_logstor_separator;
|
|
}
|
|
|
|
future<> storage_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
|
|
if (set_split_mode()) {
|
|
co_return;
|
|
}
|
|
co_await utils::get_local_injector().inject("delay_split_compaction", 5s);
|
|
|
|
if (split_unready_groups_are_empty()) {
|
|
co_return;
|
|
}
|
|
for (auto cg : split_unready_groups()) {
|
|
if (cg->async_gate().is_closed()) {
|
|
continue;
|
|
}
|
|
auto holder = cg->async_gate().hold();
|
|
co_await cg->flush();
|
|
co_await cg->split(opt, tablet_split_task_info);
|
|
}
|
|
}
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> storage_group::make_sstable_set() const {
|
|
if (_split_ready_groups.empty() && _merging_groups.empty()) {
|
|
return _main_cg->make_sstable_set();
|
|
}
|
|
const auto& schema = _main_cg->_t.schema();
|
|
std::vector<lw_shared_ptr<sstables::sstable_set>> underlying;
|
|
underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size());
|
|
underlying.emplace_back(_main_cg->make_sstable_set());
|
|
for (const auto& cg : _merging_groups) {
|
|
if (!cg->empty()) {
|
|
underlying.emplace_back(cg->make_sstable_set());
|
|
}
|
|
}
|
|
for (const auto& cg : _split_ready_groups) {
|
|
underlying.emplace_back(cg->make_sstable_set());
|
|
}
|
|
return make_lw_shared(sstables::make_compound_sstable_set(schema, std::move(underlying)));
|
|
}
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> table::sstable_set_for_tombstone_gc(const compaction_group& cg) const {
|
|
auto& sg = storage_group_for_id(cg.group_id());
|
|
return sg.make_sstable_set();
|
|
}
|
|
|
|
bool tablet_storage_group_manager::all_storage_groups_split() {
|
|
auto& tmap = tablet_map();
|
|
if (_split_ready_seq_number == tmap.resize_decision().sequence_number) {
|
|
return true;
|
|
}
|
|
|
|
bool split_ready = true;
|
|
for (const storage_group_ptr& sg : _storage_groups | std::views::values) {
|
|
split_ready &= sg->set_split_mode();
|
|
}
|
|
|
|
// The table replica will say to coordinator that its split status is ready by
|
|
// mirroring the sequence number from tablet metadata into its local state,
|
|
// which is pulled periodically by coordinator.
|
|
if (split_ready) {
|
|
_split_ready_seq_number = tmap.resize_decision().sequence_number;
|
|
tlogger.info0("Setting split ready sequence number to {} for table {}.{}",
|
|
_split_ready_seq_number, schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
return split_ready;
|
|
}
|
|
|
|
bool table::all_storage_groups_split() {
|
|
return _sg_manager->all_storage_groups_split();
|
|
}
|
|
|
|
future<compaction::compaction_type_options::split> tablet_storage_group_manager::split_compaction_options() const noexcept {
|
|
// Split must work with a snapshot of tablet map, since it expects stability
|
|
// throughout its execution.
|
|
auto erm = _t.get_effective_replication_map();
|
|
auto tablet_map_ptr = co_await erm->get_token_metadata().tablets().get_tablet_map_ptr(schema()->id());
|
|
|
|
co_return compaction::compaction_type_options::split([tablet_map_ptr = make_lw_shared(std::move(tablet_map_ptr))] (dht::token t) {
|
|
// Classifies the input stream into either left or right side.
|
|
auto [_, side] = (*tablet_map_ptr)->get_tablet_id_and_range_side(t);
|
|
return mutation_writer::token_group_id(side);
|
|
});
|
|
}
|
|
|
|
future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
|
|
compaction::compaction_type_options::split opt = co_await split_compaction_options();
|
|
|
|
co_await utils::get_local_injector().inject("split_storage_groups_wait", [] (auto& handler) -> future<> {
|
|
dblog.info("split_storage_groups_wait: waiting");
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
|
dblog.info("split_storage_groups_wait: done");
|
|
}, false);
|
|
|
|
co_await for_each_storage_group_gently([opt, tablet_split_task_info] (storage_group& storage_group) {
|
|
return storage_group.split(opt, tablet_split_task_info);
|
|
});
|
|
}
|
|
|
|
future<> table::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
|
|
auto holder = async_gate().hold();
|
|
co_await _sg_manager->split_all_storage_groups(tablet_split_task_info);
|
|
}
|
|
|
|
future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t idx) {
|
|
if (!tablet_map().needs_split()) {
|
|
co_return;
|
|
}
|
|
tasks::task_info tablet_split_task_info{tasks::task_id{tablet_map().resize_task_info().tablet_task_id.uuid()}, 0};
|
|
|
|
auto sg = _storage_groups[idx];
|
|
if (!sg) {
|
|
on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard",
|
|
idx, schema()->ks_name(), schema()->cf_name()));
|
|
}
|
|
|
|
co_return co_await sg->split(co_await split_compaction_options(), tablet_split_task_info);
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>>
|
|
tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
|
|
co_await utils::get_local_injector().inject("maybe_split_new_sstable_wait", utils::wait_for_message(120s));
|
|
if (!tablet_map().needs_split()) {
|
|
co_return std::vector<sstables::shared_sstable>{sst};
|
|
}
|
|
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
auto holder = cg.async_gate().hold();
|
|
auto& view = cg.view_for_sstable(sst);
|
|
co_return co_await _t.get_compaction_manager().maybe_split_new_sstable(sst, view, co_await split_compaction_options());
|
|
}
|
|
|
|
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
|
auto holder = async_gate().hold();
|
|
co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value());
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
|
|
auto holder = async_gate().hold();
|
|
co_return co_await _sg_manager->maybe_split_new_sstable(sst);
|
|
}
|
|
|
|
dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
|
|
return _sg_manager->get_token_range_after_split(token);
|
|
}
|
|
|
|
std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
|
|
std::unique_ptr<storage_group_manager> ret;
|
|
if (uses_tablets()) {
|
|
ret = std::make_unique<tablet_storage_group_manager>(*this, *_erm);
|
|
} else {
|
|
ret = std::make_unique<single_storage_group_manager>(*this);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
compaction_group* table::get_compaction_group(size_t id) const {
|
|
return storage_group_for_id(id).main_compaction_group().get();
|
|
}
|
|
|
|
storage_group& table::storage_group_for_token(dht::token token) const {
|
|
return _sg_manager->storage_group_for_token(token);
|
|
}
|
|
|
|
storage_group& table::storage_group_for_id(size_t i) const {
|
|
return _sg_manager->storage_group_for_id(_schema, i);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_token(dht::token token) const {
|
|
auto idx = storage_group_of(token);
|
|
auto& sg = storage_group_for_id(idx);
|
|
return *sg.select_compaction_group(token, tablet_map());
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_token(dht::token token) const {
|
|
return _sg_manager->compaction_group_for_token(token);
|
|
}
|
|
|
|
utils::chunked_vector<storage_group_ptr> tablet_storage_group_manager::storage_groups_for_token_range(dht::token_range tr) const {
|
|
utils::chunked_vector<storage_group_ptr> ret;
|
|
auto cmp = dht::token_comparator();
|
|
|
|
size_t candidate_start = tr.start() ? tablet_id_for_token(tr.start()->value()) : size_t(0);
|
|
size_t candidate_end = tr.end() ? tablet_id_for_token(tr.end()->value()) : (tablet_count() - 1);
|
|
|
|
while (candidate_start <= candidate_end) {
|
|
auto it = _storage_groups.find(candidate_start++);
|
|
if (it == _storage_groups.end()) {
|
|
continue;
|
|
}
|
|
auto& sg = it->second;
|
|
if (sg && tr.overlaps(sg->token_range(), cmp)) {
|
|
ret.push_back(sg);
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
utils::chunked_vector<storage_group_ptr> table::storage_groups_for_token_range(dht::token_range tr) const {
|
|
return _sg_manager->storage_groups_for_token_range(tr);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
|
|
return compaction_group_for_token(dht::get_token(*s, key));
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
|
|
return _sg_manager->compaction_group_for_key(key, s);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
|
|
auto first_id = storage_group_of(sst->get_first_decorated_key().token());
|
|
auto last_id = storage_group_of(sst->get_last_decorated_key().token());
|
|
|
|
auto sstable_desc = [] (const sstables::shared_sstable& sst) {
|
|
auto& identifier_opt = sst->sstable_identifier();
|
|
auto& originating_host_id_opt = sst->get_stats_metadata().originating_host_id;
|
|
return format("{} (originated from {} with id {} on host {})",
|
|
sst->get_filename(), sst->get_origin(),
|
|
identifier_opt ? identifier_opt->to_sstring() : "unknown",
|
|
originating_host_id_opt ? originating_host_id_opt->to_sstring() : "unknown");
|
|
};
|
|
auto tablet_desc = [this] (locator::tablet_id id) {
|
|
return format("{} (replica set: {})", id, tablet_map().get_tablet_info(id).replicas);
|
|
};
|
|
|
|
if (first_id != last_id) {
|
|
on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}",
|
|
sstable_desc(sst),
|
|
tablet_desc(locator::tablet_id(first_id)),
|
|
tablet_desc(locator::tablet_id(last_id))));
|
|
}
|
|
|
|
try {
|
|
auto& sg = storage_group_for_id(first_id);
|
|
return *sg.select_compaction_group(
|
|
sst->get_first_decorated_key().token(),
|
|
sst->get_last_decorated_key().token(),
|
|
tablet_map());
|
|
} catch (std::out_of_range& e) {
|
|
on_internal_error(tlogger, format("Unable to load SSTable {} of tablet {}, due to {}",
|
|
sstable_desc(sst),
|
|
tablet_desc(locator::tablet_id(first_id)),
|
|
e.what()));
|
|
}
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
|
|
return _sg_manager->compaction_group_for_sstable(sst);
|
|
}
|
|
|
|
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
|
|
co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> {
|
|
co_await utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& handler) -> future<> {
|
|
tlogger.info("foreach_compaction_group_wait: waiting");
|
|
while (!handler.poll_for_message() && !_async_gate.is_closed() && !sg.async_gate().is_closed()) {
|
|
co_await sleep(std::chrono::milliseconds(5));
|
|
}
|
|
tlogger.info("foreach_compaction_group_wait: released");
|
|
});
|
|
|
|
co_await coroutine::parallel_for_each(sg.compaction_groups_immediate(), [&] (compaction_group_ptr cg) -> future<> {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
co_await action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
void table::for_each_compaction_group(std::function<void(compaction_group&)> action) {
|
|
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
void table::for_each_compaction_group(std::function<void(const compaction_group&)> action) const {
|
|
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
const storage_group_map& table::storage_groups() const {
|
|
return _sg_manager->storage_groups();
|
|
}
|
|
|
|
future<utils::chunked_vector<sstables::sstable_files_snapshot>> table::take_storage_snapshot(dht::token_range tr) {
|
|
utils::chunked_vector<sstables::sstable_files_snapshot> ret;
|
|
|
|
for (auto& sg : storage_groups_for_token_range(tr)) {
|
|
co_await utils::get_local_injector().inject("take_storage_snapshot", utils::wait_for_message(60s));
|
|
|
|
// We don't care about sstables in snapshot being unlinked, as the file
|
|
// descriptors remain opened until last reference to them are gone.
|
|
// Also, we should be careful with taking a deletion lock here as a
|
|
// deadlock might occur due to memtable flush backpressure waiting on
|
|
// compaction to reduce the backlog.
|
|
|
|
co_await sg->flush();
|
|
|
|
// The sstable set must be obtained *after* the deletion lock is taken,
|
|
// otherwise components of sstables in the set might be unlinked from the filesystem
|
|
// by compaction while we are waiting for the lock.
|
|
auto deletion_guard = co_await get_sstable_list_permit();
|
|
// It's vital that we build a set on the storage group level, since sstables
|
|
// might move across compaction groups in the background.
|
|
co_await sg->make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
|
ret.push_back({
|
|
.sst = sst,
|
|
.files = co_await sst->readable_file_for_all_components(),
|
|
});
|
|
});
|
|
}
|
|
|
|
co_return std::move(ret);
|
|
}
|
|
|
|
future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_snapshot() {
|
|
auto deletion_guard = co_await get_sstable_list_permit();
|
|
utils::chunked_vector<sstables::shared_sstable> result;
|
|
co_await get_sstable_set().for_each_sstable_gently([&] (sstables::shared_sstable sst) {
|
|
result.push_back(sst);
|
|
});
|
|
co_return result;
|
|
}
|
|
|
|
future<utils::chunked_vector<sstables::entry_descriptor>>
|
|
table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
|
|
utils::chunked_vector<sstables::entry_descriptor> ret;
|
|
auto holder = async_gate().hold();
|
|
|
|
auto& sg = storage_group_for_id(tid.value());
|
|
auto sg_holder = sg.async_gate().hold();
|
|
co_await sg.flush();
|
|
// The sstable set must be obtained *after* the deletion lock is taken,
|
|
// otherwise components of sstables in the set might be unlinked from the filesystem
|
|
// by compaction while we are waiting for the lock.
|
|
auto deletion_guard = co_await get_sstable_list_permit();
|
|
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
|
ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed));
|
|
});
|
|
co_return ret;
|
|
}
|
|
|
|
void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept {
|
|
_stats.live_disk_space_used += sst->get_file_size_stats();
|
|
_stats.total_disk_space_used += sst->get_file_size_stats();
|
|
_stats.live_sstable_count++;
|
|
}
|
|
|
|
future<>
|
|
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy offstrategy,
|
|
bool trigger_compaction) {
|
|
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
|
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () mutable noexcept {
|
|
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
|
// atomically load all opened sstables into column family.
|
|
if (!offstrategy) {
|
|
add_sstable(cg, sst);
|
|
} else {
|
|
add_maintenance_sstable(cg, sst);
|
|
}
|
|
update_stats_for_new_sstable(sst);
|
|
if (trigger_compaction) {
|
|
try_trigger_compaction(cg);
|
|
}
|
|
// Resetting sstable ptr to inform the caller the sstable has been loaded successfully.
|
|
sst = nullptr;
|
|
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
|
|
return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
|
|
});
|
|
}
|
|
|
|
future<>
|
|
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
|
|
auto& cg = compaction_group_for_sstable(new_sst);
|
|
// Hold gate to make share compaction group is alive.
|
|
auto holder = cg.async_gate().hold();
|
|
co_await do_add_sstable_and_update_cache(cg, new_sst, offstrategy, trigger_compaction);
|
|
}
|
|
|
|
future<>
|
|
table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
|
|
bool do_trigger_compaction = offstrategy == sstables::offstrategy::no;
|
|
co_await do_add_sstable_and_update_cache(std::move(sst), offstrategy, do_trigger_compaction);
|
|
}
|
|
|
|
future<>
|
|
table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts) {
|
|
constexpr bool do_not_trigger_compaction = false;
|
|
for (auto& sst : ssts) {
|
|
co_await do_add_sstable_and_update_cache(sst, sstables::offstrategy::no, do_not_trigger_compaction);
|
|
}
|
|
trigger_compaction();
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>>
|
|
table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
|
std::function<future<>(sstables::shared_sstable)> on_add,
|
|
sstables::offstrategy offstrategy) {
|
|
std::vector<sstables::shared_sstable> ret, ssts;
|
|
std::exception_ptr ex;
|
|
log_level failure_log_level = log_level::error;
|
|
try {
|
|
bool trigger_compaction = offstrategy == sstables::offstrategy::no;
|
|
auto& cg = compaction_group_for_sstable(new_sst);
|
|
// This prevents compaction group from being considered empty until the holder is released.
|
|
// Helpful for tablet split, where split is acked for a table when all pre-split groups are empty.
|
|
auto sstable_add_holder = cg.sstable_add_gate().hold();
|
|
|
|
ret = ssts = co_await maybe_split_new_sstable(new_sst);
|
|
// on successful split, input sstable is unlinked.
|
|
new_sst = nullptr;
|
|
for (auto& sst : ssts) {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
// Hold gate to make sure compaction group is alive.
|
|
auto holder = cg.async_gate().hold();
|
|
co_await on_add(sst);
|
|
// If do_add_sstable_and_update_cache() throws after sstable has been loaded, the pointer
|
|
// sst passed by reference will be set to nullptr, so it won't be unlinked in the exception
|
|
// handler below.
|
|
co_await do_add_sstable_and_update_cache(cg, sst, offstrategy, trigger_compaction);
|
|
sst = nullptr;
|
|
}
|
|
} catch (compaction::compaction_stopped_exception&) {
|
|
failure_log_level = log_level::warn;
|
|
ex = std::current_exception();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
|
|
if (ex) {
|
|
// on failed split, input sstable is unlinked here.
|
|
if (new_sst) {
|
|
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
|
|
co_await new_sst->unlink();
|
|
}
|
|
// on failure after successful split, sstables not attached yet will be unlinked
|
|
co_await coroutine::parallel_for_each(ssts, [&ex, failure_log_level] (sstables::shared_sstable sst) -> future<> {
|
|
if (sst) {
|
|
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
|
co_await sst->unlink();
|
|
}
|
|
});
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
co_return std::move(ret);
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>>
|
|
table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
|
std::function<future<>(sstables::shared_sstable)> on_add) {
|
|
std::exception_ptr ex;
|
|
std::vector<sstables::shared_sstable> ret;
|
|
log_level failure_log_level = log_level::error;
|
|
|
|
// We rely on add_new_sstable_and_update_cache() to unlink the sstable fed into it,
|
|
// so the exception handling below will only have to unlink sstables not processed yet.
|
|
try {
|
|
for (auto& sst: new_ssts) {
|
|
auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add);
|
|
std::ranges::move(ssts, std::back_inserter(ret));
|
|
|
|
}
|
|
} catch (compaction::compaction_stopped_exception&) {
|
|
failure_log_level = log_level::warn;
|
|
ex = std::current_exception();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
|
|
if (ex) {
|
|
co_await coroutine::parallel_for_each(new_ssts, [&ex, failure_log_level] (sstables::shared_sstable sst) -> future<> {
|
|
if (sst) {
|
|
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
|
co_await sst->unlink();
|
|
}
|
|
});
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
co_return std::move(ret);
|
|
}
|
|
|
|
future<>
|
|
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
|
|
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
|
mutation_source_opt ms_opt;
|
|
if (ssts.size() == 1) {
|
|
ms_opt = ssts.front()->as_mutation_source();
|
|
} else {
|
|
std::vector<mutation_source> sources;
|
|
sources.reserve(ssts.size());
|
|
for (auto& sst : ssts) {
|
|
sources.push_back(sst->as_mutation_source());
|
|
}
|
|
ms_opt = make_combined_mutation_source(std::move(sources));
|
|
}
|
|
auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(*ms_opt), &cg] () mutable {
|
|
// FIXME: the following isn't exception safe.
|
|
for (auto& sst : ssts) {
|
|
add_sstable(cg, sst);
|
|
update_stats_for_new_sstable(sst);
|
|
}
|
|
m->mark_flushed(std::move(new_ssts_ms));
|
|
try_trigger_compaction(cg);
|
|
});
|
|
if (cache_enabled()) {
|
|
co_return co_await _cache.update(std::move(adder), *m);
|
|
} else {
|
|
co_return co_await _cache.invalidate(std::move(adder)).then([m] { return m->clear_gently(); });
|
|
}
|
|
}
|
|
|
|
bool table::add_logstor_segment(logstor::segment_descriptor& seg_desc, dht::token first_token, dht::token last_token) {
|
|
auto& cg = compaction_group_for_token(first_token);
|
|
if (&cg != &compaction_group_for_token(last_token)) {
|
|
return false;
|
|
}
|
|
cg.add_logstor_segment(seg_desc);
|
|
return true;
|
|
}
|
|
|
|
logstor::separator_buffer& table::get_logstor_separator_buffer(dht::token token, size_t write_size) {
|
|
return compaction_group_for_token(token).get_separator_buffer(write_size);
|
|
}
|
|
|
|
// Handles permit management only, used for situations where we don't want to inform
|
|
// the compaction manager about backlogs (i.e., tests)
|
|
class permit_monitor : public sstables::write_monitor {
|
|
lw_shared_ptr<sstable_write_permit> _permit;
|
|
public:
|
|
permit_monitor(lw_shared_ptr<sstable_write_permit> permit)
|
|
: _permit(std::move(permit)) {
|
|
}
|
|
|
|
virtual void on_write_started(const sstables::writer_offset_tracker& t) override { }
|
|
virtual void on_data_write_completed() override {
|
|
// We need to start a flush before the current one finishes, otherwise
|
|
// we'll have a period without significant disk activity when the current
|
|
// SSTable is being sealed, the caches are being updated, etc. To do that,
|
|
// we ensure the permit doesn't outlive this continuation.
|
|
*_permit = sstable_write_permit::unconditional();
|
|
}
|
|
};
|
|
|
|
// Handles all tasks related to sstable writing: permit management, compaction backlog updates, etc
|
|
class database_sstable_write_monitor : public permit_monitor, public compaction::backlog_write_progress_manager {
|
|
sstables::shared_sstable _sst;
|
|
compaction_group& _cg;
|
|
const sstables::writer_offset_tracker* _tracker = nullptr;
|
|
uint64_t _progress_seen = 0;
|
|
api::timestamp_type _maximum_timestamp;
|
|
public:
|
|
database_sstable_write_monitor(lw_shared_ptr<sstable_write_permit> permit, sstables::shared_sstable sst,
|
|
compaction_group& cg, api::timestamp_type max_timestamp)
|
|
: permit_monitor(std::move(permit))
|
|
, _sst(std::move(sst))
|
|
, _cg(cg)
|
|
, _maximum_timestamp(max_timestamp)
|
|
{}
|
|
|
|
database_sstable_write_monitor(const database_sstable_write_monitor&) = delete;
|
|
database_sstable_write_monitor(database_sstable_write_monitor&& x) = default;
|
|
|
|
~database_sstable_write_monitor() {
|
|
// We failed to finish handling this SSTable, so we have to update the backlog_tracker
|
|
// about it.
|
|
if (_sst) {
|
|
_cg.get_backlog_tracker().revert_charges(_sst);
|
|
}
|
|
}
|
|
|
|
virtual void on_write_started(const sstables::writer_offset_tracker& t) override {
|
|
_tracker = &t;
|
|
_cg.get_backlog_tracker().register_partially_written_sstable(_sst, *this);
|
|
}
|
|
|
|
virtual void on_data_write_completed() override {
|
|
permit_monitor::on_data_write_completed();
|
|
_progress_seen = _tracker->offset;
|
|
_tracker = nullptr;
|
|
}
|
|
|
|
virtual uint64_t written() const override {
|
|
if (_tracker) {
|
|
return _tracker->offset;
|
|
}
|
|
return _progress_seen;
|
|
}
|
|
|
|
api::timestamp_type maximum_timestamp() const override {
|
|
return _maximum_timestamp;
|
|
}
|
|
|
|
unsigned level() const override {
|
|
return 0;
|
|
}
|
|
};
|
|
|
|
// The function never fails.
|
|
// It either succeeds eventually after retrying or aborts.
|
|
future<>
|
|
table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) noexcept {
|
|
auto old = cg.memtables()->back();
|
|
tlogger.debug("Sealing active memtable of {}.{}, partitions: {}, occupancy: {}", _schema->ks_name(), _schema->cf_name(), old->partition_count(), old->occupancy());
|
|
|
|
if (old->empty()) {
|
|
tlogger.debug("Memtable is empty");
|
|
co_return co_await _flush_barrier.advance_and_await();
|
|
}
|
|
|
|
auto permit = std::move(flush_permit);
|
|
auto r = exponential_backoff_retry(100ms, 10s);
|
|
// Try flushing for around half an hour (30 minutes every 10 seconds)
|
|
int default_retries = 30 * 60 / 10;
|
|
int allowed_retries = default_retries;
|
|
std::optional<utils::phased_barrier::operation> op;
|
|
size_t memtable_size;
|
|
future<> previous_flush = make_ready_future<>();
|
|
|
|
auto with_retry = [&] (std::function<future<>()> func) -> future<> {
|
|
for (;;) {
|
|
std::exception_ptr ex;
|
|
try {
|
|
co_return co_await func();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
_config.cf_stats->failed_memtables_flushes_count++;
|
|
|
|
auto should_retry = [](auto* ep) {
|
|
int ec = ep->code().value();
|
|
return ec == ENOSPC || ec == EDQUOT || ec == EPERM || ec == EACCES;
|
|
};
|
|
if (try_catch<std::bad_alloc>(ex)) {
|
|
// There is a chance something else will free the memory, so we can try again
|
|
allowed_retries--;
|
|
} else if (auto ep = try_catch<std::system_error>(ex)) {
|
|
allowed_retries = should_retry(ep) ? default_retries : 0;
|
|
} else if (auto ep = try_catch<storage_io_error>(ex)) {
|
|
allowed_retries = should_retry(ep) ? default_retries : 0;
|
|
} else if (try_catch<db::extension_storage_exception>(ex)) {
|
|
allowed_retries = default_retries;
|
|
} else {
|
|
allowed_retries = 0;
|
|
}
|
|
|
|
if (allowed_retries <= 0) {
|
|
// At this point we don't know what has happened and it's better to potentially
|
|
// take the node down and rely on commitlog to replay.
|
|
//
|
|
// FIXME: enter maintenance mode when available.
|
|
// since replaying the commitlog with a corrupt mutation
|
|
// may end up in an infinite crash loop.
|
|
tlogger.error("Memtable flush failed due to: {}. Aborting, at {}", ex, current_backtrace());
|
|
std::abort();
|
|
}
|
|
}
|
|
if (_async_gate.is_closed()) {
|
|
tlogger.warn("Memtable flush failed due to: {}. Dropped due to shutdown", ex);
|
|
co_await std::move(previous_flush);
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
tlogger.warn("Memtable flush failed due to: {}. Will retry in {}ms", ex, r.sleep_time().count());
|
|
co_await r.retry();
|
|
}
|
|
};
|
|
|
|
co_await with_retry([&] {
|
|
tlogger.debug("seal_active_memtable: adding memtable");
|
|
utils::get_local_injector().inject("table_seal_active_memtable_add_memtable", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
|
|
cg.memtables()->add_memtable();
|
|
_highest_flushed_rp = std::max(_highest_flushed_rp, old->replay_position());
|
|
|
|
// no exceptions allowed (nor expected) from this point on
|
|
_stats.memtable_switch_count++;
|
|
[&] () noexcept {
|
|
// This will set evictable occupancy of the old memtable region to zero, so that
|
|
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
|
|
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
|
|
// the permit is released even though there is not much to flush in the active memtable of this list.
|
|
old->region().ground_evictable_occupancy();
|
|
memtable_size = old->occupancy().total_space();
|
|
}();
|
|
return make_ready_future<>();
|
|
});
|
|
|
|
co_await with_retry([&] {
|
|
previous_flush = _flush_barrier.advance_and_await();
|
|
utils::get_local_injector().inject("table_seal_active_memtable_start_op", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
op = _flush_barrier.start();
|
|
|
|
// no exceptions allowed (nor expected) from this point on
|
|
_stats.pending_flushes++;
|
|
_config.cf_stats->pending_memtables_flushes_count++;
|
|
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
|
|
return make_ready_future<>();
|
|
});
|
|
|
|
auto undo_stats = std::make_optional(deferred_action([this, memtable_size] () noexcept {
|
|
_stats.pending_flushes--;
|
|
_config.cf_stats->pending_memtables_flushes_count--;
|
|
_config.cf_stats->pending_memtables_flushes_bytes -= memtable_size;
|
|
}));
|
|
|
|
co_await with_retry([&] () -> future<> {
|
|
// Reacquiring the write permit might be needed if retrying flush
|
|
if (!permit.has_sstable_write_permit()) {
|
|
tlogger.debug("seal_active_memtable: reacquiring write permit");
|
|
utils::get_local_injector().inject("table_seal_active_memtable_reacquire_write_permit", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
permit = co_await std::move(permit).reacquire_sstable_write_permit();
|
|
}
|
|
auto write_permit = permit.release_sstable_write_permit();
|
|
|
|
utils::get_local_injector().inject("table_seal_active_memtable_try_flush", []() {
|
|
throw std::system_error(ENOSPC, std::system_category(), "Injected error");
|
|
});
|
|
co_await this->try_flush_memtable_to_sstable(cg, old, std::move(write_permit));
|
|
// signal a memtable was sealed
|
|
utils::get_local_injector().receive_message("table_seal_post_flush_waiters");
|
|
});
|
|
|
|
undo_stats.reset();
|
|
|
|
if (_commitlog) {
|
|
_commitlog->discard_completed_segments(_schema->id(), old->get_and_discard_rp_set());
|
|
}
|
|
co_await std::move(previous_flush);
|
|
// keep `op` alive until after previous_flush resolves
|
|
|
|
// FIXME: release commit log
|
|
// FIXME: provide back-pressure to upper layers
|
|
}
|
|
|
|
future<>
|
|
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit_) {
|
|
co_await utils::get_local_injector().inject("flush_memtable_to_sstable_wait", utils::wait_for_message(60s));
|
|
|
|
auto permit = make_lw_shared(std::move(permit_));
|
|
co_await coroutine::switch_to(_config.memtable_scheduling_group);
|
|
// Note that due to our sharded architecture, it is possible that
|
|
// in the face of a value change some shards will backup sstables
|
|
// while others won't.
|
|
//
|
|
// This is, in theory, possible to mitigate through a rwlock.
|
|
// However, this doesn't differ from the situation where all tables
|
|
// are coming from a single shard and the toggle happens in the
|
|
// middle of them.
|
|
//
|
|
// The code as is guarantees that we'll never partially backup a
|
|
// single sstable, so that is enough of a guarantee.
|
|
|
|
auto newtabs = std::vector<sstables::shared_sstable>();
|
|
auto metadata = mutation_source_metadata{};
|
|
metadata.min_timestamp = old->get_min_timestamp();
|
|
metadata.max_timestamp = old->get_max_timestamp();
|
|
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
|
|
|
|
if (!cg.async_gate().is_closed()) {
|
|
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
|
|
}
|
|
|
|
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
|
|
std::exception_ptr ex;
|
|
try {
|
|
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
|
|
cfg.backup = incremental_backups_enabled();
|
|
|
|
auto newtab = make_sstable();
|
|
newtabs.push_back(newtab);
|
|
tlogger.debug("Flushing to {}", newtab->get_filename());
|
|
|
|
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
|
|
old->get_max_timestamp());
|
|
|
|
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await reader.close();
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
});
|
|
|
|
auto f = consumer(old->make_flush_reader(
|
|
old->schema(),
|
|
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
|
|
|
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
|
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
|
// priority inversion.
|
|
co_await coroutine::switch_to(default_scheduling_group());
|
|
try {
|
|
co_await std::move(f);
|
|
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
|
|
co_await newtab->open_data();
|
|
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
|
});
|
|
|
|
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
|
|
return update_cache(cg, old, newtabs);
|
|
});
|
|
|
|
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
|
|
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
|
|
if (this_table_name == handler.get("table_name")) {
|
|
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
|
|
handler.set("suspended", true);
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
|
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
|
|
}
|
|
});
|
|
|
|
cg.memtables()->erase(old);
|
|
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
|
co_return;
|
|
} catch (const std::exception& e) {
|
|
for (auto& newtab : newtabs) {
|
|
newtab->mark_for_deletion();
|
|
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
|
}
|
|
_config.cf_stats->failed_memtables_flushes_count++;
|
|
// If we failed this write we will try the write again and that will create a new flush reader
|
|
// that will decrease dirty memory again. So we need to reset the accounting.
|
|
old->revert_flushed_memory();
|
|
throw;
|
|
}
|
|
}
|
|
|
|
void
|
|
table::start() {
|
|
start_compaction();
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.arm(std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
}
|
|
|
|
future<>
|
|
table::stop() {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
_flush_timer.cancel();
|
|
// Allow `compaction_group::stop` to stop ongoing compactions
|
|
// while they may still hold the table _async_gate
|
|
auto gate_closed_fut = _async_gate.close();
|
|
co_await when_all(
|
|
_pending_reads_phaser.close(),
|
|
_pending_writes_phaser.close(),
|
|
_pending_streams_phaser.close());
|
|
// Allow parallel flushes from the commitlog path
|
|
// to synchronize with table::stop
|
|
{
|
|
auto op = _pending_flushes_phaser.start();
|
|
co_await _sg_manager->stop_storage_groups();
|
|
}
|
|
co_await _pending_flushes_phaser.close();
|
|
co_await _sstable_deletion_gate.close();
|
|
co_await std::move(gate_closed_fut);
|
|
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
|
_sg_manager->clear_storage_groups();
|
|
_sstables = make_compound_sstable_set();
|
|
}));
|
|
_cache.refresh_snapshot();
|
|
}
|
|
static seastar::metrics::label_instance node_table_metrics("__per_table", "node");
|
|
void table::set_metrics() {
|
|
auto cf = column_family_label(_schema->cf_name());
|
|
auto ks = keyspace_label(_schema->ks_name());
|
|
namespace ms = seastar::metrics;
|
|
if (_config.enable_metrics_reporting) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"), _stats.memtable_switch_count)(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_hits", _stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_rows_dropped_by_tombstones", _stats.memtable_app_stats.rows_dropped_by_tombstones, ms::description("Number of rows dropped in memtables by a tombstone write"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_rows_compacted_with_tombstones", _stats.memtable_app_stats.rows_compacted_with_tombstones, ms::description("Number of rows scanned during write of a tombstone for the purpose of compaction in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_range_tombstone_reads", _stats.memtable_range_tombstone_reads, ms::description("Number of range tombstones read from memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_tombstone_reads", _stats.memtable_row_tombstone_reads, ms::description("Number of row tombstones read from memtables"))(cf)(ks),
|
|
ms::make_gauge("pending_tasks", ms::description("Estimated number of tasks pending for this column family"), _stats.pending_flushes)(cf)(ks),
|
|
ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks),
|
|
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks),
|
|
ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks),
|
|
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
|
|
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks),
|
|
ms::make_gauge("pending_sstable_deletions",
|
|
ms::description("Number of tasks waiting to delete sstables from a table"),
|
|
[this] { return _stats.pending_sstable_deletions; })(cf)(ks)
|
|
});
|
|
|
|
// Metrics related to row locking
|
|
auto add_row_lock_metrics = [this, ks, cf] (row_locker::single_lock_stats& stats, sstring stat_name) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_total_operations(format("row_lock_{}_acquisitions", stat_name), stats.lock_acquisitions, ms::description(format("Row lock acquisitions for {} lock", stat_name)))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_queue_length(format("row_lock_{}_operations_currently_waiting_for_lock", stat_name), stats.operations_currently_waiting_for_lock, ms::description(format("Operations currently waiting for {} lock", stat_name)))(cf)(ks),
|
|
ms::make_histogram(format("row_lock_{}_waiting_time", stat_name), ms::description(format("Histogram representing time that operations spent on waiting for {} lock", stat_name)),
|
|
[&stats] {return to_metrics_histogram(stats.estimated_waiting_for_lock);})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
});
|
|
};
|
|
add_row_lock_metrics(_row_locker_stats.exclusive_row, "exclusive_row");
|
|
add_row_lock_metrics(_row_locker_stats.shared_row, "shared_row");
|
|
add_row_lock_metrics(_row_locker_stats.exclusive_partition, "exclusive_partition");
|
|
add_row_lock_metrics(_row_locker_stats.shared_partition, "shared_partition");
|
|
|
|
// View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views)
|
|
if (!_schema->is_view()) {
|
|
_view_stats.register_stats();
|
|
}
|
|
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks)
|
|
});
|
|
}
|
|
|
|
if (!is_internal_keyspace(_schema->ks_name())) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_summary("read_latency_summary", ms::description("Read latency summary"), [this] {return to_metrics_summary(_stats.reads.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("write_latency_summary", ms::description("Write latency summary"), [this] {return to_metrics_summary(_stats.writes.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_prepare_latency_summary", ms::description("CAS prepare round latency summary"), [this] {return to_metrics_summary(_stats.cas_prepare.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_propose_latency_summary", ms::description("CAS accept round latency summary"), [this] {return to_metrics_summary(_stats.cas_accept.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_commit_latency_summary", ms::description("CAS learn round latency summary"), [this] {return to_metrics_summary(_stats.cas_learn.summary());})(cf)(ks).set_skip_when_empty(),
|
|
|
|
ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_prepare_latency", ms::description("CAS prepare round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_prepare.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_propose_latency", ms::description("CAS accept round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_accept.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_commit_latency", ms::description("CAS learn round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_learn.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)
|
|
});
|
|
}
|
|
} else {
|
|
if (_config.enable_node_aggregated_table_metrics && !is_internal_keyspace(_schema->ks_name())) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"), _stats.memtable_switch_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_hits", _stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}),
|
|
ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}),
|
|
ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
});
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({seastar::metrics::shard_label})
|
|
});
|
|
}
|
|
if (this_shard_id() == 0) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)(ms::shard_label(""))
|
|
});
|
|
}
|
|
}
|
|
}
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("tablets", {
|
|
ms::make_gauge("count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({column_family_label, keyspace_label})
|
|
});
|
|
}
|
|
}
|
|
|
|
void table::deregister_metrics() {
|
|
_metrics.clear();
|
|
_view_stats._metrics.clear();
|
|
}
|
|
|
|
size_t compaction_group::live_sstable_count() const noexcept {
|
|
return _main_sstables->size() + _maintenance_sstables->size();
|
|
}
|
|
|
|
size_t compaction_group::logstor_disk_space_used() const noexcept {
|
|
if (!_logstor_segments || !_t.uses_logstor()) {
|
|
return 0;
|
|
}
|
|
return _logstor_segments->segment_count() * _t.get_logstor_segment_manager().get_segment_size();
|
|
}
|
|
|
|
uint64_t compaction_group::live_disk_space_used() const noexcept {
|
|
return _main_sstables->bytes_on_disk() + _maintenance_sstables->bytes_on_disk() + logstor_disk_space_used();
|
|
}
|
|
|
|
sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() const noexcept {
|
|
return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
|
|
}
|
|
|
|
uint64_t storage_group::live_disk_space_used() const {
|
|
auto cgs = const_cast<storage_group&>(*this).compaction_groups_immediate();
|
|
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
|
|
}
|
|
|
|
uint64_t compaction_group::total_disk_space_used() const noexcept {
|
|
return live_disk_space_used() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0), std::plus{});
|
|
}
|
|
|
|
sstables::file_size_stats compaction_group::total_disk_space_used_full_stats() const noexcept {
|
|
return live_disk_space_used_full_stats() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::get_file_size_stats)), sstables::file_size_stats{}, std::plus{});
|
|
}
|
|
|
|
void table::rebuild_statistics() {
|
|
_stats.live_disk_space_used = {};
|
|
_stats.live_sstable_count = 0;
|
|
_stats.total_disk_space_used = {};
|
|
|
|
for_each_compaction_group([this] (const compaction_group& cg) {
|
|
_stats.live_disk_space_used += cg.live_disk_space_used_full_stats();
|
|
_stats.total_disk_space_used += cg.total_disk_space_used_full_stats();
|
|
_stats.live_sstable_count += cg.live_sstable_count();
|
|
});
|
|
}
|
|
|
|
void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept {
|
|
_stats.live_disk_space_used -= cg.live_disk_space_used_full_stats();
|
|
_stats.total_disk_space_used -= cg.total_disk_space_used_full_stats();
|
|
_stats.live_sstable_count -= cg.live_sstable_count();
|
|
}
|
|
|
|
future<table::sstable_list_permit>
|
|
table::get_sstable_list_permit() {
|
|
co_return sstable_list_permit(co_await seastar::get_units(_sstable_set_mutation_sem, 1));
|
|
}
|
|
|
|
future<table::sstable_list_builder::result>
|
|
table::sstable_list_builder::build_new_list(const sstables::sstable_set& current_sstables,
|
|
sstables::sstable_set new_sstable_list,
|
|
const std::vector<sstables::shared_sstable>& new_sstables,
|
|
const std::vector<sstables::shared_sstable>& old_sstables) {
|
|
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
|
|
|
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
|
|
|
|
// add sstables from the current list into the new list except the ones that are in the old list
|
|
std::vector<sstables::shared_sstable> removed_sstables;
|
|
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
|
|
if (s.contains(tab)) {
|
|
removed_sstables.push_back(tab);
|
|
} else {
|
|
new_sstable_list.insert(tab);
|
|
}
|
|
});
|
|
|
|
// add new sstables into the new list
|
|
for (auto& tab : new_sstables) {
|
|
new_sstable_list.insert(tab);
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
co_return table::sstable_list_builder::result {
|
|
make_lw_shared<sstables::sstable_set>(std::move(new_sstable_list)), std::move(removed_sstables)};
|
|
}
|
|
|
|
future<>
|
|
table::delete_sstables_atomically(const sstable_list_permit&, std::vector<sstables::shared_sstable> sstables_to_remove) {
|
|
try {
|
|
auto gh = _sstable_deletion_gate.hold();
|
|
co_await get_sstables_manager().delete_atomically(std::move(sstables_to_remove));
|
|
} catch (...) {
|
|
// There is nothing more we can do here.
|
|
// Any remaining SSTables will eventually be re-compacted and re-deleted.
|
|
tlogger.error("Compacted SSTables deletion failed: {}. Ignored.", std::current_exception());
|
|
}
|
|
}
|
|
|
|
future<>
|
|
table::sstable_list_builder::delete_sstables_atomically(std::vector<sstables::shared_sstable> sstables_to_remove) {
|
|
return _t->delete_sstables_atomically(_permit, std::move(sstables_to_remove));
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable>
|
|
compaction_group::unused_sstables_for_deletion(compaction::compaction_completion_desc desc) const {
|
|
std::unordered_set<sstables::shared_sstable> output(desc.new_sstables.begin(), desc.new_sstables.end());
|
|
|
|
return std::ranges::to<std::vector<sstables::shared_sstable>>(desc.old_sstables
|
|
| std::views::filter([&output] (const sstables::shared_sstable& input_sst) {
|
|
return !output.contains(input_sst);
|
|
}));
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable> compaction_group::all_sstables() const {
|
|
std::vector<sstables::shared_sstable> all;
|
|
auto main_sstables = _main_sstables->all();
|
|
auto maintenance_sstables = _maintenance_sstables->all();
|
|
all.reserve(main_sstables->size() + maintenance_sstables->size());
|
|
std::ranges::copy(*main_sstables, std::back_inserter(all));
|
|
std::ranges::copy(*maintenance_sstables, std::back_inserter(all));
|
|
return all;
|
|
}
|
|
|
|
future<>
|
|
compaction_group::update_repaired_at_for_merge() {
|
|
auto sstables_repaired_at = get_sstables_repaired_at();
|
|
constexpr int64_t new_repaired_at = 0;
|
|
|
|
auto modifier = [] (sstables::sstable& new_sst) {
|
|
new_sst.update_repaired_at(new_repaired_at);
|
|
};
|
|
|
|
std::unordered_map<compaction::compaction_group_view*, std::vector<sstables::shared_sstable>> sstables_by_view;
|
|
for (auto& sst : all_sstables()) {
|
|
auto& stats = sst->get_stats_metadata();
|
|
if (stats.repaired_at > sstables_repaired_at) {
|
|
auto& view = view_for_sstable(sst);
|
|
sstables_by_view[&view].push_back(sst);
|
|
} else {
|
|
tlogger.debug("Skipped repaired_at update for tablet merge sstable={} repaired_at={} sstables_repaired_at={} group_id={} range={}",
|
|
sst->get_filename(), stats.repaired_at, sstables_repaired_at, group_id(), token_range());
|
|
}
|
|
}
|
|
|
|
auto& cm = get_compaction_manager();
|
|
for (auto& [view, ssts] : sstables_by_view) {
|
|
for (auto& sst : ssts) {
|
|
tlogger.info("Updating repaired_at for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
|
|
sst->get_filename(), sst->get_stats_metadata().repaired_at, new_repaired_at, sstables_repaired_at, group_id(), token_range());
|
|
}
|
|
co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts),
|
|
sstables::component_type::Statistics, modifier);
|
|
}
|
|
tlogger.info("Completed updating repaired_at={} for tablet merge in compaction group_id={} range={}",
|
|
new_repaired_at, group_id(), token_range());
|
|
}
|
|
|
|
future<compaction_reenablers_and_lock_holders> table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
|
const service::frozen_topology_guard& guard, dht::token_range range) {
|
|
auto ret = compaction_reenablers_and_lock_holders();
|
|
// Waits for background tablet resize work like merge that might destroy compaction groups,
|
|
// providing stability. Essentially, serializes tablet merge completion handling with
|
|
// the start of incremental repair, from the replica side.
|
|
co_await _sg_manager->wait_for_background_tablet_resize_work();
|
|
|
|
for (auto sg : storage_groups_for_token_range(range)) {
|
|
// FIXME: indentation
|
|
auto cgs = sg->compaction_groups_immediate();
|
|
for (auto& cg : cgs) {
|
|
auto gate_holder = cg->async_gate().hold();
|
|
auto& view = cg->view_for_unrepaired_data();
|
|
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(view);
|
|
tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
|
|
ret.cres.push_back(std::make_unique<compaction::compaction_reenabler>(std::move(cre)));
|
|
|
|
// This lock prevents the unrepaired compaction started by major compaction to run in parallel with repair.
|
|
// The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
|
|
// sstables being repaired, so it can run in parallel with repair.
|
|
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(view, "row_level_repair");
|
|
tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
|
|
ret.lock_holders.push_back(std::move(lock_holder));
|
|
}
|
|
}
|
|
co_return ret;
|
|
}
|
|
|
|
future<> table::clear_being_repaired_for_range(dht::token_range range) {
|
|
auto sgs = storage_groups_for_token_range(range);
|
|
for (auto& sg : sgs) {
|
|
auto cgs = sg->compaction_groups_immediate();
|
|
for (auto& cg : cgs) {
|
|
auto sstables = cg->all_sstables();
|
|
co_await coroutine::maybe_yield();
|
|
for (auto& sst : sstables) {
|
|
co_await coroutine::maybe_yield();
|
|
if (!sst->being_repaired.uuid().is_null()) {
|
|
sst->being_repaired = service::session_id(utils::UUID());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
future<>
|
|
compaction_group::merge_sstables_from(compaction_group& group) {
|
|
auto permit = co_await _t.get_sstable_list_permit();
|
|
table::sstable_list_builder builder(_t, std::move(permit));
|
|
|
|
auto sstables_to_merge = group.all_sstables();
|
|
// re-build new list for this group with sstables of the group being merged.
|
|
auto res = co_await builder.build_new_list(*main_sstables(), make_main_sstable_set(), sstables_to_merge, {});
|
|
// execute:
|
|
std::invoke([&] noexcept {
|
|
set_main_sstables(std::move(res.new_sstable_set));
|
|
group.clear_sstables();
|
|
// FIXME: backlog adjustment is not exception safe.
|
|
backlog_tracker_adjust_charges({}, sstables_to_merge);
|
|
});
|
|
_t.rebuild_statistics();
|
|
}
|
|
|
|
future<>
|
|
compaction_group::update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc) {
|
|
// Build a new list of _sstables: We remove from the existing list the
|
|
// tables we compacted (by now, there might be more sstables flushed
|
|
// later), and we add the new tables generated by the compaction.
|
|
// We create a new list rather than modifying it in-place, so that
|
|
// on-going reads can continue to use the old list.
|
|
//
|
|
// We only remove old sstables after they are successfully deleted,
|
|
// to avoid a new compaction from ignoring data in the old sstables
|
|
// if the deletion fails (note deletion of shared sstables can take
|
|
// unbounded time, because all shards must agree on the deletion).
|
|
|
|
// make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
|
|
for (auto& sst : desc.old_sstables) {
|
|
auto shards = sst->get_shards_for_this_sstable();
|
|
auto& schema = _t.schema();
|
|
if (shards.size() > 1) {
|
|
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
|
|
schema->ks_name(), schema->cf_name(), sst->toc_filename()));
|
|
}
|
|
if (!belongs_to_current_shard(shards)) {
|
|
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used sstable {} which doesn't belong to this shard!",
|
|
schema->ks_name(), schema->cf_name(), sst->toc_filename()));
|
|
}
|
|
}
|
|
|
|
// Precompute before so undo_compacted_but_not_deleted can be sure not to throw
|
|
std::unordered_set<sstables::shared_sstable> s(
|
|
desc.old_sstables.begin(), desc.old_sstables.end());
|
|
auto& sstables_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
|
sstables_compacted_but_not_deleted.insert(sstables_compacted_but_not_deleted.end(), desc.old_sstables.begin(), desc.old_sstables.end());
|
|
// After we are done, unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
|
|
// or they could stay forever in the set, resulting in deleted files remaining
|
|
// opened and disk space not being released until shutdown.
|
|
auto undo_compacted_but_not_deleted = defer([&] {
|
|
std::erase_if(sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) {
|
|
return s.contains(sst);
|
|
});
|
|
_t.rebuild_statistics();
|
|
});
|
|
|
|
_t.get_stats().pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_t.get_stats().pending_sstable_deletions--;
|
|
});
|
|
|
|
class sstable_list_updater : public row_cache::external_updater_impl {
|
|
table& _t;
|
|
compaction_group& _cg;
|
|
table::sstable_list_builder& _builder;
|
|
const compaction::compaction_completion_desc& _desc;
|
|
struct replacement_desc {
|
|
compaction::compaction_completion_desc desc;
|
|
table::sstable_list_builder::result main_sstable_set_builder_result;
|
|
std::optional<lw_shared_ptr<sstables::sstable_set>> new_maintenance_sstables;
|
|
};
|
|
std::unordered_map<compaction_group*, replacement_desc> _cg_desc;
|
|
public:
|
|
explicit sstable_list_updater(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d)
|
|
: _t(cg._t), _cg(cg), _builder(builder), _desc(d) {}
|
|
virtual future<> prepare() override {
|
|
// Segregate output sstables according to their owner compaction group.
|
|
// If not in splitting mode, then all output sstables will belong to the same group.
|
|
for (auto& sst : _desc.new_sstables) {
|
|
auto& cg = _t.compaction_group_for_sstable(sst);
|
|
_cg_desc[&cg].desc.new_sstables.push_back(sst);
|
|
}
|
|
// The group that triggered compaction is the only one to have sstables removed from it.
|
|
_cg_desc[&_cg].desc.old_sstables = _desc.old_sstables;
|
|
for (auto& [cg, d] : _cg_desc) {
|
|
d.main_sstable_set_builder_result = co_await _builder.build_new_list(*cg->main_sstables(), cg->make_main_sstable_set(),
|
|
d.desc.new_sstables, d.desc.old_sstables);
|
|
|
|
if (!d.desc.old_sstables.empty()
|
|
&& d.main_sstable_set_builder_result.removed_sstables.size() != d.desc.old_sstables.size()) {
|
|
// Not all old_sstables were removed from the main sstable set, which implies that
|
|
// they don't exist there. This can happen if the input sstables were picked up from
|
|
// the maintenance set during an offstrategy or scrub compaction. So, remove the old
|
|
// sstables from the maintenance set. No need to add any new sstables to the maintenance
|
|
// set though, as they are always added to the main set.
|
|
auto builder_result = co_await _builder.build_new_list(
|
|
*cg->maintenance_sstables(), std::move(*cg->make_maintenance_sstable_set()), {}, d.desc.old_sstables);
|
|
d.new_maintenance_sstables = std::move(builder_result.new_sstable_set);
|
|
}
|
|
}
|
|
}
|
|
virtual void execute() override {
|
|
for (auto&& [cg, d] : _cg_desc) {
|
|
cg->set_main_sstables(std::move(d.main_sstable_set_builder_result.new_sstable_set));
|
|
if (d.new_maintenance_sstables) {
|
|
// offstrategy or scrub compaction - replace the maintenance set
|
|
cg->set_maintenance_sstables(std::move(d.new_maintenance_sstables.value()));
|
|
}
|
|
}
|
|
// FIXME: the following is not exception safe
|
|
_t.refresh_compound_sstable_set();
|
|
for (auto& [cg, d] : _cg_desc) {
|
|
cg->backlog_tracker_adjust_charges(d.main_sstable_set_builder_result.removed_sstables, d.desc.new_sstables);
|
|
}
|
|
}
|
|
static std::unique_ptr<row_cache::external_updater_impl> make(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d) {
|
|
return std::make_unique<sstable_list_updater>(cg, builder, d);
|
|
}
|
|
};
|
|
table::sstable_list_builder builder(_t, co_await _t.get_sstable_list_permit());
|
|
auto updater = row_cache::external_updater(sstable_list_updater::make(*this, builder, desc));
|
|
auto& cache = _t.get_row_cache();
|
|
|
|
co_await cache.invalidate(std::move(updater), std::move(desc.ranges_for_cache_invalidation));
|
|
|
|
// refresh underlying data source in row cache to prevent it from holding reference
|
|
// to sstables files that are about to be deleted.
|
|
cache.refresh_snapshot();
|
|
|
|
_t.rebuild_statistics();
|
|
|
|
co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc)));
|
|
}
|
|
|
|
future<>
|
|
table::compact_all_sstables(tasks::task_info info, do_flush do_flush, bool consider_only_existing_data) {
|
|
if (do_flush) {
|
|
co_await flush();
|
|
}
|
|
// Forces off-strategy before major, so sstables previously sitting on maintenance set will be included
|
|
// in the compaction's input set, to provide same semantics as before maintenance set came into existence.
|
|
co_await perform_offstrategy_compaction(info);
|
|
co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) -> future<> {
|
|
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
|
co_await _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data);
|
|
});
|
|
}
|
|
|
|
void table::start_compaction() {
|
|
set_compaction_strategy(_schema->compaction_strategy());
|
|
}
|
|
|
|
void table::trigger_compaction() {
|
|
for_each_compaction_group([] (compaction_group& cg) {
|
|
cg.trigger_compaction();
|
|
});
|
|
}
|
|
|
|
void table::trigger_logstor_compaction() {
|
|
for_each_compaction_group([] (compaction_group& cg) {
|
|
cg.trigger_logstor_compaction();
|
|
});
|
|
}
|
|
|
|
void table::try_trigger_compaction(compaction_group& cg) noexcept {
|
|
try {
|
|
cg.trigger_compaction();
|
|
} catch (...) {
|
|
tlogger.error("Failed to trigger compaction: {}", std::current_exception());
|
|
}
|
|
}
|
|
|
|
future<> table::flush_separator(std::optional<size_t> seq_num) {
|
|
if (!uses_logstor()) {
|
|
co_return;
|
|
}
|
|
|
|
// wait for all previous writes to be written to a separator buffer
|
|
co_await get_logstor_segment_manager().await_pending_writes();
|
|
|
|
// flush separator buffers
|
|
co_await parallel_foreach_compaction_group([seq_num] (compaction_group& cg) {
|
|
return cg.flush_separator(seq_num);
|
|
});
|
|
}
|
|
|
|
future<logstor::table_segment_stats> table::get_logstor_segment_stats() const {
|
|
logstor::table_segment_stats result;
|
|
if (!uses_logstor()) {
|
|
co_return std::move(result);
|
|
}
|
|
|
|
const auto segment_size = get_logstor_segment_manager().get_segment_size();
|
|
const auto bucket_count = 32;
|
|
const auto bucket_size = segment_size / bucket_count;
|
|
|
|
result.histogram.resize(bucket_count);
|
|
|
|
co_await const_cast<table*>(this)->parallel_foreach_compaction_group([&] (const compaction_group& cg) -> future<> {
|
|
const auto& cg_segments = cg.logstor_segments();
|
|
|
|
result.compaction_group_count++;
|
|
result.segment_count += cg_segments.segment_count();
|
|
|
|
for (const auto& desc : cg_segments._segments) {
|
|
co_await coroutine::maybe_yield();
|
|
auto data_size = desc.net_data_size(segment_size);
|
|
auto bucket_index = std::min<size_t>(data_size / bucket_size, bucket_count - 1);
|
|
auto& bucket = result.histogram[bucket_index];
|
|
bucket.count++;
|
|
bucket.max_data_size = std::max(bucket.max_data_size, data_size);
|
|
}
|
|
});
|
|
|
|
co_return std::move(result);
|
|
}
|
|
|
|
void compaction_group::trigger_compaction() {
|
|
// But not if we're locked out or stopping
|
|
if (!_async_gate.is_closed()) {
|
|
// FIXME: indentation
|
|
for (auto view : all_views()) {
|
|
_t._compaction_manager.submit(*view);
|
|
}
|
|
}
|
|
}
|
|
|
|
void compaction_group::trigger_logstor_compaction() {
|
|
if (!_async_gate.is_closed() && !_t.is_auto_compaction_disabled_by_user()) {
|
|
if (_logstor_segments) {
|
|
get_logstor_compaction_manager().submit(*this);
|
|
}
|
|
}
|
|
}
|
|
|
|
void table::trigger_offstrategy_compaction() {
|
|
// Run in background.
|
|
// This is safe since the the compaction task is tracked
|
|
// by the compaction_manager until stop()
|
|
(void)perform_offstrategy_compaction(tasks::task_info{}).then_wrapped([this] (future<bool> f) {
|
|
if (f.failed()) {
|
|
auto ex = f.get_exception();
|
|
tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex);
|
|
}
|
|
});
|
|
}
|
|
|
|
future<bool> table::perform_offstrategy_compaction(tasks::task_info info) {
|
|
// If the user calls trigger_offstrategy_compaction() to trigger
|
|
// off-strategy explicitly, cancel the timeout based automatic trigger.
|
|
_off_strategy_trigger.cancel();
|
|
bool performed = false;
|
|
co_await parallel_foreach_compaction_group_view([this, &performed, info] (compaction::compaction_group_view& view) -> future<> {
|
|
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
|
performed |= co_await _compaction_manager.perform_offstrategy(view, info);
|
|
});
|
|
co_return performed;
|
|
}
|
|
|
|
future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges,
|
|
tasks::task_info info,
|
|
do_flush do_flush) {
|
|
auto* cg = try_get_compaction_group_with_static_sharding();
|
|
if (!cg) {
|
|
co_return;
|
|
}
|
|
|
|
if (do_flush) {
|
|
co_await flush();
|
|
}
|
|
|
|
auto lock_holder = co_await get_compaction_manager().get_incremental_repair_read_lock(cg->as_view_for_static_sharding(), "perform_cleanup_compaction");
|
|
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg->as_view_for_static_sharding(), info);
|
|
}
|
|
|
|
future<unsigned> compaction_group::estimate_pending_compactions() const {
|
|
unsigned ret = 0;
|
|
for (auto& view : all_views()) {
|
|
ret += co_await _t.get_compaction_strategy().estimated_pending_compactions(*view);
|
|
}
|
|
co_return ret;
|
|
}
|
|
|
|
future<unsigned> table::estimate_pending_compactions() const {
|
|
unsigned ret = 0;
|
|
co_await const_cast<table*>(this)->parallel_foreach_compaction_group([&ret] (const compaction_group& cg) -> future<> {
|
|
ret += co_await cg.estimate_pending_compactions();
|
|
});
|
|
co_return ret;
|
|
}
|
|
|
|
void compaction_group::set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept {
|
|
_compaction_strategy_state = std::move(compaction_strategy_state);
|
|
}
|
|
|
|
void table::set_compaction_strategy(compaction::compaction_strategy_type strategy) {
|
|
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), compaction::compaction_strategy::name(strategy));
|
|
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
|
|
|
|
struct compaction_group_strategy_updater {
|
|
table& t;
|
|
compaction_group& cg;
|
|
compaction::compaction_backlog_tracker new_bt;
|
|
compaction::compaction_strategy_state new_cs_state;
|
|
|
|
compaction_group_strategy_updater(table& t, compaction_group& cg, compaction::compaction_strategy& new_cs)
|
|
: t(t)
|
|
, cg(cg)
|
|
, new_bt(new_cs.make_backlog_tracker())
|
|
, new_cs_state(compaction::compaction_strategy_state::make(new_cs)) {
|
|
}
|
|
|
|
void prepare(compaction::compaction_strategy& new_cs) {
|
|
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
|
|
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
|
|
|
|
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
|
|
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
|
|
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
|
new_sstables_for_backlog_tracker.push_back(s);
|
|
});
|
|
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
|
|
}
|
|
|
|
void execute() noexcept {
|
|
// Update strategy state and backlog tracker according to new strategy. SSTable set update
|
|
// is delayed until new compaction, which is triggered on strategy change. SSTable set
|
|
// cannot be updated here since it must happen under the set update lock.
|
|
cg.register_backlog_tracker(std::move(new_bt));
|
|
cg.set_compaction_strategy_state(std::move(new_cs_state));
|
|
}
|
|
};
|
|
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
|
|
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
compaction_group_strategy_updater updater(*this, cg, new_cs);
|
|
updater.prepare(new_cs);
|
|
cg_sstable_set_updaters.push_back(std::move(updater));
|
|
});
|
|
// now exception safe:
|
|
_compaction_strategy = std::move(new_cs);
|
|
for (auto& updater : cg_sstable_set_updaters) {
|
|
updater.execute();
|
|
}
|
|
}
|
|
|
|
size_t table::sstables_count() const {
|
|
return _sstables->size();
|
|
}
|
|
|
|
std::vector<uint64_t> table::sstable_count_per_level() const {
|
|
std::vector<uint64_t> count_per_level;
|
|
_sstables->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
auto level = sst->get_sstable_level();
|
|
|
|
if (level + 1 > count_per_level.size()) {
|
|
count_per_level.resize(level + 1, 0UL);
|
|
}
|
|
count_per_level[level]++;
|
|
});
|
|
return count_per_level;
|
|
}
|
|
|
|
int64_t table::get_unleveled_sstables() const {
|
|
// TODO: when we support leveled compaction, we should return the number of
|
|
// SSTables in L0. If leveled compaction is enabled in this column family,
|
|
// then we should return zero, as we currently do.
|
|
return 0;
|
|
}
|
|
|
|
future<std::unordered_set<sstables::shared_sstable>> table::get_sstables_by_partition_key(const sstring& key) const {
|
|
auto pk = partition_key::from_nodetool_style_string(_schema, key);
|
|
auto dk = dht::decorate_key(*_schema, pk);
|
|
auto hk = sstables::sstable::make_hashed_key(*_schema, dk.key());
|
|
|
|
auto sel = std::make_unique<sstables::sstable_set::incremental_selector>(get_sstable_set().make_incremental_selector());
|
|
const auto& sst = sel->select(dk).sstables;
|
|
|
|
std::unordered_set<sstables::shared_sstable> ssts;
|
|
for (auto s : sst) {
|
|
if (co_await s->has_partition_key(hk, dk)) {
|
|
ssts.insert(s);
|
|
}
|
|
}
|
|
co_return ssts;
|
|
}
|
|
|
|
const sstables::sstable_set& table::get_sstable_set() const {
|
|
return *_sstables;
|
|
}
|
|
|
|
lw_shared_ptr<const sstable_list> table::get_sstables() const {
|
|
return _sstables->all();
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable> table::select_sstables(const dht::partition_range& range) const {
|
|
return _sstables->select(range);
|
|
}
|
|
|
|
future<> table::drop_quarantined_sstables() {
|
|
class quarantine_removal_updater : public row_cache::external_updater_impl {
|
|
table& _t;
|
|
std::vector<sstables::shared_sstable>& _removed;
|
|
struct compaction_group_update {
|
|
lw_shared_ptr<sstables::sstable_set> new_main_sstables;
|
|
lw_shared_ptr<sstables::sstable_set> new_maintenance_sstables;
|
|
std::vector<sstables::shared_sstable> removed_main_sstables;
|
|
};
|
|
std::unordered_map<compaction_group*, compaction_group_update> _cg_updates;
|
|
|
|
public:
|
|
explicit quarantine_removal_updater(table& t, std::vector<sstables::shared_sstable>& removed)
|
|
: _t(t), _removed(removed) {}
|
|
|
|
virtual future<> prepare() override {
|
|
_t.for_each_compaction_group([&] (compaction_group& cg) {
|
|
auto new_main = make_lw_shared<sstables::sstable_set>(cg.make_main_sstable_set());
|
|
auto new_maintenance = cg.make_maintenance_sstable_set();
|
|
std::vector<sstables::shared_sstable> removed_main;
|
|
|
|
cg.main_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
if (sst->is_quarantined()) {
|
|
_removed.emplace_back(sst);
|
|
removed_main.emplace_back(sst);
|
|
|
|
} else {
|
|
new_main->insert(sst);
|
|
}
|
|
});
|
|
|
|
|
|
cg.maintenance_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
if (sst->is_quarantined()) {
|
|
_removed.emplace_back(sst);
|
|
} else {
|
|
new_maintenance->insert(sst);
|
|
}
|
|
});
|
|
|
|
_cg_updates[&cg] = compaction_group_update{
|
|
.new_main_sstables = std::move(new_main),
|
|
.new_maintenance_sstables = std::move(new_maintenance),
|
|
.removed_main_sstables = std::move(removed_main)
|
|
};
|
|
});
|
|
co_return;
|
|
}
|
|
|
|
virtual void execute() override {
|
|
for (auto& [cg, update] : _cg_updates) {
|
|
cg->set_main_sstables(std::move(update.new_main_sstables));
|
|
cg->set_maintenance_sstables(std::move(update.new_maintenance_sstables));
|
|
}
|
|
_t.refresh_compound_sstable_set();
|
|
for (auto& [cg, d] : _cg_updates) {
|
|
cg->get_backlog_tracker().replace_sstables(d.removed_main_sstables, {});
|
|
}
|
|
}
|
|
|
|
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, std::vector<sstables::shared_sstable>& removed) {
|
|
return std::make_unique<quarantine_removal_updater>(t, removed);
|
|
}
|
|
};
|
|
|
|
|
|
_stats.pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_stats.pending_sstable_deletions--;
|
|
});
|
|
|
|
auto permit = co_await get_sstable_list_permit();
|
|
|
|
std::vector<sstables::shared_sstable> removed;
|
|
auto updater = row_cache::external_updater(quarantine_removal_updater::make(*this, removed));
|
|
co_await _cache.invalidate(std::move(updater));
|
|
|
|
_cache.refresh_snapshot();
|
|
rebuild_statistics();
|
|
|
|
co_await delete_sstables_atomically(permit, std::move(removed));
|
|
}
|
|
|
|
bool storage_group::no_compacted_sstable_undeleted() const {
|
|
auto ret = true;
|
|
for_each_compaction_group([&ret] (const compaction_group_ptr& cg) {
|
|
ret &= cg->compacted_undeleted_sstables().empty();
|
|
});
|
|
return ret;
|
|
}
|
|
|
|
// Gets the list of all sstables in the column family, including ones that are
|
|
// not used for active queries because they have already been compacted, but are
|
|
// waiting for delete_atomically() to return.
|
|
//
|
|
// As long as we haven't deleted them, compaction needs to ensure it doesn't
|
|
// garbage-collect a tombstone that covers data in an sstable that may not be
|
|
// successfully deleted.
|
|
lw_shared_ptr<const sstable_list> table::get_sstables_including_compacted_undeleted() const {
|
|
bool no_compacted_undeleted_sstable = std::ranges::all_of(storage_groups() | std::views::values,
|
|
std::mem_fn(&storage_group::no_compacted_sstable_undeleted));
|
|
if (no_compacted_undeleted_sstable) {
|
|
return get_sstables();
|
|
}
|
|
auto ret = make_lw_shared<sstable_list>(*_sstables->all());
|
|
for_each_compaction_group([&ret] (const compaction_group& cg) {
|
|
for (auto&& s: cg.compacted_undeleted_sstables()) {
|
|
ret->insert(s);
|
|
}
|
|
});
|
|
return ret;
|
|
}
|
|
|
|
const std::vector<sstables::shared_sstable>& compaction_group::compacted_undeleted_sstables() const noexcept {
|
|
return _sstables_compacted_but_not_deleted;
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>
|
|
table::make_memory_only_memtable_list() {
|
|
auto get_schema = [this] { return schema(); };
|
|
return make_lw_shared<memtable_list>(std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats, _config.memory_compaction_scheduling_group,
|
|
&get_compaction_manager().get_shared_tombstone_gc_state());
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>
|
|
table::make_memtable_list(compaction_group& cg) {
|
|
auto seal = [this, &cg] (flush_permit&& permit) -> future<> {
|
|
gate::holder holder = cg.flush_gate().hold();
|
|
co_await seal_active_memtable(cg, std::move(permit));
|
|
};
|
|
auto get_schema = [this] { return schema(); };
|
|
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats, _config.memory_compaction_scheduling_group,
|
|
&get_compaction_manager().get_shared_tombstone_gc_state());
|
|
}
|
|
|
|
class compaction_group::compaction_group_view : public compaction::compaction_group_view {
|
|
table& _t;
|
|
compaction_group& _cg;
|
|
// When engaged, compaction is disabled altogether on this view.
|
|
std::optional<compaction::compaction_reenabler> _compaction_reenabler;
|
|
private:
|
|
bool belongs_to_this_view(const sstables::shared_sstable& sst) const {
|
|
return &_cg.view_for_sstable(sst) == this;
|
|
}
|
|
|
|
future<lw_shared_ptr<const sstables::sstable_set>> make_sstable_set_for_this_view(lw_shared_ptr<const sstables::sstable_set> sstables, auto make_sstable_set_func) const {
|
|
sstables::sstable_set ret = make_sstable_set_func();
|
|
auto all_sstables = sstables->all();
|
|
auto belongs_to_this_view_func = [this] (const sstables::shared_sstable& sst) { return belongs_to_this_view(sst); };
|
|
for (auto sst : *all_sstables | std::views::filter(belongs_to_this_view_func)) {
|
|
ret.insert(sst);
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
co_return make_lw_shared<const sstables::sstable_set>(std::move(ret));
|
|
}
|
|
public:
|
|
explicit compaction_group_view(table& t, compaction_group& cg) : _t(t), _cg(cg) {}
|
|
|
|
dht::token_range token_range() const noexcept override {
|
|
return _cg.token_range();
|
|
}
|
|
|
|
const schema_ptr& schema() const noexcept override {
|
|
return _t.schema();
|
|
}
|
|
unsigned min_compaction_threshold() const noexcept override {
|
|
// During receiving stream operations, the less we compact the faster streaming is. For
|
|
// bootstrap and replace thereThere are no readers so it is fine to be less aggressive with
|
|
// compactions as long as we don't ignore them completely (this could create a problem for
|
|
// when streaming ends)
|
|
if (_t._is_bootstrap_or_replace) {
|
|
auto target = std::min(_t.schema()->max_compaction_threshold(), 16);
|
|
return std::max(_t.schema()->min_compaction_threshold(), target);
|
|
} else {
|
|
return _t.schema()->min_compaction_threshold();
|
|
}
|
|
}
|
|
bool compaction_enforce_min_threshold() const noexcept override {
|
|
return _t.get_config().compaction_enforce_min_threshold || _t._is_bootstrap_or_replace;
|
|
}
|
|
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
|
|
return make_sstable_set_for_this_view(_cg.main_sstables(), [this] { return _cg.make_main_sstable_set(); });
|
|
}
|
|
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
|
|
return make_sstable_set_for_this_view(_cg.maintenance_sstables(), [this] { return *_cg.make_maintenance_sstable_set(); });
|
|
}
|
|
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
|
return _t.sstable_set_for_tombstone_gc(_cg);
|
|
}
|
|
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
|
|
return compaction::get_fully_expired_sstables(*this, sstables, query_time);
|
|
}
|
|
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
|
|
return _cg.compacted_undeleted_sstables();
|
|
}
|
|
compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
|
|
return _t.get_compaction_strategy();
|
|
}
|
|
compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
|
|
return _cg._compaction_strategy_state;
|
|
}
|
|
reader_permit make_compaction_reader_permit() const override {
|
|
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {});
|
|
}
|
|
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
|
return _t.get_sstables_manager();
|
|
}
|
|
sstables::shared_sstable make_sstable(sstables::sstable_state state) const override {
|
|
return _t.make_sstable(state);
|
|
}
|
|
sstables::shared_sstable make_sstable(sstables::sstable_state state, sstables::sstable_version_types version) const override {
|
|
return _t.make_sstable(state, version);
|
|
}
|
|
sstables::sstable_writer_config configure_writer(sstring origin) const override {
|
|
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
|
|
return cfg;
|
|
}
|
|
api::timestamp_type min_memtable_timestamp() const override {
|
|
return _cg.min_memtable_timestamp();
|
|
}
|
|
api::timestamp_type min_memtable_live_timestamp() const override {
|
|
return _cg.min_memtable_live_timestamp();
|
|
}
|
|
api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
|
|
return _cg.min_memtable_live_row_marker_timestamp();
|
|
}
|
|
bool memtable_has_key(const dht::decorated_key& key) const override {
|
|
return _cg.memtable_has_key(key);
|
|
}
|
|
future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
|
|
co_await _cg.update_sstable_sets_on_compaction_completion(std::move(desc));
|
|
if (offstrategy) {
|
|
_cg.trigger_compaction();
|
|
}
|
|
}
|
|
bool is_auto_compaction_disabled_by_user() const noexcept override {
|
|
return _t.is_auto_compaction_disabled_by_user();
|
|
}
|
|
bool tombstone_gc_enabled() const noexcept override {
|
|
return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
|
|
}
|
|
tombstone_gc_state get_tombstone_gc_state() const noexcept override {
|
|
return _t.get_tombstone_gc_state();
|
|
}
|
|
compaction::compaction_backlog_tracker& get_backlog_tracker() override {
|
|
return _cg.get_backlog_tracker();
|
|
}
|
|
const std::string get_group_id() const noexcept override {
|
|
return fmt::to_string(_cg.group_id());
|
|
}
|
|
|
|
seastar::condition_variable& get_staging_done_condition() noexcept override {
|
|
return _cg.get_staging_done_condition();
|
|
}
|
|
|
|
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
|
|
return _t.get_token_range_after_split(t);
|
|
}
|
|
|
|
int64_t get_sstables_repaired_at() const noexcept override {
|
|
return _cg.get_sstables_repaired_at();
|
|
}
|
|
};
|
|
|
|
std::unique_ptr<compaction_group::compaction_group_view> compaction_group::make_compacting_view() {
|
|
auto view = std::make_unique<compaction_group_view>(_t, *this);
|
|
_t._compaction_manager.add(*view);
|
|
return view;
|
|
}
|
|
|
|
std::unique_ptr<compaction_group::compaction_group_view> compaction_group::make_non_compacting_view() {
|
|
auto view = std::make_unique<compaction_group_view>(_t, *this);
|
|
auto reenabler = _t._compaction_manager.add_with_compaction_disabled(*view);
|
|
// Attaches compaction reenabler, so this non compacting view will not be compacted.
|
|
_compaction_disabler_for_views.push_back(std::move(reenabler));
|
|
return view;
|
|
}
|
|
|
|
compaction_group::compaction_group(table& t, size_t group_id, dht::token_range token_range, repair_classifier_func repair_classifier)
|
|
: _t(t)
|
|
, _unrepaired_view(make_compacting_view())
|
|
, _repairing_view(make_non_compacting_view())
|
|
, _repaired_view(make_compacting_view())
|
|
, _group_id(group_id)
|
|
, _token_range(std::move(token_range))
|
|
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy))
|
|
, _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list())
|
|
, _main_sstables(make_lw_shared<sstables::sstable_set>(make_main_sstable_set()))
|
|
, _maintenance_sstables(make_maintenance_sstable_set())
|
|
, _async_gate(format("[compaction_group {}.{} {}]", t.schema()->ks_name(), t.schema()->cf_name(), group_id))
|
|
, _backlog_tracker(t.get_compaction_strategy().make_backlog_tracker())
|
|
, _repair_sstable_classifier(std::move(repair_classifier))
|
|
, _logstor_segments(make_lw_shared<logstor::segment_set>())
|
|
{
|
|
}
|
|
|
|
compaction_group_ptr compaction_group::make_empty_group(const compaction_group& base) {
|
|
return make_lw_shared<compaction_group>(base._t, base._group_id, base._token_range, base._repair_sstable_classifier);
|
|
}
|
|
|
|
bool compaction_group::stopped() const noexcept {
|
|
return _async_gate.is_closed();
|
|
}
|
|
|
|
bool compaction_group::compaction_disabled() const {
|
|
return std::ranges::all_of(all_views(), [this] (compaction::compaction_group_view* view) {
|
|
return _t._compaction_manager.compaction_disabled(*view);
|
|
});
|
|
}
|
|
|
|
compaction_group::~compaction_group() {
|
|
// Unclosed group is not tolerated since it might result in an use-after-free.
|
|
if (!compaction_disabled()) {
|
|
on_fatal_internal_error(tlogger, format("Compaction group of id {} that belongs to {}.{} was not disabled.",
|
|
_group_id, _t.schema()->ks_name(), _t.schema()->cf_name()));
|
|
}
|
|
}
|
|
|
|
future<> compaction_group::stop(sstring reason) noexcept {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
// FIXME: indentation
|
|
for (auto view : all_views()) {
|
|
co_await _t._compaction_manager.stop_ongoing_compactions(reason, view);
|
|
}
|
|
if (_t.uses_logstor()) {
|
|
co_await get_logstor_compaction_manager().stop_ongoing_compactions(*this);
|
|
}
|
|
co_await _async_gate.close();
|
|
auto flush_future = co_await seastar::coroutine::as_future(flush());
|
|
|
|
co_await flush_separator();
|
|
co_await _flush_gate.close();
|
|
co_await _sstable_add_gate.close();
|
|
// FIXME: indentation
|
|
_compaction_disabler_for_views.clear();
|
|
co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
|
|
for (auto view : all_views()) {
|
|
co_await _t._compaction_manager.remove(*view, reason);
|
|
}
|
|
|
|
if (flush_future.failed()) {
|
|
co_await seastar::coroutine::return_exception_ptr(flush_future.get_exception());
|
|
}
|
|
}
|
|
|
|
bool compaction_group::empty() const noexcept {
|
|
return _memtables->empty() && live_sstable_count() == 0 && _sstable_add_gate.get_count() == 0;
|
|
}
|
|
|
|
const schema_ptr& compaction_group::schema() const {
|
|
return _t.schema();
|
|
}
|
|
|
|
void compaction_group::clear_sstables() {
|
|
_main_sstables = make_lw_shared<sstables::sstable_set>(make_main_sstable_set());
|
|
_maintenance_sstables = make_maintenance_sstable_set();
|
|
}
|
|
|
|
void storage_group::clear_sstables() {
|
|
for_each_compaction_group([] (const compaction_group_ptr& cg) {
|
|
cg->clear_sstables();
|
|
});
|
|
}
|
|
|
|
table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_options> sopts, compaction::compaction_manager& compaction_manager,
|
|
sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker,
|
|
locator::effective_replication_map_ptr erm)
|
|
: _schema(std::move(schema))
|
|
, _config(std::move(config))
|
|
, _erm(std::move(erm))
|
|
, _storage_opts(std::move(sopts))
|
|
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()),
|
|
keyspace_label(_schema->ks_name()),
|
|
column_family_label(_schema->cf_name())
|
|
)
|
|
, _compaction_manager(compaction_manager)
|
|
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
|
, _sg_manager(make_storage_group_manager())
|
|
, _sstables(make_compound_sstable_set())
|
|
, _sstable_deletion_gate(format("[table {}.{}] sstable_deletion_gate", _schema->ks_name(), _schema->cf_name()))
|
|
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
|
|
, _commitlog(nullptr)
|
|
, _readonly(true)
|
|
, _durable_writes(true)
|
|
, _sstables_manager(sst_manager)
|
|
, _index_manager(this->as_data_dictionary())
|
|
, _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name()))
|
|
, _counter_cell_locks(_schema->is_counter() ? std::make_unique<cell_locker>(_schema, cl_stats) : nullptr)
|
|
, _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_streams_phaser(format("[table {}.{}] pending_streams", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_flushes_phaser(format("[table {}.{}] pending_flushes", _schema->ks_name(), _schema->cf_name()))
|
|
, _row_locker(_schema)
|
|
, _flush_timer([this]{ on_flush_timer(); })
|
|
, _off_strategy_trigger([this] { trigger_offstrategy_compaction(); })
|
|
{
|
|
if (!_config.enable_disk_writes) {
|
|
tlogger.warn("Writes disabled, column family no durable.");
|
|
}
|
|
|
|
recalculate_tablet_count_stats();
|
|
set_metrics();
|
|
|
|
update_tombstone_gc_rf_one();
|
|
}
|
|
|
|
void table::on_flush_timer() {
|
|
tlogger.debug("on table {}.{} flush timer, period {}ms", _schema->ks_name(), _schema->cf_name(), _schema->memtable_flush_period());
|
|
(void)with_gate(_async_gate, [this] {
|
|
return flush().finally([this] {
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.rearm(timer<lowres_clock>::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
// The following functions return true if we should return the tablet size of a tablet in
|
|
// migration depending on its transition stage and whether it is a leaving or pending replica
|
|
bool has_size_on_leaving (locator::tablet_transition_stage stage) {
|
|
switch (stage) {
|
|
case locator::tablet_transition_stage::allow_write_both_read_old: [[fallthrough]];
|
|
case locator::tablet_transition_stage::write_both_read_old: [[fallthrough]];
|
|
case locator::tablet_transition_stage::write_both_read_old_fallback_cleanup: [[fallthrough]];
|
|
case locator::tablet_transition_stage::streaming: [[fallthrough]];
|
|
case locator::tablet_transition_stage::write_both_read_new: [[fallthrough]];
|
|
case locator::tablet_transition_stage::use_new: [[fallthrough]];
|
|
case locator::tablet_transition_stage::cleanup_target: [[fallthrough]];
|
|
case locator::tablet_transition_stage::revert_migration: [[fallthrough]];
|
|
case locator::tablet_transition_stage::rebuild_repair: [[fallthrough]];
|
|
case locator::tablet_transition_stage::repair: [[fallthrough]];
|
|
case locator::tablet_transition_stage::end_repair:
|
|
return true;
|
|
case locator::tablet_transition_stage::cleanup: [[fallthrough]];
|
|
case locator::tablet_transition_stage::end_migration:
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool has_size_on_pending (locator::tablet_transition_stage stage) {
|
|
switch (stage) {
|
|
case locator::tablet_transition_stage::allow_write_both_read_old: [[fallthrough]];
|
|
case locator::tablet_transition_stage::write_both_read_old: [[fallthrough]];
|
|
case locator::tablet_transition_stage::write_both_read_old_fallback_cleanup: [[fallthrough]];
|
|
case locator::tablet_transition_stage::streaming: [[fallthrough]];
|
|
case locator::tablet_transition_stage::cleanup_target: [[fallthrough]];
|
|
case locator::tablet_transition_stage::revert_migration: [[fallthrough]];
|
|
case locator::tablet_transition_stage::rebuild_repair:
|
|
return false;
|
|
case locator::tablet_transition_stage::write_both_read_new: [[fallthrough]];
|
|
case locator::tablet_transition_stage::use_new: [[fallthrough]];
|
|
case locator::tablet_transition_stage::cleanup: [[fallthrough]];
|
|
case locator::tablet_transition_stage::end_migration: [[fallthrough]];
|
|
case locator::tablet_transition_stage::repair: [[fallthrough]];
|
|
case locator::tablet_transition_stage::end_repair:
|
|
return true;
|
|
}
|
|
}
|
|
|
|
locator::combined_load_stats tablet_storage_group_manager::table_load_stats() const {
|
|
locator::table_load_stats table_stats;
|
|
table_stats.split_ready_seq_number = _split_ready_seq_number;
|
|
|
|
locator::tablet_load_stats tablet_stats;
|
|
|
|
for_each_storage_group([&] (size_t id, storage_group& sg) {
|
|
auto tid = locator::tablet_id(id);
|
|
locator::global_tablet_id gid { _t.schema()->id(), tid };
|
|
locator::tablet_replica me { _my_host_id, this_shard_id() };
|
|
const uint64_t tablet_size = sg.live_disk_space_used();
|
|
|
|
auto transition = _tablet_map->get_tablet_transition_info(tid);
|
|
auto& info = _tablet_map->get_tablet_info(tid);
|
|
bool is_pending = transition && transition->pending_replica == me;
|
|
bool is_leaving = transition && locator::get_leaving_replica(info, *transition) == me;
|
|
|
|
// It's important to tackle the anomaly in reported size, since both leaving and
|
|
// pending replicas could otherwise be accounted during tablet migration.
|
|
// If transition hasn't reached write_both_read_new stage, then leaving replicas are accounted.
|
|
// Otherwise, pending replicas are accounted.
|
|
// This helps to reduce the discrepancy window.
|
|
auto table_size_filter = [&] () {
|
|
// if tablet is not in transit, it's filtered in.
|
|
if (!transition) {
|
|
return true;
|
|
}
|
|
|
|
auto s = transition->reads; // read selector
|
|
|
|
return (!is_pending && !is_leaving)
|
|
|| (is_leaving && s == locator::read_replica_set_selector::previous)
|
|
|| (is_pending && s == locator::read_replica_set_selector::next);
|
|
};
|
|
|
|
// When a tablet is in migration, we want to send its size during any migration stage when
|
|
// we still know the tablet's size. This way the balancer will have better information about
|
|
// tablet sizes, and we reduce the chance that the node will be ignored during balancing
|
|
// due to missing tablet size. On the leaving replica we include tablets until the use_new
|
|
// stage (inclusive), and on the pending we include tablets after the streaming stage.
|
|
// There is an overlap in tablet sizes (we report sizes on both the leaving and pending
|
|
// replicas for some stages), but that should not be a problem.
|
|
auto tablet_size_filter = [&] () {
|
|
// if tablet is not in transit, it's filtered in.
|
|
if (!transition) {
|
|
return true;
|
|
}
|
|
|
|
if (is_leaving) {
|
|
return has_size_on_leaving(transition->stage);
|
|
} else if (is_pending) {
|
|
return has_size_on_pending(transition->stage);
|
|
}
|
|
|
|
return true;
|
|
};
|
|
|
|
if (table_size_filter()) {
|
|
table_stats.size_in_bytes += tablet_size;
|
|
}
|
|
|
|
if (tablet_size_filter()) {
|
|
const dht::token_range trange = _tablet_map->get_token_range(gid.tablet);
|
|
// Make sure the token range is in the form (a, b]
|
|
SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());
|
|
tablet_stats.tablet_sizes[gid.table][trange] = tablet_size;
|
|
}
|
|
});
|
|
return locator::combined_load_stats{
|
|
.table_ls = std::move(table_stats),
|
|
.tablet_ls = std::move(tablet_stats)
|
|
};
|
|
}
|
|
|
|
locator::combined_load_stats table::table_load_stats() const {
|
|
return _sg_manager->table_load_stats();
|
|
}
|
|
|
|
void tablet_storage_group_manager::handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
|
|
auto table_id = schema()->id();
|
|
size_t old_tablet_count = old_tmap.tablet_count();
|
|
size_t new_tablet_count = new_tmap.tablet_count();
|
|
storage_group_map new_storage_groups;
|
|
|
|
if (!old_tablet_count) {
|
|
on_internal_error(tlogger, format("Table {} had zero tablets, it should never happen when splitting.", table_id));
|
|
}
|
|
|
|
// NOTE: exception when applying replica changes to reflect token metadata will abort for obvious reasons,
|
|
// so exception safety is not required here.
|
|
|
|
unsigned growth_factor = log2ceil(new_tablet_count / old_tablet_count);
|
|
unsigned split_size = 1 << growth_factor;
|
|
tlogger.debug("Growth factor: {}, split size {}", growth_factor, split_size);
|
|
|
|
if (old_tablet_count * split_size != new_tablet_count) {
|
|
on_internal_error(tlogger, format("New tablet count for table {} is unexpected, actual: {}, expected {}.",
|
|
table_id, new_tablet_count, old_tablet_count * split_size));
|
|
}
|
|
|
|
// Stop the released main compaction groups asynchronously
|
|
for (auto& [id, sg] : _storage_groups) {
|
|
if (!sg->split_unready_groups_are_empty()) {
|
|
on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \
|
|
"therefore groups cannot be remapped with the new tablet count.",
|
|
id, table_id));
|
|
}
|
|
// Remove old empty groups, they're unused, but they need to be deregistered properly
|
|
// FIXME: indent.
|
|
for (auto cg_ptr : sg->split_unready_groups()) {
|
|
auto f = cg_ptr->stop("tablet split");
|
|
if (!f.available() || f.failed()) [[unlikely]] {
|
|
_stop_fut = _stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
|
|
return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) {
|
|
tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex));
|
|
});
|
|
});
|
|
}
|
|
}
|
|
unsigned first_new_id = id << growth_factor;
|
|
auto split_ready_groups = sg->split_ready_compaction_groups();
|
|
if (split_ready_groups.size() != split_size) {
|
|
on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.", split_ready_groups.size(), split_size));
|
|
}
|
|
for (unsigned i = 0; i < split_size; i++) {
|
|
auto group_id = first_new_id + i;
|
|
auto old_range = old_tmap.get_token_range(locator::tablet_id(id));
|
|
auto new_range = new_tmap.get_token_range(locator::tablet_id(group_id));
|
|
auto sstables_repaired_at = new_tmap.get_tablet_info(locator::tablet_id(group_id)).sstables_repaired_at;
|
|
tlogger.debug("Setting sstables_repaired_at={} for split tablet_id={} old_tid={} new_tid={} old_range={} new_range={} idx={}",
|
|
sstables_repaired_at, table_id, id, group_id, old_range, new_range, i);
|
|
split_ready_groups[i]->update_id_and_range(group_id, new_range);
|
|
new_storage_groups[group_id] = make_lw_shared<storage_group>(std::move(split_ready_groups[i]));
|
|
}
|
|
|
|
tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].",
|
|
id, table_id, fmt::join(std::views::iota(first_new_id, first_new_id+split_size), ", "));
|
|
}
|
|
|
|
_storage_groups = std::move(new_storage_groups);
|
|
}
|
|
|
|
future<> tablet_storage_group_manager::merge_completion_fiber() {
|
|
co_await coroutine::switch_to(_t.get_config().streaming_scheduling_group);
|
|
|
|
while (!_t.async_gate().is_closed()) {
|
|
try {
|
|
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(5min));
|
|
auto ks_name = schema()->ks_name();
|
|
auto cf_name = schema()->cf_name();
|
|
// Enable compaction after merge is done.
|
|
auto cres = std::exchange(_compaction_reenablers_for_merging, {});
|
|
co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> {
|
|
auto main_group = sg.main_compaction_group();
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
|
int nr = 0;
|
|
int sz = sg.merging_groups().size();
|
|
for (auto& group : sg.merging_groups()) {
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} merging {} out of {} groups",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range(), ++nr, sz);
|
|
// Synchronize with ongoing writes that might be blocked waiting for memory.
|
|
// Also, disabling compaction provides stability on the sstable set.
|
|
// Flushes memtable, so all the data can be moved.
|
|
co_await group->stop("tablet merge");
|
|
if (utils::get_local_injector().enter("merge_completion_fiber_error")) {
|
|
tlogger.info("Got merge_completion_fiber_error");
|
|
co_await sleep(std::chrono::seconds(60));
|
|
}
|
|
co_await main_group->merge_sstables_from(*group);
|
|
}
|
|
co_await sg.remove_empty_merging_groups();
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
|
});
|
|
} catch (...) {
|
|
tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
|
|
tlogger.info("Merge completion fiber finished, about to sleep");
|
|
});
|
|
_pending_merge_fiber_work.reset();
|
|
co_await _merge_completion_event.wait();
|
|
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
}
|
|
|
|
void tablet_storage_group_manager::handle_tablet_merge_completion(locator::effective_replication_map_ptr old_erm,
|
|
const locator::tablet_map& old_tmap,
|
|
const locator::tablet_map& new_tmap) {
|
|
auto table_id = schema()->id();
|
|
size_t old_tablet_count = old_tmap.tablet_count();
|
|
size_t new_tablet_count = new_tmap.tablet_count();
|
|
storage_group_map new_storage_groups;
|
|
|
|
unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count);
|
|
unsigned merge_size = 1 << log2_reduce_factor;
|
|
|
|
if (merge_size != 2) {
|
|
throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}",
|
|
old_tablet_count, new_tablet_count, table_id));
|
|
}
|
|
|
|
for (auto& [id, sg] : _storage_groups) {
|
|
// Pick first (even) tablet of each sibling pair.
|
|
if (id % merge_size != 0) {
|
|
continue;
|
|
}
|
|
auto new_tid = id >> log2_reduce_factor;
|
|
auto new_range = new_tmap.get_token_range(locator::tablet_id(new_tid));
|
|
auto new_cg = make_lw_shared<compaction_group>(_t, new_tid, new_range, make_repair_sstable_classifier_func());
|
|
for (auto& view : new_cg->all_views()) {
|
|
auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging");
|
|
_compaction_reenablers_for_merging.push_back(background_merge_guard{std::move(cre), old_erm});
|
|
}
|
|
auto new_sg = make_lw_shared<storage_group>(std::move(new_cg));
|
|
|
|
for (unsigned i = 0; i < merge_size; i++) {
|
|
auto group_id = id + i;
|
|
|
|
auto it = _storage_groups.find(group_id);
|
|
if (it == _storage_groups.end()) {
|
|
throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id));
|
|
}
|
|
auto& sg = it->second;
|
|
sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
|
|
cg->update_id(new_tid);
|
|
tlogger.debug("Adding merging_group: sstables_repaired_at={} old_range={} new_range={} old_tid={} new_tid={} old_group_id={}",
|
|
cg->get_sstables_repaired_at(), cg->token_range(), new_range, cg->group_id(), new_tid, group_id);
|
|
new_sg->add_merging_group(cg);
|
|
});
|
|
// Cannot wait for group to be closed, since it can only return after some long-running operation
|
|
// is done with it, and old erm is still held at this point.
|
|
(void) with_gate(_t.async_gate(), [sg] {
|
|
return sg->close().handle_exception([sg] (std::exception_ptr ex) {
|
|
tlogger.warn("Failed to close storage group: {}. Ignored", std::move(ex));
|
|
});
|
|
});
|
|
}
|
|
new_storage_groups[new_tid] = std::move(new_sg);
|
|
}
|
|
_storage_groups = std::move(new_storage_groups);
|
|
_pending_merge_fiber_work = _merge_fiber_barrier.start();
|
|
_merge_completion_event.signal();
|
|
}
|
|
|
|
void tablet_storage_group_manager::update_effective_replication_map(
|
|
const locator::effective_replication_map_ptr& old_erm,
|
|
const locator::effective_replication_map& erm,
|
|
noncopyable_function<void()> refresh_mutation_source)
|
|
{
|
|
auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id());
|
|
auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map);
|
|
|
|
size_t old_tablet_count = old_tablet_map->tablet_count();
|
|
size_t new_tablet_count = new_tablet_map->tablet_count();
|
|
if (new_tablet_count > old_tablet_count) {
|
|
tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets",
|
|
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
|
handle_tablet_split_completion(*old_tablet_map, *new_tablet_map);
|
|
} else if (new_tablet_count < old_tablet_count) {
|
|
tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
|
|
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
|
if (utils::get_local_injector().is_enabled("tablet_force_tablet_count_decrease_once")) {
|
|
utils::get_local_injector().disable("tablet_force_tablet_count_decrease");
|
|
}
|
|
handle_tablet_merge_completion(old_erm, *old_tablet_map, *new_tablet_map);
|
|
}
|
|
|
|
// Allocate storage group if tablet is migrating in, or deallocate if it's migrating out.
|
|
auto this_replica = locator::tablet_replica{
|
|
.host = erm.get_token_metadata().get_my_id(),
|
|
.shard = this_shard_id()
|
|
};
|
|
auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) {
|
|
return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica;
|
|
};
|
|
bool tablet_migrating_in = false;
|
|
for (auto& transition : new_tablet_map->transitions()) {
|
|
auto tid = transition.first;
|
|
auto transition_info = transition.second;
|
|
if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
|
|
auto range = new_tablet_map->get_token_range(tid);
|
|
_storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
|
|
tablet_migrating_in = true;
|
|
} else if (_storage_groups.contains(tid.value()) && locator::is_post_cleanup(this_replica, new_tablet_map->get_tablet_info(tid), transition_info)) {
|
|
// The storage group should be cleaned up and stopped at this point usually by the tablet cleanup stage,
|
|
// unless the storage group was allocated after tablet cleanup was completed for this node. This could
|
|
// happen if the node was restarted after tablet cleanup was run but before moving to the next stage. To
|
|
// handle this case we stop the storage group here if it's not stopped already.
|
|
auto sg = _storage_groups[tid.value()];
|
|
|
|
remove_storage_group(tid.value());
|
|
|
|
(void) with_gate(_t.async_gate(), [sg] {
|
|
return sg->stop("tablet post-cleanup").then([sg] {});
|
|
});
|
|
}
|
|
}
|
|
|
|
// update the per compaction group tombstone GC enabled flag
|
|
for_each_storage_group([&] (size_t group_id, storage_group& sg) {
|
|
const locator::tablet_id tid = static_cast<locator::tablet_id>(group_id);
|
|
const locator::tablet_info& tinfo = new_tablet_map->get_tablet_info(tid);
|
|
const bool tombstone_gc_enabled = std::ranges::contains(tinfo.replicas, this_replica);
|
|
|
|
sg.for_each_compaction_group([tombstone_gc_enabled] (const compaction_group_ptr& cg_ptr) {
|
|
cg_ptr->set_tombstone_gc_enabled(tombstone_gc_enabled);
|
|
});
|
|
});
|
|
|
|
// TODO: possibly use row_cache::invalidate(external_updater) instead on all ranges of new replicas,
|
|
// as underlying source will be refreshed and external_updater::execute can refresh the sstable set.
|
|
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
|
|
// problem as fresh node won't have any data in new range and migration cleanup invalidates the
|
|
// range being moved away.
|
|
if (tablet_migrating_in || old_tablet_count != new_tablet_count) {
|
|
refresh_mutation_source();
|
|
}
|
|
}
|
|
|
|
// This function is called in the topology::transition_state::tablet_resize_finalization transition which
|
|
// guarantees there is no tablet repair. The cm.stop_and_disable_compaction() ensures no compaction is running.
|
|
future<> table::update_repaired_at_for_merge() {
|
|
if (!uses_tablets()) {
|
|
co_return;
|
|
}
|
|
if (utils::get_local_injector().enter("skip_update_repaired_at_for_merge")) {
|
|
co_return;
|
|
}
|
|
auto sgs = storage_groups();
|
|
for (auto& x : sgs) {
|
|
auto sg = x.second;
|
|
if (sg) {
|
|
auto cgs = sg->compaction_groups_immediate();
|
|
for (auto& cg : cgs) {
|
|
auto cre = co_await cg->get_compaction_manager().stop_and_disable_compaction("update_repaired_at_for_merge", cg->view_for_unrepaired_data());
|
|
co_await cg->update_repaired_at_for_merge();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
|
auto old_erm = std::exchange(_erm, std::move(erm));
|
|
|
|
auto refresh_mutation_source = [this] {
|
|
refresh_compound_sstable_set();
|
|
_cache.refresh_snapshot();
|
|
};
|
|
|
|
if (uses_tablets()) {
|
|
_sg_manager->update_effective_replication_map(old_erm, *_erm, refresh_mutation_source);
|
|
}
|
|
if (old_erm) {
|
|
old_erm->invalidate();
|
|
}
|
|
|
|
recalculate_tablet_count_stats();
|
|
|
|
update_tombstone_gc_rf_one();
|
|
}
|
|
|
|
void table::update_tombstone_gc_rf_one() {
|
|
auto& st = _compaction_manager.get_shared_tombstone_gc_state();
|
|
if (_erm && _erm->get_replication_factor() == 1) {
|
|
st.set_table_rf_one(_schema->id());
|
|
} else {
|
|
st.set_table_rf_n(_schema->id());
|
|
}
|
|
}
|
|
|
|
void table::recalculate_tablet_count_stats() {
|
|
_stats.tablet_count = calculate_tablet_count();
|
|
}
|
|
|
|
int64_t table::calculate_tablet_count() const {
|
|
if (!uses_tablets()) {
|
|
return 0;
|
|
}
|
|
|
|
return _sg_manager->storage_groups().size();
|
|
}
|
|
|
|
partition_presence_checker
|
|
table::make_partition_presence_checker(lw_shared_ptr<const sstables::sstable_set> sstables) {
|
|
auto sel = make_lw_shared<sstables::sstable_set::incremental_selector>(sstables->make_incremental_selector());
|
|
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
|
|
auto& sst = sel->select(key).sstables;
|
|
if (sst.empty()) {
|
|
return partition_presence_checker_result::definitely_doesnt_exist;
|
|
}
|
|
auto hk = sstables::sstable::make_hashed_key(*_schema, key.key());
|
|
for (auto&& s : sst) {
|
|
if (s->filter_has_key(hk)) {
|
|
return partition_presence_checker_result::maybe_exists;
|
|
}
|
|
}
|
|
return partition_presence_checker_result::definitely_doesnt_exist;
|
|
};
|
|
}
|
|
|
|
max_purgeable_fn table::get_max_purgeable_fn_for_cache_underlying_reader() const {
|
|
return [this](const dht::decorated_key& dk, ::is_shadowable is_shadowable) -> max_purgeable {
|
|
auto& sg = storage_group_for_token(dk.token());
|
|
max_purgeable mp;
|
|
|
|
sg.for_each_compaction_group([&dk, is_shadowable, &mp] (const compaction_group_ptr& cg) {
|
|
mp.combine(cg->memtables()->get_max_purgeable(dk, is_shadowable, cg->max_seen_timestamp()));
|
|
});
|
|
|
|
return mp;
|
|
};
|
|
}
|
|
|
|
snapshot_source
|
|
table::sstables_as_snapshot_source() {
|
|
return snapshot_source([this] () {
|
|
auto sst_set = _sstables;
|
|
return mutation_source([this, sst_set] (schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& r,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) {
|
|
auto reader = make_sstable_reader(std::move(s), std::move(permit), sst_set, r, slice, std::move(trace_state), fwd, fwd_mr);
|
|
return make_compacting_reader(
|
|
std::move(reader),
|
|
gc_clock::now(),
|
|
get_max_purgeable_fn_for_cache_underlying_reader(),
|
|
get_tombstone_gc_state().with_commitlog_check_disabled(),
|
|
fwd);
|
|
}, [this, sst_set] {
|
|
return make_partition_presence_checker(sst_set);
|
|
});
|
|
});
|
|
}
|
|
|
|
// define in .cc, since sstable is forward-declared in .hh
|
|
table::~table() {
|
|
auto& st = _compaction_manager.get_shared_tombstone_gc_state();
|
|
st.remove_table_from_rf_registry(_schema->id());
|
|
}
|
|
|
|
logalloc::occupancy_stats table::occupancy() const {
|
|
logalloc::occupancy_stats res;
|
|
for_each_compaction_group([&] (const compaction_group& cg) {
|
|
for (auto& m : *const_cast<compaction_group&>(cg).memtables()) {
|
|
res += m->region().occupancy();
|
|
}
|
|
});
|
|
return res;
|
|
}
|
|
|
|
db::replay_position table::highest_flushed_replay_position() const {
|
|
return _highest_flushed_rp;
|
|
}
|
|
|
|
struct snapshot_tablet_info {
|
|
size_t id;
|
|
dht::token first_token, last_token;
|
|
db_clock::time_point repair_time;
|
|
int64_t repaired_at;
|
|
};
|
|
|
|
struct manifest_json : public json::json_base {
|
|
struct info : public json::json_base {
|
|
json::json_element<sstring> version;
|
|
json::json_element<sstring> scope;
|
|
|
|
info() {
|
|
register_params();
|
|
}
|
|
info(const info& e) {
|
|
register_params();
|
|
version = e.version;
|
|
scope = e.scope;
|
|
}
|
|
info& operator=(const info& e) {
|
|
if (this != &e) {
|
|
version = e.version;
|
|
scope = e.scope;
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&version, "version");
|
|
add(&scope, "scope");
|
|
}
|
|
};
|
|
|
|
struct node_info : public json::json_base {
|
|
json::json_element<sstring> host_id;
|
|
json::json_element<sstring> datacenter;
|
|
json::json_element<sstring> rack;
|
|
|
|
node_info() {
|
|
register_params();
|
|
}
|
|
node_info(const node_info& e) {
|
|
register_params();
|
|
host_id = e.host_id;
|
|
datacenter = e.datacenter;
|
|
rack = e.rack;
|
|
}
|
|
node_info& operator=(const node_info& e) {
|
|
if (this != &e) {
|
|
host_id = e.host_id;
|
|
datacenter = e.datacenter;
|
|
rack = e.rack;
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&host_id, "host_id");
|
|
add(&datacenter, "datacenter");
|
|
add(&rack, "rack");
|
|
}
|
|
};
|
|
|
|
struct snapshot_info : public json::json_base {
|
|
json::json_element<sstring> name;
|
|
json::json_element<time_t> created_at;
|
|
json::json_element<time_t> expires_at;
|
|
|
|
snapshot_info() {
|
|
register_params();
|
|
}
|
|
snapshot_info(const snapshot_info& e) {
|
|
register_params();
|
|
name = e.name;
|
|
created_at = e.created_at;
|
|
expires_at = e.expires_at;
|
|
}
|
|
snapshot_info& operator=(const snapshot_info& e) {
|
|
if (this != &e) {
|
|
name = e.name;
|
|
created_at = e.created_at;
|
|
expires_at = e.expires_at;
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&name, "name");
|
|
add(&created_at, "created_at");
|
|
add(&expires_at, "expires_at");
|
|
}
|
|
};
|
|
|
|
struct table_info : public json::json_base {
|
|
json::json_element<sstring> keyspace_name;
|
|
json::json_element<sstring> table_name;
|
|
json::json_element<sstring> table_id;
|
|
json::json_element<sstring> tablets_type;
|
|
json::json_element<size_t> tablet_count;
|
|
|
|
table_info() {
|
|
register_params();
|
|
}
|
|
table_info(const table_info& e) {
|
|
register_params();
|
|
keyspace_name = e.keyspace_name;
|
|
table_name = e.table_name;
|
|
table_id = e.table_id;
|
|
tablets_type = e.tablets_type;
|
|
tablet_count = e.tablet_count;
|
|
}
|
|
table_info& operator=(const table_info& e) {
|
|
if (this != &e) {
|
|
keyspace_name = e.keyspace_name;
|
|
table_name = e.table_name;
|
|
table_id = e.table_id;
|
|
tablets_type = e.tablets_type;
|
|
tablet_count = e.tablet_count;
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&keyspace_name, "keyspace_name");
|
|
add(&table_name, "table_name");
|
|
add(&table_id, "table_id");
|
|
add(&tablets_type, "tablets_type");
|
|
add(&tablet_count, "tablet_count");
|
|
}
|
|
};
|
|
|
|
struct sstable_info : public json::json_base {
|
|
json::json_element<sstring> id;
|
|
json::json_element<sstring> toc_name;
|
|
json::json_element<uint64_t> data_size;
|
|
json::json_element<uint64_t> index_size;
|
|
json::json_element<int64_t> first_token;
|
|
json::json_element<int64_t> last_token;
|
|
json::json_element<uint64_t> tablet_id;
|
|
|
|
sstable_info() {
|
|
register_params();
|
|
}
|
|
sstable_info(const sstables::sstable_snapshot_metadata& e) {
|
|
register_params();
|
|
id = fmt::to_string(e.id);
|
|
toc_name = e.toc_name;
|
|
data_size = e.data_size;
|
|
index_size = e.index_size;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
if (e.tablet_id) {
|
|
tablet_id = *e.tablet_id;
|
|
}
|
|
}
|
|
sstable_info(const sstable_info& e) {
|
|
register_params();
|
|
id = e.id;
|
|
toc_name = e.toc_name;
|
|
data_size = e.data_size;
|
|
index_size = e.index_size;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
tablet_id = e.tablet_id;
|
|
}
|
|
sstable_info(sstable_info&& e) {
|
|
register_params();
|
|
id = e.id;
|
|
toc_name = std::move(e.toc_name);
|
|
data_size = e.data_size;
|
|
index_size = e.index_size;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
tablet_id = e.tablet_id;
|
|
}
|
|
sstable_info& operator=(sstable_info&& e) {
|
|
id = e.id;
|
|
toc_name = std::move(e.toc_name);
|
|
data_size = e.data_size;
|
|
index_size = e.index_size;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
tablet_id = e.tablet_id;
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&id, "id");
|
|
add(&toc_name, "toc_name");
|
|
add(&data_size, "data_size");
|
|
add(&index_size, "index_size");
|
|
add(&first_token, "first_token");
|
|
add(&last_token, "last_token");
|
|
add(&tablet_id, "tablet_id");
|
|
}
|
|
};
|
|
|
|
struct tablet_info : public json::json_base {
|
|
json::json_element<uint64_t> id;
|
|
json::json_element<int64_t> first_token;
|
|
json::json_element<int64_t> last_token;
|
|
json::json_element<time_t> repair_time;
|
|
json::json_element<int64_t> repaired_at;
|
|
|
|
tablet_info() {
|
|
register_params();
|
|
}
|
|
tablet_info(const snapshot_tablet_info& e) {
|
|
register_params();
|
|
id = e.id;
|
|
first_token = dht::token::to_int64(e.first_token);
|
|
last_token = dht::token::to_int64(e.last_token);
|
|
repair_time = db_clock::to_time_t(e.repair_time);
|
|
repaired_at = e.repaired_at;
|
|
}
|
|
tablet_info(const tablet_info& e) {
|
|
register_params();
|
|
id = e.id;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
repair_time = e.repair_time;
|
|
repaired_at = e.repaired_at;
|
|
}
|
|
tablet_info& operator=(tablet_info&& e) {
|
|
id = e.id;
|
|
first_token = e.first_token;
|
|
last_token = e.last_token;
|
|
repair_time = e.repair_time;
|
|
repaired_at = e.repaired_at;
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&id, "id");
|
|
add(&first_token, "first_token");
|
|
add(&last_token, "last_token");
|
|
add(&repair_time, "repair_time");
|
|
add(&repaired_at, "repaired_at");
|
|
}
|
|
};
|
|
|
|
json::json_element<info> manifest;
|
|
json::json_element<node_info> node;
|
|
json::json_element<snapshot_info> snapshot;
|
|
json::json_element<table_info> table;
|
|
json::json_chunked_list<sstable_info> sstables;
|
|
json::json_chunked_list<tablet_info> tablets;
|
|
|
|
manifest_json() {
|
|
register_params();
|
|
}
|
|
manifest_json(manifest_json&& e) {
|
|
register_params();
|
|
manifest = std::move(e.manifest);
|
|
node = std::move(e.node);
|
|
snapshot = std::move(e.snapshot);
|
|
table = std::move(e.table);
|
|
sstables = std::move(e.sstables);
|
|
tablets = std::move(e.tablets);
|
|
}
|
|
manifest_json& operator=(manifest_json&& e) {
|
|
if (this != &e) {
|
|
manifest = std::move(e.manifest);
|
|
node = std::move(e.node);
|
|
snapshot = std::move(e.snapshot);
|
|
table = std::move(e.table);
|
|
sstables = std::move(e.sstables);
|
|
tablets = std::move(e.tablets);
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&manifest, "manifest");
|
|
add(&node, "node");
|
|
add(&snapshot, "snapshot");
|
|
add(&table, "table");
|
|
add(&sstables, "sstables");
|
|
add(&tablets, "tablets");
|
|
}
|
|
};
|
|
|
|
class snapshot_writer {
|
|
public:
|
|
virtual future<> init() = 0;
|
|
virtual future<> sync() = 0;
|
|
virtual future<output_stream<char>> stream_for(sstring component) = 0;
|
|
virtual ~snapshot_writer() = default;
|
|
};
|
|
|
|
using snapshot_sstable_set = foreign_ptr<std::unique_ptr<utils::chunked_vector<sstables::sstable_snapshot_metadata>>>;
|
|
|
|
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, std::vector<snapshot_tablet_info> tablets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
|
|
manifest_json manifest;
|
|
|
|
manifest_json::info info;
|
|
info.version = "1.0";
|
|
info.scope = "node";
|
|
manifest.manifest = std::move(info);
|
|
|
|
manifest_json::node_info node;
|
|
node.host_id = topology.my_host_id().to_sstring();
|
|
const auto& loc = topology.get_location();
|
|
node.datacenter = loc.dc;
|
|
node.rack = loc.rack;
|
|
manifest.node = std::move(node);
|
|
|
|
manifest_json::snapshot_info snapshot;
|
|
snapshot.name = name;
|
|
snapshot.created_at = opts.created_at.time_since_epoch().count();
|
|
if (opts.expires_at) {
|
|
snapshot.expires_at = opts.expires_at->time_since_epoch().count();
|
|
}
|
|
manifest.snapshot = std::move(snapshot);
|
|
|
|
manifest_json::table_info table;
|
|
table.keyspace_name = schema->ks_name();
|
|
table.table_name = schema->cf_name();
|
|
table.table_id = to_sstring(schema->id());
|
|
table.tablets_type = tablet_count ? "powof2" : "none";
|
|
table.tablet_count = tablet_count.value_or(0);
|
|
manifest.table = std::move(table);
|
|
|
|
for (const auto& fsp : sstable_sets) {
|
|
for (auto& md : *fsp) {
|
|
manifest.sstables.push(manifest_json::sstable_info(md));
|
|
}
|
|
}
|
|
|
|
for (const auto& sti : tablets) {
|
|
manifest.tablets.push(manifest_json::tablet_info(sti));
|
|
}
|
|
|
|
auto streamer = json::stream_object(std::move(manifest));
|
|
auto out = co_await writer.stream_for("manifest.json");
|
|
std::exception_ptr ex;
|
|
try {
|
|
co_await streamer(std::move(out));
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
}
|
|
|
|
/*!
|
|
* \brief write the schema to a 'schema.cql' file at the given directory.
|
|
*
|
|
* When doing a snapshot, the snapshot directory contains a 'schema.cql' file
|
|
* with a CQL command that can be used to generate the schema.
|
|
* The content is is similar to the result of the CQL DESCRIBE command of the table.
|
|
*
|
|
* When a schema has indexes, local indexes or views, those indexes and views
|
|
* are represented by their own schemas.
|
|
* In those cases, the method would write the relevant information for each of the schemas:
|
|
*
|
|
* The schema of the base table would output a file with the CREATE TABLE command
|
|
* and the schema of the view that is used for the index would output a file with the
|
|
* CREATE INDEX command.
|
|
* The same is true for local index and MATERIALIZED VIEW.
|
|
*/
|
|
static future<> write_schema_as_cql(snapshot_writer& writer, cql3::description schema_desc) {
|
|
auto schema_description = std::move(*schema_desc.create_statement);
|
|
auto out = co_await writer.stream_for("schema.cql");
|
|
std::exception_ptr ex;
|
|
|
|
auto view = managed_bytes_view(schema_description.as_managed_bytes());
|
|
|
|
try {
|
|
for (auto&& fragment : fragment_range(view)) {
|
|
auto sv = to_string_view(fragment);
|
|
co_await out.write(sv.data(), sv.size());
|
|
}
|
|
co_await out.flush();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await out.close();
|
|
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
}
|
|
|
|
class local_snapshot_writer : public snapshot_writer {
|
|
std::filesystem::path _dir;
|
|
db::snapshot_options _opts;
|
|
|
|
public:
|
|
local_snapshot_writer(std::filesystem::path dir, sstring name, db::snapshot_options opts)
|
|
: _dir(dir / sstables::snapshots_dir / name)
|
|
, _opts(std::move(opts))
|
|
{}
|
|
future<> init() override {
|
|
co_await io_check([this] { return recursive_touch_directory(_dir.native()); });
|
|
}
|
|
future<> sync() override {
|
|
co_await io_check([this] { return sync_directory(_dir.native()); });
|
|
}
|
|
future<output_stream<char>> stream_for(sstring component) override {
|
|
auto file_name = (_dir / component).native();
|
|
auto f = co_await open_checked_file_dma(general_disk_error_handler, file_name, open_flags::wo | open_flags::create | open_flags::truncate);
|
|
co_return co_await make_file_output_stream(std::move(f));
|
|
}
|
|
};
|
|
|
|
// Runs the orchestration code on an arbitrary shard to balance the load.
|
|
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
|
|
auto writer = std::visit(overloaded_functor{
|
|
[&name, &opts] (const data_dictionary::storage_options::local& loc) -> std::unique_ptr<snapshot_writer> {
|
|
if (loc.dir.empty()) {
|
|
// virtual tables don't have initialized local storage
|
|
return nullptr;
|
|
}
|
|
|
|
return std::make_unique<local_snapshot_writer>(loc.dir, name, opts);
|
|
},
|
|
[] (const data_dictionary::storage_options::s3&) -> std::unique_ptr<snapshot_writer> {
|
|
throw std::runtime_error("Snapshotting non-local tables is not implemented");
|
|
}
|
|
}, table_shards->get_storage_options().value);
|
|
if (!writer) {
|
|
co_return;
|
|
}
|
|
|
|
auto orchestrator = std::hash<sstring>()(name) % smp::count;
|
|
co_await smp::submit_to(orchestrator, [&] () -> future<> {
|
|
auto& t = *table_shards;
|
|
auto s = t.schema();
|
|
tlogger.debug("Taking snapshot of {}.{}: name={}", s->ks_name(), s->cf_name(), name);
|
|
|
|
std::vector<snapshot_sstable_set> sstable_sets(smp::count);
|
|
|
|
co_await writer->init();
|
|
co_await smp::invoke_on_all([&] -> future<> {
|
|
auto& t = *table_shards;
|
|
auto [tables, permit] = co_await t.snapshot_sstables();
|
|
auto sstables_metadata = co_await t.get_sstables_manager().take_snapshot(std::move(tables), name);
|
|
sstable_sets[this_shard_id()] = make_foreign(std::make_unique<utils::chunked_vector<sstables::sstable_snapshot_metadata>>(std::move(sstables_metadata)));
|
|
});
|
|
co_await writer->sync();
|
|
|
|
std::exception_ptr ex;
|
|
|
|
tlogger.debug("snapshot {}: writing schema.cql", name);
|
|
auto schema_desc = s->describe(replica::make_schema_describe_helper(table_shards), cql3::describe_option::STMTS);
|
|
co_await write_schema_as_cql(*writer, std::move(schema_desc)).handle_exception([&] (std::exception_ptr ptr) {
|
|
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", name, ptr);
|
|
ex = std::move(ptr);
|
|
});
|
|
tlogger.debug("snapshot {}: seal_snapshot", name);
|
|
const auto& topology = sharded_db.local().get_token_metadata().get_topology();
|
|
std::optional<int64_t> tablet_count;
|
|
std::vector<snapshot_tablet_info> tablets;
|
|
std::unordered_set<size_t> tids;
|
|
if (t.uses_tablets()) {
|
|
auto erm = t.get_effective_replication_map();
|
|
auto& tm = erm->get_token_metadata().tablets().get_tablet_map(s->id());
|
|
tablet_count = tm.tablet_count();
|
|
for (auto& ssts : sstable_sets) {
|
|
for (auto& sst : *ssts) {
|
|
auto tok = sst.first_token;
|
|
auto tid = tm.get_tablet_id(dht::token::from_int64(tok));
|
|
sst.tablet_id = tid.id;
|
|
if (tids.emplace(tid.id).second) {
|
|
auto& tinfo = tm.get_tablet_info(tid);
|
|
tablets.emplace_back(snapshot_tablet_info{
|
|
.id = tid.id,
|
|
.first_token = tm.get_first_token(tid),
|
|
.last_token = tm.get_last_token(tid),
|
|
.repair_time = tinfo.repair_time,
|
|
.repaired_at = tinfo.sstables_repaired_at,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s, tablet_count).handle_exception([&] (std::exception_ptr ptr) {
|
|
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
|
|
ex = std::move(ptr);
|
|
});
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
|
|
co_await writer->sync();
|
|
});
|
|
}
|
|
|
|
future<std::pair<std::vector<sstables::shared_sstable>, table::sstable_list_permit>> table::snapshot_sstables() {
|
|
auto permit = co_await get_sstable_list_permit();
|
|
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
|
|
co_return std::make_pair(std::move(tables), std::move(permit));
|
|
}
|
|
|
|
future<bool> table::snapshot_exists(sstring tag) {
|
|
auto* so = std::get_if<storage_options::local>(&_storage_opts->value);
|
|
if (so == nullptr || so->dir.empty()) {
|
|
co_return false; // Technically it doesn't as snapshots only work for local storage
|
|
}
|
|
|
|
sstring jsondir = (so->dir / sstables::snapshots_dir / tag).native();
|
|
bool exists = false;
|
|
try {
|
|
future<stat_data> (&file_stat)(std::string_view, follow_symlink) noexcept = seastar::file_stat;
|
|
auto sd = co_await io_check(file_stat, jsondir, follow_symlink::no);
|
|
if (sd.type != directory_entry_type::directory) {
|
|
throw std::error_code(ENOTDIR, std::system_category());
|
|
}
|
|
exists = true;
|
|
} catch (std::system_error& e) {
|
|
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
|
throw;
|
|
}
|
|
}
|
|
co_return exists;
|
|
}
|
|
|
|
future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot_details() {
|
|
return seastar::async([this] {
|
|
std::unordered_map<sstring, snapshot_details> all_snapshots;
|
|
auto* so = std::get_if<storage_options::local>(&_storage_opts->value);
|
|
if (so == nullptr || so->dir.empty()) {
|
|
return all_snapshots;
|
|
}
|
|
|
|
auto datadirs = _sstables_manager.get_local_directories(*so);
|
|
for (auto& datadir : datadirs) {
|
|
fs::path snapshots_dir = datadir / sstables::snapshots_dir;
|
|
auto file_exists = io_check([&snapshots_dir] { return seastar::file_exists(snapshots_dir.native()); }).get();
|
|
if (!file_exists) {
|
|
continue;
|
|
}
|
|
|
|
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
|
auto close_lister = deferred_close(lister);
|
|
while (auto de = lister.get().get()) {
|
|
auto snapshot_name = de->name;
|
|
all_snapshots.emplace(snapshot_name, snapshot_details());
|
|
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
|
|
auto& sd = all_snapshots.at(snapshot_name);
|
|
sd.total += details.total;
|
|
sd.live += details.live;
|
|
utils::get_local_injector().inject("get_snapshot_details", [&] (auto& handler) -> future<> {
|
|
throw std::runtime_error("Injected exception in get_snapshot_details");
|
|
}).get();
|
|
}
|
|
}
|
|
return all_snapshots;
|
|
});
|
|
}
|
|
|
|
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
|
|
table::snapshot_details details{};
|
|
file snapshot_directory = co_await io_check(open_directory, snapshot_dir.native());
|
|
file data_directory = co_await io_check(open_directory, datadir.native());
|
|
file staging_directory;
|
|
std::optional<fs::path> staging_dir = datadir / sstables::staging_dir;
|
|
if (!co_await file_exists(staging_dir->native())) {
|
|
staging_dir.reset();
|
|
} else {
|
|
staging_directory = co_await io_check(open_directory, staging_dir->native());
|
|
}
|
|
|
|
auto lister = directory_lister(snapshot_directory, snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
|
std::exception_ptr ex;
|
|
try {
|
|
while (auto de = co_await lister.get()) {
|
|
const auto& name = de->name;
|
|
future<stat_data> (&file_stat)(file& directory, std::string_view name, follow_symlink) noexcept = seastar::file_stat;
|
|
auto sd = co_await io_check(file_stat, snapshot_directory, name, follow_symlink::no);
|
|
auto size = sd.allocated_size;
|
|
|
|
utils::get_local_injector().inject("per-snapshot-get_snapshot_details", [&] (auto& handler) -> future<> {
|
|
throw std::runtime_error("Injected exception in per-snapshot-get_snapshot_details");
|
|
}).get();
|
|
|
|
// The manifest and schema.cql files are the only files expected to be in this directory not belonging to the SSTable.
|
|
//
|
|
// All the others should just generate an exception: there is something wrong, so don't blindly
|
|
// add it to the size.
|
|
if (name != "manifest.json" && name != "schema.cql") {
|
|
details.total += size;
|
|
if (sd.number_of_links == 1) {
|
|
// File exists only in the snapshot directory.
|
|
details.live += size;
|
|
continue;
|
|
}
|
|
// If the number of links is greater than 1, it is still possible that the file is linked to another snapshot
|
|
// So check the datadir for the file too.
|
|
} else {
|
|
continue;
|
|
}
|
|
|
|
auto exists_in_dir = [&] (file& dir, const fs::path& path, std::string_view name) -> future<bool> {
|
|
try {
|
|
// File exists in the main SSTable directory. Snapshots are not contributing to size
|
|
auto psd = co_await io_check(file_stat, dir, name, follow_symlink::no);
|
|
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
|
|
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
|
|
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
|
|
(path / name).native(), psd.device_id, psd.inode_number, psd.size,
|
|
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
|
|
co_return false;
|
|
}
|
|
co_return true;
|
|
} catch (std::system_error& e) {
|
|
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
|
throw;
|
|
}
|
|
co_return false;
|
|
}
|
|
};
|
|
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
|
|
if ((!staging_dir || !co_await exists_in_dir(staging_directory, *staging_dir, name)) &&
|
|
!co_await exists_in_dir(data_directory, datadir, name)) {
|
|
details.live += size;
|
|
}
|
|
}
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await lister.close();
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
|
|
co_return details;
|
|
}
|
|
|
|
future<> compaction_group::flush() noexcept {
|
|
try {
|
|
return _memtables->flush();
|
|
} catch (...) {
|
|
return current_exception_as_future<>();
|
|
}
|
|
}
|
|
|
|
future<> storage_group::flush() noexcept {
|
|
for (auto& cg : compaction_groups_immediate()) {
|
|
co_await cg->flush();
|
|
}
|
|
}
|
|
|
|
bool compaction_group::can_flush() const {
|
|
return _memtables->can_flush();
|
|
}
|
|
|
|
bool compaction_group::needs_flush() const {
|
|
return _memtables->needs_flush();
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>& compaction_group::memtables() noexcept {
|
|
return _memtables;
|
|
}
|
|
|
|
size_t compaction_group::memtable_count() const noexcept {
|
|
return _memtables->size();
|
|
}
|
|
|
|
size_t storage_group::memtable_count() const {
|
|
size_t count = 0;
|
|
for_each_compaction_group([&count] (const compaction_group_ptr& cg) {
|
|
count += cg->memtable_count();
|
|
});
|
|
return count;
|
|
}
|
|
|
|
future<> table::flush(std::optional<db::replay_position> pos) {
|
|
if (pos && *pos < _flush_rp) {
|
|
co_return;
|
|
}
|
|
// There is nothing to flush if the table was stopped.
|
|
if (_pending_flushes_phaser.is_closed()) {
|
|
co_return;
|
|
}
|
|
auto op = _pending_flushes_phaser.start();
|
|
auto fp = _highest_rp;
|
|
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::flush));
|
|
_flush_rp = std::max(_flush_rp, fp);
|
|
}
|
|
|
|
bool storage_group::can_flush() const {
|
|
return std::ranges::any_of(compaction_groups_immediate(), std::mem_fn(&compaction_group::can_flush));
|
|
}
|
|
|
|
bool table::can_flush() const {
|
|
return std::ranges::any_of(storage_groups() | std::views::values, std::mem_fn(&storage_group::can_flush));
|
|
}
|
|
|
|
bool storage_group::needs_flush() const {
|
|
return std::ranges::any_of(compaction_groups_immediate(), std::mem_fn(&compaction_group::needs_flush));
|
|
}
|
|
|
|
bool table::needs_flush() const {
|
|
return std::ranges::any_of(storage_groups() | std::views::values, std::mem_fn(&storage_group::needs_flush));
|
|
}
|
|
|
|
future<> compaction_group::clear_memtables() {
|
|
if (_t.commitlog()) {
|
|
for (auto& t : *_memtables) {
|
|
_t.commitlog()->discard_completed_segments(_t.schema()->id(), t->get_and_discard_rp_set());
|
|
}
|
|
}
|
|
auto old_memtables = _memtables->clear_and_add();
|
|
for (auto& smt : old_memtables) {
|
|
co_await smt->clear_gently();
|
|
}
|
|
}
|
|
|
|
future<> table::clear() {
|
|
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
|
|
|
|
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::clear_memtables));
|
|
|
|
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
|
}
|
|
|
|
bool storage_group::compaction_disabled() const {
|
|
// Compaction group that has been stopped will be excluded, since the group will not be available for a caller
|
|
// to disable compaction explicitly on it, e.g. on truncate, and the caller might want to perform a check
|
|
// that compaction was disabled on all groups. Stopping a group is equivalent to disabling compaction on it.
|
|
bool all_disabled = true;
|
|
for_each_compaction_group([&all_disabled] (const compaction_group_ptr& cg) {
|
|
all_disabled &= cg->stopped() || cg->compaction_disabled();
|
|
});
|
|
return all_disabled;
|
|
}
|
|
|
|
// NOTE: does not need to be futurized, but might eventually, depending on
|
|
// if we implement notifications, whatnot.
|
|
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
|
|
// truncate_table_on_all_shards() disables compaction for the truncated
|
|
// tables and views, so we normally expect compaction to be disabled on
|
|
// this table. But as shown in issue #17543, it is possible that a new
|
|
// materialized view was created right after truncation started, and it
|
|
// would not have compaction disabled when this function is called on it.
|
|
if (!schema()->is_view()) {
|
|
// Check if the storage groups have compaction disabled, but also check if they have been stopped.
|
|
// This is to avoid races with tablet cleanup which stops the storage group, and then stops the
|
|
// compaction groups. We could have a situation where compaction couldn't have been disabled by
|
|
// truncate because the storage group has been stopped, but the compaction groups have not yet been stopped.
|
|
auto compaction_disabled = std::ranges::all_of(storage_groups() | std::views::values, [] (const storage_group_ptr& sgp) {
|
|
return sgp->async_gate().is_closed() || sgp->compaction_disabled();
|
|
});
|
|
if (!compaction_disabled) {
|
|
utils::on_internal_error(fmt::format("compaction not disabled on table {}.{} during TRUNCATE",
|
|
schema()->ks_name(), schema()->cf_name()));
|
|
}
|
|
}
|
|
|
|
db::replay_position rp;
|
|
struct removed_sstable {
|
|
compaction_group& cg;
|
|
sstables::shared_sstable sst;
|
|
replica::enable_backlog_tracker enable_backlog_tracker;
|
|
};
|
|
std::vector<removed_sstable> remove;
|
|
|
|
_stats.pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_stats.pending_sstable_deletions--;
|
|
});
|
|
|
|
auto permit = co_await get_sstable_list_permit();
|
|
|
|
co_await _cache.invalidate(row_cache::external_updater([this, &rp, &remove, truncated_at] {
|
|
// FIXME: the following isn't exception safe.
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
|
|
auto pruned = make_lw_shared<sstables::sstable_set>(cg.make_main_sstable_set());
|
|
auto maintenance_pruned = cg.make_maintenance_sstable_set();
|
|
|
|
auto prune = [&] (lw_shared_ptr<sstables::sstable_set>& pruned,
|
|
const lw_shared_ptr<sstables::sstable_set>& pruning,
|
|
replica::enable_backlog_tracker enable_backlog_tracker) mutable {
|
|
pruning->for_each_sstable([&] (const sstables::shared_sstable& p) mutable {
|
|
if (p->max_data_age() <= truncated_at) {
|
|
if (p->originated_on_this_node().value_or(false) && p->get_stats_metadata().position.shard_id() == this_shard_id()) {
|
|
rp = std::max(p->get_stats_metadata().position, rp);
|
|
}
|
|
remove.emplace_back(removed_sstable{cg, p, enable_backlog_tracker});
|
|
return;
|
|
}
|
|
pruned->insert(p);
|
|
});
|
|
};
|
|
prune(pruned, cg.main_sstables(), enable_backlog_tracker::yes);
|
|
prune(maintenance_pruned, cg.maintenance_sstables(), enable_backlog_tracker::no);
|
|
|
|
cg.set_main_sstables(std::move(pruned));
|
|
cg.set_maintenance_sstables(std::move(maintenance_pruned));
|
|
});
|
|
refresh_compound_sstable_set();
|
|
tlogger.debug("cleaning out row cache");
|
|
}));
|
|
rebuild_statistics();
|
|
|
|
std::vector<sstables::shared_sstable> del;
|
|
del.reserve(remove.size());
|
|
for (auto& r : remove) {
|
|
if (r.enable_backlog_tracker) {
|
|
remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst);
|
|
}
|
|
erase_sstable_cleanup_state(r.sst);
|
|
del.emplace_back(r.sst);
|
|
};
|
|
co_await delete_sstables_atomically(permit, std::move(del));
|
|
co_return rp;
|
|
}
|
|
|
|
future<> table::discard_logstor_segments() {
|
|
if (!uses_logstor()) {
|
|
co_return;
|
|
}
|
|
|
|
_logstor_index->clear();
|
|
|
|
co_await parallel_foreach_compaction_group([] (compaction_group& cg) {
|
|
return cg.discard_logstor_segments();
|
|
});
|
|
}
|
|
|
|
void table::mark_ready_for_writes(db::commitlog* cl) {
|
|
if (!_readonly) {
|
|
on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
if (_config.enable_commitlog) {
|
|
_commitlog = cl;
|
|
}
|
|
_readonly = false;
|
|
}
|
|
|
|
void table::init_logstor(logstor::logstor* ls) {
|
|
_logstor = ls;
|
|
_logstor_index = std::make_unique<logstor::primary_index>(_schema);
|
|
}
|
|
|
|
size_t table::get_logstor_memory_usage() const {
|
|
size_t m = 0;
|
|
if (_logstor_index) {
|
|
m += _logstor_index->get_memory_usage();
|
|
}
|
|
return m;
|
|
}
|
|
|
|
db::commitlog* table::commitlog() const {
|
|
if (_readonly) [[unlikely]] {
|
|
on_internal_error(dblog, ::format("table {}.{} is readonly", _schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
return _commitlog;
|
|
}
|
|
|
|
void table::set_schema(schema_ptr s) {
|
|
SCYLLA_ASSERT(s->is_counter() == _schema->is_counter());
|
|
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
|
|
_schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version());
|
|
|
|
_flush_timer.cancel();
|
|
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
for (auto& m: *cg.memtables()) {
|
|
m->set_schema(s);
|
|
}
|
|
});
|
|
|
|
_cache.set_schema(s);
|
|
if (_counter_cell_locks) {
|
|
_counter_cell_locks->set_schema(s);
|
|
}
|
|
if (_logstor_index) {
|
|
_logstor_index->set_schema(s);
|
|
}
|
|
_schema = std::move(s);
|
|
|
|
for (auto&& v : _views) {
|
|
v->view_info()->reset_view_info();
|
|
if (auto reverse_schema = local_schema_registry().get_or_null(reversed(v->version()))) {
|
|
reverse_schema->view_info()->reset_view_info();
|
|
}
|
|
}
|
|
|
|
set_compaction_strategy(_schema->compaction_strategy());
|
|
trigger_compaction();
|
|
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.rearm(timer<lowres_clock>::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
}
|
|
|
|
static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, const view_ptr& v) {
|
|
return std::find_if(views.begin(), views.end(), [&v] (auto&& e) {
|
|
return e->id() == v->id();
|
|
});
|
|
}
|
|
|
|
void table::add_or_update_view(view_ptr v) {
|
|
auto existing = find_view(_views, v);
|
|
if (existing != _views.end()) {
|
|
*existing = std::move(v);
|
|
} else {
|
|
_views.push_back(std::move(v));
|
|
}
|
|
}
|
|
|
|
void table::remove_view(view_ptr v) {
|
|
auto existing = find_view(_views, v);
|
|
if (existing != _views.end()) {
|
|
_views.erase(existing);
|
|
}
|
|
}
|
|
|
|
void table::clear_views() {
|
|
_views.clear();
|
|
}
|
|
|
|
const std::vector<view_ptr>& table::views() const {
|
|
return _views;
|
|
}
|
|
|
|
std::vector<view_ptr> table::affected_views(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base, const mutation& update) const {
|
|
//FIXME: Avoid allocating a vector here; consider returning the boost iterator.
|
|
return std::ranges::to<std::vector<view_ptr>>(_views | std::views::filter([&] (auto&& view) {
|
|
return db::view::partition_key_matches(gen->get_db().as_data_dictionary(), *base, *view->view_info(), update.decorated_key());
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Shard-local locking of clustering rows or entire partitions of the base
|
|
* table during a Materialized-View read-modify-update:
|
|
*
|
|
* Consider that two concurrent base-table updates set column C, a column
|
|
* added to a view's primary key, to two different values - V1 and V2.
|
|
* Say that that before the updates, C's value was V0. Both updates may remove
|
|
* from the view the old row with V0, one will add a view row with V1 and the
|
|
* second will add a view row with V2, and we end up with two rows, with the
|
|
* two different values, instead of just one row with the last value.
|
|
*
|
|
* The solution is to lock the base row which we read to ensure atomic read-
|
|
* modify-write to the view table: Under one locked section, the row with V0
|
|
* is deleted and a new one with V1 is created, and then under a second locked
|
|
* section the row with V1 is deleted and a new one with V2 is created.
|
|
* Note that the lock is node-local (and in fact shard-local) and the locked
|
|
* section doesn't include the view table modifications - it includes just the
|
|
* read and the creation of the update commands - commands which will
|
|
* eventually be sent to the view replicas.
|
|
*
|
|
* We need to lock a base-table row even if an update does not modify the
|
|
* view's new key column C: Consider an update that only updates a non-key
|
|
* column (but also in the view) D. We still need to read the current base row
|
|
* to retrieve the view row's current key (column C), and then write the
|
|
* modification to *that* view row. Having several such modifications in
|
|
* parallel is fine. What is not fine is to have in parallel a modification
|
|
* of the value of C. So basically we need a reader-writer lock (a.k.a.
|
|
* shared-exclusive lock) on base rows:
|
|
* 1. Updates which do not modify the view's key column take a reader lock
|
|
* on the base row.
|
|
* 2. Updates which do modify the view's key column take a writer lock.
|
|
*
|
|
* Further complicating matters is that some operations involve multiple
|
|
* base rows - such as a deletion of an entire partition or a range of rows.
|
|
* In that case, we should lock the entire partition, and forbid parallel
|
|
* work on the same partition or one of its rows. We can do this with a
|
|
* read-writer lock on base partitions:
|
|
* 1. Before we lock a row (as described above), we lock its partition key
|
|
* with the reader lock.
|
|
* 2. When an operation involves an entire partition (or range of rows),
|
|
* we lock the partition key with a writer lock.
|
|
*
|
|
* If an operation involves only a range of rows, not an entire partition,
|
|
* we could in theory lock only this range and not an entire partition.
|
|
* However, we expect this case to be rare enough to not care about and we
|
|
* currently just lock the entire partition.
|
|
*
|
|
* If a base table has *multiple* views, we still read the base table row
|
|
* only once, and have to keep a lock around this read and all the view
|
|
* updates generation. This lock needs to be the strictest of the above -
|
|
* i.e., if a column is modified which is not part of one view's key but is
|
|
* part of a second view's key - we should lock the base row with the
|
|
* stricter writer lock, not a reader lock.
|
|
*/
|
|
future<row_locker::lock_holder>
|
|
table::local_base_lock(
|
|
const schema_ptr& s,
|
|
const dht::decorated_key& pk,
|
|
const query::clustering_row_ranges& rows,
|
|
db::timeout_clock::time_point timeout) const {
|
|
// FIXME: Optimization:
|
|
// Below we always pass "true" to the lock functions and take an exclusive
|
|
// lock on the affected row or partition. But as explained above, if all
|
|
// the modified columns are not key columns in *any* of the views, and
|
|
// shared lock is enough. We should test for this case and pass false.
|
|
// This will allow more parallelism in concurrent modifications to the
|
|
// same row - probably not a very urgent case.
|
|
_row_locker.upgrade(s);
|
|
if (rows.size() == 1 && rows[0].is_singular() && rows[0].start() && !rows[0].start()->value().is_empty(*s)) {
|
|
// A single clustering row is involved.
|
|
return _row_locker.lock_ck(pk, rows[0].start()->value(), true, timeout, _row_locker_stats);
|
|
} else {
|
|
// More than a single clustering row is involved. Most commonly it's
|
|
// the entire partition, so let's lock the entire partition. We could
|
|
// lock less than the entire partition in more elaborate cases where
|
|
// just a few individual rows are involved, or row ranges, but we
|
|
// don't think this will make a practical difference.
|
|
return _row_locker.lock_pk(pk, true, timeout, _row_locker_stats);
|
|
}
|
|
}
|
|
|
|
const ssize_t new_reader_base_cost{16 * 1024};
|
|
|
|
size_t table::estimate_read_memory_cost() const {
|
|
return new_reader_base_cost;
|
|
}
|
|
|
|
void table::set_hit_rate(locator::host_id addr, cache_temperature rate) {
|
|
auto& e = _cluster_cache_hit_rates[addr];
|
|
e.rate = rate;
|
|
e.last_updated = lowres_clock::now();
|
|
}
|
|
|
|
table::cache_hit_rate table::get_my_hit_rate() const {
|
|
return cache_hit_rate { _global_cache_hit_rate, lowres_clock::now()};
|
|
}
|
|
|
|
table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, locator::host_id addr) {
|
|
if (gossiper.my_host_id() == addr) {
|
|
return get_my_hit_rate();
|
|
}
|
|
auto it = _cluster_cache_hit_rates.find(addr);
|
|
if (it == _cluster_cache_hit_rates.end()) {
|
|
// no data yet, get it from the gossiper
|
|
auto eps = gossiper.get_endpoint_state_ptr(addr);
|
|
if (eps) {
|
|
auto* state = eps->get_application_state_ptr(gms::application_state::CACHE_HITRATES);
|
|
float f = -1.0f; // missing state means old node
|
|
if (state) {
|
|
const auto me = format("{}.{}", _schema->ks_name(), _schema->cf_name());
|
|
const auto& value = state->value();
|
|
const auto i = value.find(me);
|
|
if (i != sstring::npos) {
|
|
f = strtof(&value[i + me.size() + 1], nullptr);
|
|
} else {
|
|
f = 0.0f; // empty state means that node has rebooted
|
|
}
|
|
set_hit_rate(addr, cache_temperature(f));
|
|
return cache_hit_rate{cache_temperature(f), lowres_clock::now()};
|
|
}
|
|
}
|
|
return cache_hit_rate {cache_temperature(0.0f), lowres_clock::now()};
|
|
} else {
|
|
return it->second;
|
|
}
|
|
}
|
|
|
|
void table::drop_hit_rate(locator::host_id addr) {
|
|
_cluster_cache_hit_rates.erase(addr);
|
|
}
|
|
|
|
void
|
|
table::check_valid_rp(const db::replay_position& rp) const {
|
|
if (rp != db::replay_position() && rp < _lowest_allowed_rp) {
|
|
throw mutation_reordered_with_truncate_exception();
|
|
}
|
|
}
|
|
|
|
db::replay_position table::set_low_replay_position_mark() {
|
|
_lowest_allowed_rp = _highest_rp;
|
|
return _lowest_allowed_rp;
|
|
}
|
|
|
|
template<typename... Args>
|
|
void table::do_apply(compaction_group& cg, db::rp_handle&& h, Args&&... args) {
|
|
utils::latency_counter lc;
|
|
_stats.writes.set_latency(lc);
|
|
db::replay_position rp = h;
|
|
check_valid_rp(rp);
|
|
try {
|
|
cg.memtables()->active_memtable().apply(std::forward<Args>(args)..., std::move(h));
|
|
_highest_rp = std::max(_highest_rp, rp);
|
|
} catch (...) {
|
|
_failed_counter_applies_to_memtable++;
|
|
throw;
|
|
}
|
|
_stats.writes.mark(lc);
|
|
}
|
|
|
|
api::timestamp_type table::get_max_timestamp_for_tablet(locator::tablet_id tid) const {
|
|
return std::ranges::max(storage_group_for_id(tid.value()).compaction_groups_immediate()
|
|
| std::views::transform([](const compaction_group_ptr& cg_ptr) {
|
|
return std::max(cg_ptr->max_seen_timestamp(), cg_ptr->max_memtable_timestamp());
|
|
}));
|
|
}
|
|
|
|
future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
|
|
if (_virtual_writer) [[unlikely]] {
|
|
return (*_virtual_writer)(freeze(m));
|
|
}
|
|
|
|
auto& cg = compaction_group_for_token(m.token());
|
|
auto holder = cg.async_gate().hold();
|
|
|
|
if (_logstor) [[unlikely]] {
|
|
return _logstor->write(m, cg, std::move(holder));
|
|
}
|
|
|
|
return dirty_memory_region_group().run_when_memory_available([this, &m, h = std::move(h), &cg, holder = std::move(holder)] () mutable {
|
|
do_apply(cg, std::move(h), m);
|
|
}, timeout);
|
|
}
|
|
|
|
template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&);
|
|
|
|
future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
|
|
if (_virtual_writer) [[unlikely]] {
|
|
return (*_virtual_writer)(m);
|
|
}
|
|
|
|
auto& cg = compaction_group_for_key(m.key(), m_schema);
|
|
auto holder = cg.async_gate().hold();
|
|
|
|
if (_logstor) [[unlikely]] {
|
|
return _logstor->write(m.unfreeze(m_schema), cg, std::move(holder));
|
|
}
|
|
|
|
return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable {
|
|
do_apply(cg, std::move(h), m, m_schema);
|
|
}, timeout);
|
|
}
|
|
|
|
template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&);
|
|
|
|
future<>
|
|
write_memtable_to_sstable(mutation_reader reader,
|
|
memtable& mt, sstables::shared_sstable sst,
|
|
size_t estimated_partitions,
|
|
sstables::write_monitor& monitor,
|
|
sstables::sstable_writer_config& cfg) {
|
|
cfg.replay_position = mt.replay_position();
|
|
cfg.monitor = &monitor;
|
|
cfg.origin = "memtable";
|
|
schema_ptr s = reader.schema();
|
|
return sst->write_components(std::move(reader), estimated_partitions, s, cfg, mt.get_encoding_stats());
|
|
}
|
|
|
|
future<>
|
|
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) {
|
|
auto cfg = sst->manager().configure_writer("memtable");
|
|
auto monitor = replica::permit_monitor(make_lw_shared(sstable_write_permit::unconditional()));
|
|
auto semaphore = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable",
|
|
reader_concurrency_semaphore::register_metrics::no);
|
|
std::exception_ptr ex;
|
|
|
|
try {
|
|
auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {});
|
|
auto reader = mt.make_flush_reader(mt.schema(), std::move(permit));
|
|
co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg);
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await semaphore.stop();
|
|
if (ex) {
|
|
std::rethrow_exception(std::move(ex));
|
|
}
|
|
}
|
|
|
|
future<lw_shared_ptr<query::result>>
|
|
table::query(schema_ptr query_schema,
|
|
reader_permit permit,
|
|
const query::read_command& cmd,
|
|
query::result_options opts,
|
|
const dht::partition_range_vector& partition_ranges,
|
|
tracing::trace_state_ptr trace_state,
|
|
query::result_memory_limiter& memory_limiter,
|
|
db::timeout_clock::time_point timeout,
|
|
std::optional<querier>* saved_querier) {
|
|
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
|
|
co_return make_lw_shared<query::result>();
|
|
}
|
|
|
|
const auto table_async_gate_holder = _async_gate.hold();
|
|
utils::latency_counter lc;
|
|
_stats.reads.set_latency(lc);
|
|
|
|
auto finally = defer([&] () noexcept {
|
|
_stats.reads.mark(lc);
|
|
});
|
|
|
|
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
|
|
auto accounter = co_await (opts.request == query::result_request::only_digest
|
|
? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed)
|
|
: memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed));
|
|
|
|
query_state qs(query_schema, cmd, opts, partition_ranges, std::move(accounter));
|
|
|
|
std::optional<querier> querier_opt;
|
|
if (saved_querier) {
|
|
querier_opt = std::move(*saved_querier);
|
|
}
|
|
|
|
co_await utils::get_local_injector().inject("replica_query_wait", [&] (auto& handler) -> future<> {
|
|
auto table_name = handler.template get<std::string_view>("table");
|
|
if (table_name && *table_name == _schema->cf_name()) {
|
|
tlogger.info("replica_query_wait: waiting");
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes(5));
|
|
}
|
|
});
|
|
|
|
while (!qs.done()) {
|
|
auto&& range = *qs.current_partition_range++;
|
|
|
|
if (!querier_opt) {
|
|
querier_base::querier_config conf(_config.tombstone_warn_threshold);
|
|
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_tombstone_gc_state(), conf);
|
|
}
|
|
auto& q = *querier_opt;
|
|
|
|
future<> fut = co_await coroutine::as_future(q.consume_page(query_result_builder(*query_schema, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, trace_state));
|
|
|
|
if (fut.failed() || !qs.done()) {
|
|
co_await q.close();
|
|
querier_opt = {};
|
|
}
|
|
if (fut.failed()) {
|
|
co_return coroutine::exception(fut.get_exception());
|
|
}
|
|
}
|
|
|
|
std::optional<full_position> last_pos;
|
|
if (querier_opt) {
|
|
if (querier_opt->current_position()) {
|
|
last_pos.emplace(*querier_opt->current_position());
|
|
}
|
|
if (!saved_querier || (!querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
|
|
co_await querier_opt->close();
|
|
querier_opt = {};
|
|
}
|
|
}
|
|
if (saved_querier) {
|
|
*saved_querier = std::move(querier_opt);
|
|
}
|
|
|
|
co_return make_lw_shared<query::result>(qs.builder.build(std::move(last_pos)));
|
|
}
|
|
|
|
future<reconcilable_result>
|
|
table::mutation_query(schema_ptr query_schema,
|
|
reader_permit permit,
|
|
const query::read_command& cmd,
|
|
const dht::partition_range& range,
|
|
tracing::trace_state_ptr trace_state,
|
|
query::result_memory_accounter accounter,
|
|
db::timeout_clock::time_point timeout,
|
|
bool tombstone_gc_enabled,
|
|
std::optional<querier>* saved_querier) {
|
|
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
|
|
co_return reconcilable_result();
|
|
}
|
|
|
|
const auto table_async_gate_holder = _async_gate.hold();
|
|
|
|
std::optional<querier> querier_opt;
|
|
if (saved_querier) {
|
|
querier_opt = std::move(*saved_querier);
|
|
}
|
|
if (!querier_opt) {
|
|
auto tombstone_gc_state = tombstone_gc_enabled ? get_tombstone_gc_state() : tombstone_gc_state::no_gc();
|
|
querier_base::querier_config conf(_config.tombstone_warn_threshold);
|
|
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, tombstone_gc_state, conf);
|
|
}
|
|
auto& q = *querier_opt;
|
|
|
|
std::exception_ptr ex;
|
|
try {
|
|
auto rrb = reconcilable_result_builder(*query_schema, cmd.slice, std::move(accounter));
|
|
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, trace_state);
|
|
|
|
if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {
|
|
co_await q.close();
|
|
querier_opt = {};
|
|
}
|
|
if (saved_querier) {
|
|
*saved_querier = std::move(querier_opt);
|
|
}
|
|
|
|
co_return r;
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await q.close();
|
|
co_return coroutine::exception(std::move(ex));
|
|
}
|
|
|
|
mutation_source
|
|
table::as_mutation_source() const {
|
|
return mutation_source([this] (schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) {
|
|
return this->make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
|
|
});
|
|
}
|
|
|
|
void table::add_coordinator_read_latency(utils::estimated_histogram::duration latency) {
|
|
_stats.estimated_coordinator_read.add(std::chrono::duration_cast<std::chrono::microseconds>(latency).count());
|
|
}
|
|
|
|
std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double percentile) {
|
|
if (_cached_percentile != percentile || lowres_clock::now() - _percentile_cache_timestamp > 1s) {
|
|
_percentile_cache_timestamp = lowres_clock::now();
|
|
_cached_percentile = percentile;
|
|
_percentile_cache_value = std::max(_stats.estimated_coordinator_read.percentile(percentile) / 1000, int64_t(1)) * 1ms;
|
|
_stats.estimated_coordinator_read *= 0.9; // decay values a little to give new data points more weight
|
|
}
|
|
return _percentile_cache_value;
|
|
}
|
|
|
|
void
|
|
table::enable_auto_compaction() {
|
|
// FIXME: unmute backlog. turn table backlog back on.
|
|
// see table::disable_auto_compaction() notes.
|
|
_compaction_disabled_by_user = false;
|
|
trigger_compaction();
|
|
|
|
if (uses_logstor()) {
|
|
trigger_logstor_compaction();
|
|
}
|
|
}
|
|
|
|
future<>
|
|
table::disable_auto_compaction() {
|
|
// FIXME: mute backlog. When we disable background compactions
|
|
// for the table, we must also disable current backlog of the
|
|
// table compaction strategy that contributes to the scheduling
|
|
// group resources prioritization.
|
|
//
|
|
// There are 2 possibilities possible:
|
|
// - there are no ongoing background compaction, and we can freely
|
|
// mute table backlog.
|
|
// - there are compactions happening. than we must decide either
|
|
// we want to allow them to finish not allowing submitting new
|
|
// compactions tasks, or we may "suspend" them until the bg
|
|
// compactions will be enabled back. This is not a worst option
|
|
// because it will allow bg compactions to finish if there are
|
|
// unused resourced, it will not lose any writers/readers stats.
|
|
//
|
|
// Besides that:
|
|
// - there are major compactions that additionally uses constant
|
|
// size backlog of shares,
|
|
// - sstables rewrites tasks that do the same.
|
|
//
|
|
// Setting NullCompactionStrategy is not an option due to the
|
|
// following reasons:
|
|
// - it will 0 backlog if suspending current compactions is not an
|
|
// option
|
|
// - it will break computation of major compaction descriptor
|
|
// for new submissions
|
|
_compaction_disabled_by_user = true;
|
|
|
|
auto holder = _async_gate.hold();
|
|
|
|
co_await parallel_foreach_compaction_group_view([this] (compaction::compaction_group_view& view) {
|
|
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &view, compaction::compaction_type::Compaction);
|
|
});
|
|
|
|
if (uses_logstor()) {
|
|
co_await parallel_foreach_compaction_group([this] (compaction_group& cg) {
|
|
return get_logstor_compaction_manager().stop_ongoing_compactions(cg);
|
|
});
|
|
}
|
|
}
|
|
|
|
void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
|
|
_tombstone_gc_enabled = tombstone_gc_enabled;
|
|
tlogger.info0("Tombstone GC was {} for {}.{}", tombstone_gc_enabled ? "enabled" : "disabled", _schema->ks_name(), _schema->cf_name());
|
|
if (_tombstone_gc_enabled) {
|
|
trigger_compaction();
|
|
}
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_mutation_reader_excluding_staging(schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) const {
|
|
std::vector<mutation_reader> readers;
|
|
|
|
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
|
|
static const sstables::sstable_predicate excl_staging_predicate = [] (const sstables::sstable& sst) {
|
|
return !sst.requires_view_building();
|
|
};
|
|
|
|
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate));
|
|
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
|
|
}
|
|
|
|
future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
|
|
auto permit = co_await get_sstable_list_permit();
|
|
sstables::delayed_commit_changes delay_commit;
|
|
std::unordered_set<compaction_group*> compaction_groups_to_notify;
|
|
for (auto sst : sstables) {
|
|
try {
|
|
// Off-strategy can happen in parallel to view building, so the SSTable may be deleted already if the former
|
|
// completed first.
|
|
// The sstable list permit prevents list update on off-strategy completion and move_sstables_from_staging()
|
|
// from stepping on each other's toe.
|
|
co_await sst->change_state(sstables::sstable_state::normal, &delay_commit);
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
if (get_compaction_manager().requires_cleanup(cg.view_for_sstable(sst), sst)) {
|
|
compaction_groups_to_notify.insert(&cg);
|
|
}
|
|
// If view building finished faster, SSTable with repair origin still exists.
|
|
// It can also happen the SSTable is not going through reshape, so it doesn't have a repair origin.
|
|
// That being said, we'll only add this SSTable to tracker if its origin is other than repair.
|
|
// Otherwise, we can count on off-strategy completion to add it when updating lists.
|
|
if (sst->get_origin() != sstables::repair_origin) {
|
|
add_sstable_to_backlog_tracker(cg.get_backlog_tracker(), sst);
|
|
}
|
|
} catch (...) {
|
|
tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
|
|
throw;
|
|
}
|
|
}
|
|
|
|
co_await delay_commit.commit();
|
|
|
|
for (auto* cg : compaction_groups_to_notify) {
|
|
cg->get_staging_done_condition().broadcast();
|
|
}
|
|
|
|
// Off-strategy timer will be rearmed, so if there's more incoming data through repair / streaming,
|
|
// the timer can be updated once again. In practice, it allows off-strategy compaction to kick off
|
|
// at the end of the node operation on behalf of this table, which brings more efficiency in terms
|
|
// of write amplification.
|
|
do_update_off_strategy_trigger();
|
|
}
|
|
|
|
/**
|
|
* Given an update for the base table, calculates the set of potentially affected views,
|
|
* generates the relevant updates, and sends them to the paired view replicas.
|
|
*/
|
|
future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, const frozen_mutation& fm,
|
|
db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
|
|
//FIXME: Avoid unfreezing here.
|
|
auto m = fm.unfreeze(s);
|
|
return push_view_replica_updates(std::move(gen), s, std::move(m), timeout, std::move(tr_state), sem);
|
|
}
|
|
|
|
future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
|
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, query::partition_slice::option_set custom_opts) const {
|
|
schema_ptr base = schema();
|
|
m.upgrade(base);
|
|
gc_clock::time_point now = gc_clock::now();
|
|
|
|
if (!db::view::should_generate_view_updates_on_this_shard(base, get_effective_replication_map(), m.token())) {
|
|
// This could happen if we are a pending replica.
|
|
// A pending replica may have incomplete data, and building view updates could result
|
|
// in wrong updates. Therefore we don't send updates from a pending replica. Instead, the
|
|
// base replicas send the updates to the view replicas, including pending replicas.
|
|
co_return row_locker::lock_holder();
|
|
}
|
|
|
|
auto views = affected_views(gen, base, m);
|
|
if (views.empty()) {
|
|
co_return row_locker::lock_holder();
|
|
}
|
|
auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(gen->get_db().as_data_dictionary(), *base, m.decorated_key(), m.partition(), views);
|
|
const bool need_regular = !cr_ranges.empty();
|
|
const bool need_static = db::view::needs_static_row(m.partition(), views);
|
|
if (!need_regular && !need_static) {
|
|
tracing::trace(tr_state, "View updates do not require read-before-write");
|
|
co_await gen->generate_and_propagate_view_updates(*this, base, sem.make_tracking_only_permit(s, "push-view-updates-no-read-before-write", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now, timeout);
|
|
// In this case we are not doing a read-before-write, just a
|
|
// write, so no lock is needed.
|
|
co_return row_locker::lock_holder();
|
|
}
|
|
// We read whole sets of regular and/or static columns in case the update now causes a base row to pass
|
|
// a view's filters, and a view happens to include columns that have no value in this update.
|
|
// Also, one of those columns can determine the lifetime of the base row, if it has a TTL.
|
|
query::column_id_vector static_columns;
|
|
query::column_id_vector regular_columns;
|
|
if (need_regular) {
|
|
std::ranges::copy(base->regular_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(regular_columns));
|
|
}
|
|
if (need_static) {
|
|
std::ranges::copy(base->static_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(static_columns));
|
|
}
|
|
query::partition_slice::option_set opts;
|
|
opts.set(query::partition_slice::option::send_partition_key);
|
|
opts.set_if<query::partition_slice::option::send_clustering_key>(need_regular);
|
|
opts.set_if<query::partition_slice::option::distinct>(need_static && !need_regular);
|
|
opts.set_if<query::partition_slice::option::always_return_static_content>(need_static);
|
|
opts.set(query::partition_slice::option::send_timestamp);
|
|
opts.set(query::partition_slice::option::send_ttl);
|
|
opts.add(custom_opts);
|
|
auto slice = query::partition_slice(
|
|
std::move(cr_ranges), std::move(static_columns), std::move(regular_columns), std::move(opts), { }, query::max_rows);
|
|
// Take the shard-local lock on the base-table row or partition as needed.
|
|
// We'll return this lock to the caller, which will release it after
|
|
// writing the base-table update.
|
|
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
|
|
auto lock = co_await std::move(lockf);
|
|
auto pk = dht::partition_range::make_singular(m.decorated_key());
|
|
auto permit = co_await sem.obtain_permit(base, "push-view-updates-read-before-write", estimate_read_memory_cost(), timeout, tr_state);
|
|
auto reader = source.make_mutation_reader(base, permit, pk, slice, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
|
co_await gen->generate_and_propagate_view_updates(*this, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now, timeout);
|
|
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
|
|
// return the local partition/row lock we have taken so it
|
|
// remains locked until the caller is done modifying this
|
|
// partition/row and destroys the lock object.
|
|
co_return std::move(lock);
|
|
|
|
}
|
|
|
|
future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
|
|
return do_push_view_replica_updates(std::move(gen), s, std::move(m), timeout, as_mutation_source(),
|
|
std::move(tr_state), sem, {});
|
|
}
|
|
|
|
future<row_locker::lock_holder>
|
|
table::stream_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
|
std::vector<sstables::shared_sstable>& excluded_sstables) const {
|
|
return do_push_view_replica_updates(
|
|
std::move(gen),
|
|
s,
|
|
std::move(m),
|
|
timeout,
|
|
as_mutation_source_excluding_staging(),
|
|
tracing::trace_state_ptr(),
|
|
*_config.streaming_read_concurrency_semaphore,
|
|
query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
|
|
}
|
|
|
|
mutation_source
|
|
table::as_mutation_source_excluding_staging() const {
|
|
return mutation_source([this] (schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) {
|
|
return this->make_mutation_reader_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
|
|
});
|
|
}
|
|
|
|
std::vector<mutation_source> table::select_memtables_as_mutation_sources(dht::token token) const {
|
|
auto& sg = storage_group_for_token(token);
|
|
std::vector<mutation_source> mss;
|
|
mss.reserve(sg.memtable_count());
|
|
sg.for_each_compaction_group([&mss] (const compaction_group_ptr &cg) {
|
|
for (auto& mt : *cg->memtables()) {
|
|
mss.emplace_back(mt->as_data_source());
|
|
}
|
|
});
|
|
return mss;
|
|
}
|
|
|
|
compaction::compaction_backlog_tracker& compaction_group::get_backlog_tracker() {
|
|
return *_backlog_tracker;
|
|
}
|
|
|
|
void compaction_group::register_backlog_tracker(compaction::compaction_backlog_tracker new_backlog_tracker) {
|
|
_backlog_tracker.emplace(std::move(new_backlog_tracker));
|
|
get_compaction_manager().register_backlog_tracker(*_backlog_tracker);
|
|
}
|
|
|
|
compaction::compaction_manager& compaction_group::get_compaction_manager() noexcept {
|
|
return _t.get_compaction_manager();
|
|
}
|
|
|
|
const compaction::compaction_manager& compaction_group::get_compaction_manager() const noexcept {
|
|
return _t.get_compaction_manager();
|
|
}
|
|
|
|
logstor::segment_manager& compaction_group::get_logstor_segment_manager() noexcept {
|
|
return _t.get_logstor_segment_manager();
|
|
}
|
|
|
|
const logstor::segment_manager& compaction_group::get_logstor_segment_manager() const noexcept {
|
|
return _t.get_logstor_segment_manager();
|
|
}
|
|
|
|
logstor::compaction_manager& compaction_group::get_logstor_compaction_manager() noexcept {
|
|
return _t.get_logstor_compaction_manager();
|
|
}
|
|
|
|
const logstor::compaction_manager& compaction_group::get_logstor_compaction_manager() const noexcept {
|
|
return _t.get_logstor_compaction_manager();
|
|
}
|
|
|
|
logstor::primary_index& compaction_group::get_logstor_index() noexcept {
|
|
return _t.logstor_index();
|
|
}
|
|
|
|
compaction::compaction_group_view& compaction_group::as_view_for_static_sharding() const {
|
|
return view_for_unrepaired_data();
|
|
}
|
|
|
|
compaction::compaction_group_view& compaction_group::view_for_unrepaired_data() const {
|
|
return *_unrepaired_view;
|
|
}
|
|
|
|
compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const {
|
|
switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) {
|
|
case repair_sstable_classification::unrepaired: return *_unrepaired_view;
|
|
case repair_sstable_classification::repairing: return *_repairing_view;
|
|
case repair_sstable_classification::repaired: return *_repaired_view;
|
|
}
|
|
std::unreachable();
|
|
}
|
|
|
|
utils::small_vector<compaction::compaction_group_view*, 3> compaction_group::all_views() const {
|
|
utils::small_vector<compaction::compaction_group_view*, 3> ret;
|
|
ret.push_back(_unrepaired_view.get());
|
|
ret.push_back(_repairing_view.get());
|
|
ret.push_back(_repaired_view.get());
|
|
return ret;
|
|
}
|
|
|
|
compaction_group* table::try_get_compaction_group_with_static_sharding() const {
|
|
if (!uses_static_sharding()) {
|
|
return nullptr;
|
|
}
|
|
return get_compaction_group(0);
|
|
}
|
|
|
|
compaction::compaction_group_view& table::try_get_compaction_group_view_with_static_sharding() const {
|
|
auto* cg = try_get_compaction_group_with_static_sharding();
|
|
if (!cg) {
|
|
throw std::runtime_error("Getting table state is allowed only with static sharding");
|
|
}
|
|
return cg->as_view_for_static_sharding();
|
|
}
|
|
|
|
future<> table::parallel_foreach_compaction_group_view(std::function<future<>(compaction::compaction_group_view&)> action) {
|
|
return parallel_foreach_compaction_group([action = std::move(action)] (compaction_group& cg) -> future<> {
|
|
for (auto view : cg.all_views()) {
|
|
co_await action(*view);
|
|
}
|
|
});
|
|
}
|
|
|
|
compaction::compaction_group_view& table::compaction_group_view_for_sstable(const sstables::shared_sstable& sst) const {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
return cg.view_for_sstable(sst);
|
|
}
|
|
|
|
data_dictionary::table
|
|
table::as_data_dictionary() const {
|
|
static constinit data_dictionary_impl _impl;
|
|
return _impl.wrap(*this);
|
|
}
|
|
|
|
bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
return get_compaction_manager().erase_sstable_cleanup_state(cg.as_view_for_static_sharding(), sst);
|
|
}
|
|
|
|
bool table::requires_cleanup(const sstables::shared_sstable& sst) const {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
return get_compaction_manager().requires_cleanup(cg.as_view_for_static_sharding(), sst);
|
|
}
|
|
|
|
bool table::requires_cleanup(const sstables::sstable_set& set) const {
|
|
return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
return stop_iteration(_compaction_manager.requires_cleanup(cg.as_view_for_static_sharding(), sst));
|
|
}));
|
|
}
|
|
|
|
future<> compaction_group::cleanup() {
|
|
class compaction_group_cleaner : public row_cache::external_updater_impl {
|
|
table& _t;
|
|
compaction_group& _cg;
|
|
const lw_shared_ptr<sstables::sstable_set> _empty_main_set;
|
|
const lw_shared_ptr<sstables::sstable_set> _empty_maintenance_set;
|
|
private:
|
|
lw_shared_ptr<sstables::sstable_set> empty_sstable_set() const {
|
|
return make_lw_shared<sstables::sstable_set>(_cg.make_main_sstable_set());
|
|
}
|
|
public:
|
|
explicit compaction_group_cleaner(compaction_group& cg)
|
|
: _t(cg._t)
|
|
, _cg(cg)
|
|
, _empty_main_set(empty_sstable_set())
|
|
, _empty_maintenance_set(empty_sstable_set())
|
|
{
|
|
}
|
|
virtual future<> prepare() override {
|
|
// Capture SSTables after flush, and with compaction disabled, to avoid missing any.
|
|
auto set = _cg.make_sstable_set();
|
|
auto& sstables_compacted_but_not_deleted = _cg._sstables_compacted_but_not_deleted;
|
|
sstables_compacted_but_not_deleted.reserve(set->size() + sstables_compacted_but_not_deleted.size());
|
|
set->for_each_sstable([&] (const sstables::shared_sstable& sst) mutable {
|
|
sstables_compacted_but_not_deleted.push_back(sst);
|
|
});
|
|
if (utils::get_local_injector().enter("tablet_cleanup_failure")) {
|
|
co_await sleep(std::chrono::seconds(1));
|
|
tlogger.info("Cleanup failed for tablet {}", _cg.group_id());
|
|
throw std::runtime_error("tablet cleanup failure");
|
|
}
|
|
}
|
|
|
|
virtual void execute() override {
|
|
_t.subtract_compaction_group_from_stats(_cg);
|
|
_cg.set_main_sstables(std::move(_empty_main_set));
|
|
_cg.set_maintenance_sstables(std::move(_empty_maintenance_set));
|
|
_t.refresh_compound_sstable_set();
|
|
}
|
|
};
|
|
|
|
auto permit = co_await _t.get_sstable_list_permit();
|
|
auto updater = row_cache::external_updater(std::make_unique<compaction_group_cleaner>(*this));
|
|
|
|
auto p_range = to_partition_range(token_range());
|
|
tlogger.debug("Invalidating range {} for compaction group {} of table {} during cleanup.",
|
|
p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name());
|
|
// Since permit is still held, all actions below will be executed atomically:
|
|
co_await _t._cache.invalidate(std::move(updater), p_range);
|
|
_t._cache.refresh_snapshot();
|
|
|
|
co_await _t.delete_sstables_atomically(permit, _sstables_compacted_but_not_deleted);
|
|
// Clearing sstables_compacted_but_not_deleted only on success allows a retry caused
|
|
// by a failure during deletion to still find the sstables, despite they were removed
|
|
// from the sstable sets.
|
|
_sstables_compacted_but_not_deleted.clear();
|
|
if (utils::get_local_injector().enter("tablet_cleanup_failure_post_deletion")) {
|
|
tlogger.info("Cleanup failed for tablet {}", group_id());
|
|
throw std::runtime_error("tablet cleanup failure");
|
|
}
|
|
}
|
|
|
|
future<> table::clear_inactive_reads_for_tablet(database& db, storage_group& sg) {
|
|
for (auto& cg_ptr : sg.compaction_groups_immediate()) {
|
|
co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range());
|
|
}
|
|
}
|
|
|
|
future<> storage_group::stop(sstring reason) noexcept {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
// Carefully waits for close of gate after stopping compaction groups, since we don't want
|
|
// to wait on an ongoing compaction, *but* start it earlier to prevent iterations from
|
|
// picking this group that is being stopped.
|
|
auto closed_gate_fut = _async_gate.close();
|
|
|
|
co_await utils::get_local_injector().inject("wait_before_stop_compaction_groups", [] (auto& handler) -> future<> {
|
|
dblog.info("wait_before_stop_compaction_groups: wait");
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
|
dblog.info("wait_before_stop_compaction_groups: done");
|
|
}, false);
|
|
|
|
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
|
|
|
|
// The reason we have to stop main cg first, is because an ongoing split always run in main cg
|
|
// and output will be written to left and right groups. If either left or right are stopped before
|
|
// main, split completion will add sstable to a closed group, and that might in turn trigger an
|
|
// exception while running under row_cache::external_updater::execute, resulting in node crash.
|
|
co_await _main_cg->stop(reason);
|
|
co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
|
|
return cg_ptr->stop(reason);
|
|
});
|
|
co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
|
|
return cg_ptr->stop(reason);
|
|
});
|
|
co_await std::move(closed_gate_fut);
|
|
}
|
|
|
|
future<> table::stop_compaction_groups(storage_group& sg) {
|
|
return sg.stop("tablet cleanup");
|
|
}
|
|
|
|
future<> table::flush_compaction_groups(storage_group& sg) {
|
|
for (auto& cg_ptr : sg.compaction_groups_immediate()) {
|
|
co_await cg_ptr->flush();
|
|
}
|
|
}
|
|
|
|
future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group& sg) {
|
|
for (auto& cg_ptr : sg.compaction_groups_immediate()) {
|
|
co_await cg_ptr->cleanup();
|
|
// FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation,
|
|
// and can cover some mutations which weren't cleaned, causing them to be lost during replay.
|
|
//
|
|
// This should be okay, because writes are not supposed to race with cleanups
|
|
// in the first place, but it would be better to extract the exact replay_position from
|
|
// the actually flushed/deleted sstables, like discard_sstable() does, so that the mutations
|
|
// cleaned from sstables are exactly the same as the ones cleaned from commitlog.
|
|
co_await sys_ks.save_commitlog_cleanup_record(schema()->id(), sg.token_range(), _highest_rp);
|
|
// This is the only place (outside of reboot) where we delete unneeded commitlog cleanup
|
|
// records. This isn't ideal -- it would be more natural if the unneeded records
|
|
// were deleted as soon as they become unneeded -- but this gets the job done with a
|
|
// minimal amount of code.
|
|
co_await sys_ks.drop_old_commitlog_cleanup_records(db.commitlog()->min_position());
|
|
}
|
|
|
|
tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
|
|
}
|
|
|
|
future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
|
|
auto holder = async_gate().hold();
|
|
|
|
auto sgp = _sg_manager->maybe_storage_group_for_id(_schema, tid.value());
|
|
if (!sgp) {
|
|
tlogger.warn("Storage group for tablet {} is deallocated. Ignore cleanup.", tid);
|
|
co_return;
|
|
}
|
|
|
|
auto& sg = *sgp;
|
|
co_await clear_inactive_reads_for_tablet(db, sg);
|
|
// compaction_group::stop takes care of flushing.
|
|
co_await stop_compaction_groups(sg);
|
|
co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
|
|
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
|
|
}
|
|
|
|
future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
|
|
auto holder = async_gate().hold();
|
|
auto& sg = storage_group_for_id(tid.value());
|
|
|
|
co_await clear_inactive_reads_for_tablet(db, sg);
|
|
co_await flush_compaction_groups(sg);
|
|
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
|
|
}
|
|
|
|
shard_id table::shard_for_reads(dht::token t) const {
|
|
return _erm ? _erm->shard_for_reads(*_schema, t)
|
|
: dht::static_shard_of(*_schema, t); // for tests.
|
|
}
|
|
|
|
dht::shard_replica_set table::shard_for_writes(dht::token t) const {
|
|
return _erm ? _erm->shard_for_writes(*_schema, t)
|
|
: dht::shard_replica_set{dht::static_shard_of(*_schema, t)}; // for tests.
|
|
}
|
|
|
|
future<uint64_t> table::estimated_partitions_in_range(dht::token_range tr) const {
|
|
// FIXME: use a better estimation for the set than a simple sum of individual estimations for each sstable.
|
|
//
|
|
// If sstables can be grouped by token range,
|
|
// and tokens within each sstable are uniformly distributed
|
|
// over the token range, (that's the usual case, and it's always
|
|
// the case with tablets), then we could use an alternative calculation.
|
|
//
|
|
// The result would be the sum of sub-results for each compaction group.
|
|
// To get an estimate for a compaction group,
|
|
// we could first use the sstable cardinality sketches (hyperloglog)
|
|
// to get a decent post-compaction estimate of total cardinality.
|
|
// And then, we could multiply the total estimate by the fraction of the group's
|
|
// range which is covered by `tr`.
|
|
auto sstables = select_sstables(dht::to_partition_range(tr));
|
|
uint64_t partition_count = 0;
|
|
co_await seastar::max_concurrent_for_each(sstables, 10, [&partition_count, &tr] (sstables::shared_sstable sst) -> future<> {
|
|
partition_count += co_await sst->estimated_keys_for_range(tr);
|
|
});
|
|
co_return partition_count;
|
|
}
|
|
|
|
tombstone_gc_state table::get_tombstone_gc_state() const {
|
|
return tombstone_gc_state(_compaction_manager.get_shared_tombstone_gc_state());
|
|
}
|
|
|
|
} // namespace replica
|