Ref https://github.com/scylladb/seastar/pull/3163

We can optimize the stat calls used here: open the snapshot, base, and staging directories once with open_directory, and then use statat calls with the relative name, instead of the full-blown file_stat that has to traverse the whole path prefix on every call. The dirents are likely to be cached, but there is still no point wasting CPU cycles on that over and over again.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
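For illustration, a minimal sketch of the intended pattern (the directory-relative "statat" entry point is what the referenced seastar PR adds; its exact name and signature are assumed here rather than quoted from the final API, and dir_path, component_names, and process are placeholders):

    // Before: one full file_stat() per component, traversing the whole path prefix each time:
    //   seastar::stat_data sd = co_await seastar::file_stat(dir_path + "/" + name);
    //
    // After: open the directory once, then stat each entry by its relative name.
    seastar::file dir = co_await seastar::open_directory(dir_path);
    for (const auto& name : component_names) {
        // hypothetical relative-name stat ("statat") from the referenced seastar PR
        seastar::stat_data sd = co_await dir.statat(name);
        process(sd);
    }
    co_await dir.close();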
/*
 * Copyright (C) 2018-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include <seastar/core/seastar.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/switch_to.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/defer.hh>
#include <seastar/json/json_elements.hh>

#include "dht/decorated_key.hh"
#include "replica/database.hh"
#include "replica/data_dictionary_impl.hh"
#include "replica/compaction_group.hh"
#include "replica/query_state.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/sstable_set.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "db/schema_tables.hh"
#include "cell_locking.hh"
#include "utils/assert.hh"
#include "utils/logalloc.hh"
#include "utils/checked-file-impl.hh"
#include "utils/managed_bytes.hh"
#include "view_info.hh"
#include "db/data_listeners.hh"
#include "memtable-sstable.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/compaction_group_view.hh"
#include "sstables/sstable_directory.hh"
#include "db/system_keyspace.hh"
#include "db/extensions.hh"
#include "query/query-result-writer.hh"
#include "db/view/view_update_generator.hh"
#include "utils/error_injection.hh"
#include "utils/histogram_metrics_helper.hh"
#include "mutation/mutation_source_metadata.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog.hh"
#include "utils/lister.hh"
#include "dht/token.hh"
#include "dht/i_partitioner.hh"
#include "replica/global_table_ptr.hh"
#include "locator/tablets.hh"

#include "utils/error_injection.hh"
#include "readers/reversing.hh"
#include "readers/empty.hh"
#include "readers/multi_range.hh"
#include "readers/combined.hh"
#include "readers/compacting.hh"
#include "replica/schema_describe_helper.hh"
#include "repair/incremental.hh"

namespace replica {
|
|
|
|
static logging::logger tlogger("table");
|
|
static seastar::metrics::label column_family_label("cf");
|
|
static seastar::metrics::label keyspace_label("ks");
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
table_holder::table_holder(table& t)
|
|
: _holder(t.async_gate())
|
|
, _table_ptr(t.shared_from_this())
|
|
{ }
|
|
|
|
sstables::generation_type table::calculate_generation_for_new_table() {
|
|
auto ret = _sstable_generation_generator();
|
|
tlogger.debug("{}.{} new sstable generation {}", schema()->ks_name(), schema()->cf_name(), ret);
|
|
return ret;
|
|
}
|
|
|
|
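// Builds a reader over the given sstable set for the requested range/slice.
// Singular ranges that carry a full partition key take the single-key fast
// path; all other ranges go through the local-shard range reader.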
mutation_reader
|
|
table::make_sstable_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
lw_shared_ptr<const sstables::sstable_set> sstables,
|
|
const dht::partition_range& pr,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
const sstables::sstable_predicate& predicate,
|
|
sstables::integrity_check integrity) const {
|
|
// CAVEAT: if make_sstable_reader() is called on a single partition
|
|
// we want to optimize and read exactly this partition. As a
|
|
// consequence, fast_forward_to() will *NOT* work on the result,
|
|
// regardless of what the fwd_mr parameter says.
|
|
if (pr.is_singular() && pr.start()->value().has_key()) {
|
|
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
|
|
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
|
|
} else {
|
|
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate, nullptr, integrity);
|
|
}
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> compaction_group::make_sstable_set() const {
|
|
// Bypasses the compound set if the maintenance one is empty. If an SSTable is added later into maintenance,
|
|
// the sstable set is refreshed to reflect the current state.
|
|
if (!_maintenance_sstables->size()) {
|
|
return _main_sstables;
|
|
}
|
|
return make_lw_shared(sstables::make_compound_sstable_set(_t.schema(), { _main_sstables, _maintenance_sstables }));
|
|
}
|
|
|
|
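// Returns sstables_repaired_at for this group's tablet from tablet metadata,
// or 0 when the table doesn't use tablets or the tablet map is not available.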
int64_t compaction_group::get_sstables_repaired_at() const noexcept {
|
|
try {
|
|
auto tid = locator::tablet_id(group_id());
|
|
auto erm = _t.get_effective_replication_map();
|
|
if (!erm) {
|
|
return 0;
|
|
}
|
|
if (!erm->get_replication_strategy().uses_tablets()) {
|
|
return 0;
|
|
}
|
|
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_t.schema()->id());
|
|
auto& tinfo = tmap.get_tablet_info(tid);
|
|
return tinfo.sstables_repaired_at;
|
|
} catch (locator::no_such_tablet_map) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> table::make_compound_sstable_set() const {
|
|
return _sg_manager->make_sstable_set();
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> compaction_group::make_maintenance_sstable_set() const {
|
|
return make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_t.schema(), token_range()));
|
|
}
|
|
|
|
void table::refresh_compound_sstable_set() {
|
|
_sstables = make_compound_sstable_set();
|
|
}
|
|
|
|
// Exposed for testing, not performance critical.
|
|
future<table::const_mutation_partition_ptr>
|
|
table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
|
|
return do_with(dht::partition_range::make_singular(key), [s = std::move(s), permit = std::move(permit), this] (auto& range) mutable {
|
|
return with_closeable(this->make_mutation_reader(std::move(s), std::move(permit), range), [] (mutation_reader& reader) {
|
|
return read_mutation_from_mutation_reader(reader).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
|
|
if (!mo) {
|
|
return {};
|
|
}
|
|
return std::make_unique<const mutation_partition>(std::move(mo->partition()));
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
future<table::const_row_ptr>
|
|
table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
|
|
return find_partition(s, std::move(permit), partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
|
|
if (!p) {
|
|
return make_ready_future<const_row_ptr>();
|
|
}
|
|
auto r = p->find_row(*s, clustering_key);
|
|
if (r) {
|
|
// FIXME: remove copy if only one data source
|
|
return make_ready_future<const_row_ptr>(std::make_unique<row>(*s, column_kind::regular_column, *r));
|
|
} else {
|
|
return make_ready_future<const_row_ptr>();
|
|
}
|
|
});
|
|
}
|
|
|
|
void
|
|
table::add_memtables_to_reader_list(std::vector<mutation_reader>& readers,
|
|
const schema_ptr& s,
|
|
const reader_permit& permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
const tracing::trace_state_ptr& trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
std::function<void(size_t)> reserve_fn) const {
|
|
auto add_memtables_from_cg = [&] (compaction_group& cg) mutable {
|
|
for (auto&& mt: *cg.memtables()) {
|
|
if (auto reader_opt = mt->make_mutation_reader_opt(s, permit, range, slice, trace_state, fwd, fwd_mr)) {
|
|
readers.emplace_back(std::move(*reader_opt));
|
|
}
|
|
}
|
|
};
|
|
|
|
// point queries can be optimized as they span a single compaction group.
|
|
if (range.is_singular() && range.start()->value().has_key()) {
|
|
const dht::ring_position& pos = range.start()->value();
|
|
auto& sg = storage_group_for_token(pos.token());
|
|
reserve_fn(sg.memtable_count());
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
add_memtables_from_cg(*cg);
|
|
});
|
|
return;
|
|
}
|
|
auto token_range = range.transform(std::mem_fn(&dht::ring_position::token));
|
|
auto sgs = storage_groups_for_token_range(token_range);
|
|
reserve_fn(std::ranges::fold_left(sgs | std::views::transform(std::mem_fn(&storage_group::memtable_count)), uint64_t(0), std::plus{}));
|
|
for (auto& sg : sgs) {
|
|
for (auto& cg : sg->compaction_groups()) {
|
|
add_memtables_from_cg(*cg);
|
|
}
|
|
}
|
|
}
|
|
|
|
mutation_reader
|
|
table::make_mutation_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) const {
|
|
if (_virtual_reader) [[unlikely]] {
|
|
return (*_virtual_reader).make_mutation_reader(s, std::move(permit), range, slice, trace_state, fwd, fwd_mr);
|
|
}
|
|
|
|
std::vector<mutation_reader> readers;
|
|
|
|
// We're assuming that cache and memtables are both read atomically
|
|
// for single-key queries, so we don't need to special case memtable
|
|
// undergoing a move to cache. At any given point in time between
|
|
// deferring points the sum of data in memtable and cache is coherent. If
|
|
// single-key queries for each data source were performed across deferring
|
|
// points, it would be possible that partitions which are ahead of the
|
|
// memtable cursor would be placed behind the cache cursor, resulting in
|
|
// those partitions being missing in the combined reader.
|
|
//
|
|
// We need to handle this in range queries though, as they are always
|
|
// deferring. scanning_reader from memtable.cc is falling back to reading
|
|
// the sstable when memtable is flushed. After memtable is moved to cache,
|
|
// new readers will no longer use the old memtable, but until then
|
|
// performance may suffer. We should fix this when we add support for
|
|
// range queries in cache, so that scans can always be satisfied from
|
|
// memtable and cache only, as long as data is not evicted.
|
|
//
|
|
// https://github.com/scylladb/scylla/issues/309
|
|
// https://github.com/scylladb/scylla/issues/185
|
|
|
|
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
|
|
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
|
|
if (cache_enabled() && !bypass_cache) {
|
|
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(),
|
|
get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
|
|
readers.emplace_back(std::move(*reader_opt));
|
|
}
|
|
} else {
|
|
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr));
|
|
}
|
|
|
|
auto rd = make_combined_reader(s, permit, std::move(readers), fwd, fwd_mr);
|
|
|
|
if (_config.data_listeners && !_config.data_listeners->empty()) {
|
|
rd = _config.data_listeners->on_read(s, range, slice, std::move(rd));
|
|
}
|
|
|
|
return rd;
|
|
}
|
|
|
|
sstables::shared_sstable table::make_streaming_sstable_for_write() {
|
|
auto newtab = make_sstable(sstables::sstable_state::normal);
|
|
tlogger.debug("Created sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
|
|
return newtab;
|
|
}
|
|
|
|
sstables::shared_sstable table::make_streaming_staging_sstable() {
|
|
auto newtab = make_sstable(sstables::sstable_state::staging);
|
|
tlogger.debug("Created staging sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
|
|
return newtab;
|
|
}
|
|
|
|
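// Wraps the reader in a compacting reader when compaction for streaming is
// enabled; tombstones are only purged when compaction_can_gc is set.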
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, const compaction::compaction_manager& cm,
|
|
gc_clock::time_point compaction_time, bool compaction_enabled, bool compaction_can_gc) {
|
|
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_enabled", fmt::to_string(compaction_enabled));
|
|
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_can_gc", fmt::to_string(compaction_can_gc));
|
|
if (!compaction_enabled) {
|
|
return underlying;
|
|
}
|
|
return make_compacting_reader(
|
|
std::move(underlying),
|
|
compaction_time,
|
|
compaction_can_gc ? can_always_purge : can_never_purge,
|
|
cm.get_tombstone_gc_state(),
|
|
streamed_mutation::forwarding::no);
|
|
}
|
|
|
|
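// Streaming reader over the given ranges: combines memtables and sstables,
// optionally compacting the output (see maybe_compact_for_streaming).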
mutation_reader
|
|
table::make_streaming_reader(schema_ptr s, reader_permit permit,
|
|
const dht::partition_range_vector& ranges,
|
|
gc_clock::time_point compaction_time) const {
|
|
auto& slice = s->full_slice();
|
|
|
|
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
|
std::vector<mutation_reader> readers;
|
|
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
|
|
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
|
|
});
|
|
|
|
return maybe_compact_for_streaming(
|
|
make_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, nullptr, mutation_reader::forwarding::no),
|
|
get_compaction_manager(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, gc_clock::time_point compaction_time) const {
|
|
auto trace_state = tracing::trace_state_ptr();
|
|
const auto fwd = streamed_mutation::forwarding::no;
|
|
|
|
std::vector<mutation_reader> readers;
|
|
add_memtables_to_reader_list(readers, schema, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
|
|
readers.reserve(memtable_count + 1);
|
|
});
|
|
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
|
|
return maybe_compact_for_streaming(
|
|
make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
|
|
get_compaction_manager(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const {
|
|
auto& slice = schema->full_slice();
|
|
auto trace_state = tracing::trace_state_ptr();
|
|
const auto fwd = streamed_mutation::forwarding::no;
|
|
const auto fwd_mr = mutation_reader::forwarding::no;
|
|
return maybe_compact_for_streaming(
|
|
sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice,
|
|
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
|
|
get_compaction_manager(),
|
|
compaction_time,
|
|
_config.enable_compacting_data_for_streaming_and_repair(),
|
|
_config.enable_tombstone_gc_for_streaming_and_repair());
|
|
}
|
|
|
|
mutation_reader table::make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice, tracing::trace_state_ptr ts) {
|
|
if (!range.is_singular()) {
|
|
throw std::runtime_error("table::make_cache_reader(): only singular ranges are supported");
|
|
}
|
|
return _cache.make_nonpopulating_reader(std::move(schema), std::move(permit), range, slice, std::move(ts));
|
|
}
|
|
|
|
future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout) {
|
|
SCYLLA_ASSERT(m.schema() == _counter_cell_locks->schema());
|
|
return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
|
|
}
|
|
|
|
void table::for_each_active_memtable(noncopyable_function<void(memtable&)> action) {
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
action(cg.memtables()->active_memtable());
|
|
});
|
|
}
|
|
|
|
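// Minimum timestamp across this group's memtables, or api::max_timestamp if
// the group has no memtables. The live/live-row-marker variants below follow
// the same pattern.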
api::timestamp_type compaction_group::min_memtable_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_timestamp(); }
|
|
));
|
|
}
|
|
|
|
api::timestamp_type compaction_group::min_memtable_live_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_live_timestamp(); }
|
|
));
|
|
}
|
|
|
|
api::timestamp_type compaction_group::min_memtable_live_row_marker_timestamp() const {
|
|
if (_memtables->empty()) {
|
|
return api::max_timestamp;
|
|
}
|
|
|
|
return std::ranges::min(
|
|
*_memtables
|
|
| std::views::transform(
|
|
[](const shared_memtable& m) { return m->get_min_live_row_marker_timestamp(); }
|
|
));
|
|
}
|
|
|
|
bool compaction_group::memtable_has_key(const dht::decorated_key& key) const {
|
|
if (_memtables->empty()) {
|
|
return false;
|
|
}
|
|
return std::ranges::any_of(*_memtables,
|
|
std::bind(&memtable::contains_partition, std::placeholders::_1, std::ref(key)));
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_timestamp() const {
|
|
return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_live_timestamp() const {
|
|
return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_live_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type storage_group::min_memtable_live_row_marker_timestamp() const {
|
|
return std::ranges::min(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::min_memtable_live_row_marker_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_live_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_live_timestamp)));
|
|
}
|
|
|
|
api::timestamp_type table::min_memtable_live_row_marker_timestamp() const {
|
|
return std::ranges::min(storage_groups() | std::views::values
|
|
| std::views::transform(std::mem_fn(&storage_group::min_memtable_live_row_marker_timestamp)));
|
|
}
|
|
|
|
static bool belongs_to_current_shard(const std::vector<shard_id>& shards) {
|
|
return std::ranges::contains(shards, this_shard_id());
|
|
}
|
|
|
|
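// True if the sstable is (also) owned by a shard other than this one.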
static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
|
|
return shards.size() != size_t(belongs_to_current_shard(shards));
|
|
}
|
|
|
|
sstables::shared_sstable table::make_sstable(sstables::sstable_state state) {
|
|
auto& sstm = get_sstables_manager();
|
|
return sstm.make_sstable(_schema, *_storage_opts, calculate_generation_for_new_table(), state, sstm.get_preferred_sstable_version(), sstables::sstable::format_types::big);
|
|
}
|
|
|
|
sstables::shared_sstable table::make_sstable() {
|
|
return make_sstable(sstables::sstable_state::normal);
|
|
}
|
|
|
|
db_clock::time_point table::get_truncation_time() const {
|
|
if (!_truncated_at) [[unlikely]] {
|
|
on_internal_error(dblog, ::format("truncation time is not set, table {}.{}",
|
|
_schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
return *_truncated_at;
|
|
}
|
|
|
|
void table::notify_bootstrap_or_replace_start() {
|
|
_is_bootstrap_or_replace = true;
|
|
}
|
|
|
|
void table::notify_bootstrap_or_replace_end() {
|
|
_is_bootstrap_or_replace = false;
|
|
trigger_offstrategy_compaction();
|
|
}
|
|
|
|
inline void table::add_sstable_to_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
|
tracker.replace_sstables({}, {std::move(sstable)});
|
|
}
|
|
|
|
inline void table::remove_sstable_from_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
|
tracker.replace_sstables({std::move(sstable)}, {});
|
|
}
|
|
|
|
void compaction_group::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
|
// If the group was closed / is being closed, it's ok to ignore the request to adjust the backlog
// tracker, since doing so might throw because the group was already deregistered from the
// compaction manager. The group is being removed anyway, so skipping the adjustment has no
// practical impact.
|
|
if (_async_gate.is_closed()) {
|
|
return;
|
|
}
|
|
|
|
auto& tracker = get_backlog_tracker();
|
|
tracker.replace_sstables(old_sstables, new_sstables);
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set>
|
|
compaction_group::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
|
|
enable_backlog_tracker backlog_tracker) {
|
|
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
|
|
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
|
|
}
|
|
// allow in-progress reads to continue using old list
|
|
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
|
|
new_sstables->insert(sstable);
|
|
if (backlog_tracker) {
|
|
table::add_sstable_to_backlog_tracker(get_backlog_tracker(), sstable);
|
|
}
|
|
_max_seen_timestamp = std::max(_max_seen_timestamp, sstable->get_stats_metadata().max_timestamp);
|
|
return new_sstables;
|
|
}
|
|
|
|
void compaction_group::add_sstable(sstables::shared_sstable sstable) {
|
|
_main_sstables = do_add_sstable(_main_sstables, std::move(sstable), enable_backlog_tracker::yes);
|
|
}
|
|
|
|
const lw_shared_ptr<sstables::sstable_set>& compaction_group::main_sstables() const noexcept {
|
|
return _main_sstables;
|
|
}
|
|
|
|
sstables::sstable_set compaction_group::make_main_sstable_set() const {
|
|
// Uses the view for unrepaired data, but the choice doesn't really matter since this only builds an
// empty set, which takes info like the group id and token range from the view.
|
|
return _t._compaction_strategy.make_sstable_set(view_for_unrepaired_data());
|
|
}
|
|
|
|
void compaction_group::set_main_sstables(lw_shared_ptr<sstables::sstable_set> new_main_sstables) {
|
|
_main_sstables = std::move(new_main_sstables);
|
|
}
|
|
|
|
void compaction_group::add_maintenance_sstable(sstables::shared_sstable sst) {
|
|
_maintenance_sstables = do_add_sstable(_maintenance_sstables, std::move(sst), enable_backlog_tracker::no);
|
|
}
|
|
|
|
const lw_shared_ptr<sstables::sstable_set>& compaction_group::maintenance_sstables() const noexcept {
|
|
return _maintenance_sstables;
|
|
}
|
|
|
|
void compaction_group::set_maintenance_sstables(lw_shared_ptr<sstables::sstable_set> new_maintenance_sstables) {
|
|
_maintenance_sstables = std::move(new_maintenance_sstables);
|
|
}
|
|
|
|
void table::add_sstable(compaction_group& cg, sstables::shared_sstable sstable) {
|
|
cg.add_sstable(std::move(sstable));
|
|
refresh_compound_sstable_set();
|
|
}
|
|
|
|
void table::add_maintenance_sstable(compaction_group& cg, sstables::shared_sstable sst) {
|
|
cg.add_maintenance_sstable(std::move(sst));
|
|
refresh_compound_sstable_set();
|
|
}
|
|
|
|
void table::do_update_off_strategy_trigger() {
|
|
_off_strategy_trigger.rearm(timer<>::clock::now() + std::chrono::minutes(5));
|
|
}
|
|
|
|
// If more sstables are going to be added to the off-strategy sstable set, call
// update_off_strategy_trigger to rearm the timer and delay triggering
// off-strategy compaction. Off-strategy compaction runs when the timer expires.
|
|
void table::update_off_strategy_trigger() {
|
|
if (_off_strategy_trigger.armed()) {
|
|
do_update_off_strategy_trigger();
|
|
}
|
|
}
|
|
|
|
// Call enable_off_strategy_trigger to enable the automatic off-strategy
|
|
// compaction trigger.
|
|
void table::enable_off_strategy_trigger() {
|
|
do_update_off_strategy_trigger();
|
|
}
|
|
|
|
storage_group_manager::~storage_group_manager() = default;
|
|
|
|
// exception-less attempt to hold gate.
|
|
// TODO: move it to seastar.
|
|
static std::optional<gate::holder> try_hold_gate(gate& g) noexcept {
|
|
return g.is_closed() ? std::nullopt : std::make_optional(g.hold());
|
|
}
|
|
|
|
future<> storage_group_manager::parallel_foreach_storage_group(std::function<future<>(storage_group&)> f) {
|
|
co_await coroutine::parallel_for_each(_storage_groups | std::views::values, [&] (const storage_group_ptr sg) -> future<> {
|
|
// Table-wide ops, like 'nodetool compact', are inherently racy with migrations, so it's okay to skip
|
|
// storage of tablets being migrated away.
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
co_await f(*sg.get());
|
|
}
|
|
});
|
|
}
|
|
|
|
future<> storage_group_manager::for_each_storage_group_gently(std::function<future<>(storage_group&)> f) {
|
|
auto storage_groups = _storage_groups | std::views::values | std::ranges::to<std::vector>();
|
|
for (auto& sg: storage_groups) {
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
co_await f(*sg.get());
|
|
}
|
|
}
|
|
}
|
|
|
|
void storage_group_manager::for_each_storage_group(std::function<void(size_t, storage_group&)> f) const {
|
|
for (auto& [id, sg]: _storage_groups) {
|
|
if (auto holder = try_hold_gate(sg->async_gate())) {
|
|
f(id, *sg);
|
|
}
|
|
}
|
|
}
|
|
|
|
const storage_group_map& storage_group_manager::storage_groups() const {
|
|
return _storage_groups;
|
|
}
|
|
|
|
future<> storage_group_manager::stop_storage_groups() noexcept {
|
|
co_await parallel_for_each(_storage_groups | std::views::values, [] (auto sg) { return sg->stop("table removal"); });
|
|
co_await stop();
|
|
}
|
|
|
|
void storage_group_manager::clear_storage_groups() {
|
|
for (auto& [id, sg]: _storage_groups) {
|
|
sg->clear_sstables();
|
|
}
|
|
}
|
|
|
|
void storage_group_manager::remove_storage_group(size_t id) {
|
|
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
|
|
_storage_groups.erase(it);
|
|
} else {
|
|
throw std::out_of_range(format("remove_storage_group: storage group with id={} not found", id));
|
|
}
|
|
}
|
|
|
|
storage_group& storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const {
|
|
auto it = _storage_groups.find(i);
|
|
if (it == _storage_groups.end()) [[unlikely]] {
|
|
throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name()));
|
|
}
|
|
return *it->second.get();
|
|
}
|
|
|
|
storage_group* storage_group_manager::maybe_storage_group_for_id(const schema_ptr& s, size_t i) const {
|
|
auto it = _storage_groups.find(i);
|
|
return it != _storage_groups.end() ? &*it->second.get() : nullptr;
|
|
}
|
|
|
|
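// Storage-group manager for vnode-based (non-tablet) tables: a single
// storage group and compaction group covering the full token range.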
class single_storage_group_manager final : public storage_group_manager {
|
|
replica::table& _t;
|
|
storage_group_ptr _single_sg;
|
|
compaction_group* _single_cg;
|
|
|
|
compaction_group& get_compaction_group() const noexcept {
|
|
return *_single_cg;
|
|
}
|
|
public:
|
|
single_storage_group_manager(replica::table& t)
|
|
: _t(t)
|
|
{
|
|
storage_group_map r;
|
|
|
|
// Incremental repair is not supported with vnodes, so all sstables will be considered unrepaired.
|
|
auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&, int64_t sstables_repaired_at) { return repair_sstable_classification::unrepaired; };
|
|
|
|
// This might not reflect the real vnode range of this node, but with 256 tokens the actual
// first and last tokens are likely within ~0.5% of the edges, so any measurement against
// this accurate-enough token range will likely be at most ~1% off.
// TODO: we could feed the actual vnode range here, but we might bump into a chicken-and-egg
// problem if e.g. a system table is created before tokens were allocated.
|
|
auto full_token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
|
auto cg = make_lw_shared<compaction_group>(_t, size_t(0), std::move(full_token_range), noop_repair_sstable_classifier);
|
|
_single_cg = cg.get();
|
|
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
|
_single_sg = sg;
|
|
r[0] = std::move(sg);
|
|
_storage_groups = std::move(r);
|
|
}
|
|
|
|
future<> stop() override {
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
void update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override {}
|
|
|
|
compaction_group& compaction_group_for_token(dht::token token) const override {
|
|
return get_compaction_group();
|
|
}
|
|
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const override {
|
|
utils::chunked_vector<storage_group_ptr> ret;
|
|
ret.push_back(_single_sg);
|
|
return ret;
|
|
}
|
|
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override {
|
|
return get_compaction_group();
|
|
}
|
|
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override {
|
|
return get_compaction_group();
|
|
}
|
|
size_t log2_storage_groups() const override {
|
|
return 0;
|
|
}
|
|
storage_group& storage_group_for_token(dht::token token) const override {
|
|
return *_single_sg;
|
|
}
|
|
|
|
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const override {
|
|
return locator::combined_load_stats{
|
|
.table_ls = locator::table_load_stats{
|
|
.size_in_bytes = _single_sg->live_disk_space_used(),
|
|
.split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min()},
|
|
.tablet_ls = locator::tablet_load_stats{}
|
|
};
|
|
}
|
|
|
|
bool all_storage_groups_split() override { return true; }
|
|
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
|
|
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
|
|
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
|
|
}
|
|
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
|
|
|
|
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
|
return get_compaction_group().make_sstable_set();
|
|
}
|
|
};
|
|
|
|
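// Storage-group manager for tablet-based tables: one storage group per
// tablet replica hosted on this shard, remapped on tablet split/merge.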
class tablet_storage_group_manager final : public storage_group_manager {
|
|
replica::table& _t;
|
|
locator::host_id _my_host_id;
|
|
const locator::tablet_map* _tablet_map;
|
|
future<> _stop_fut = make_ready_future();
|
|
// Every table replica that completes split work loads the seq number from tablet metadata into its local
// state. So when the coordinator pulls the local state of a table, it knows whether the table is ready for the
// current split, and not for a previously revoked (stale) decision.
// The minimum value, which is a negative number, is not used by the coordinator for the first decision.
|
|
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
|
future<> _merge_completion_fiber;
|
|
condition_variable _merge_completion_event;
|
|
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
|
std::vector<compaction::compaction_reenabler> _compaction_reenablers_for_merging;
|
|
private:
|
|
const schema_ptr& schema() const {
|
|
return _t.schema();
|
|
}
|
|
|
|
const locator::tablet_map& tablet_map() const noexcept {
|
|
return *_tablet_map;
|
|
}
|
|
|
|
size_t tablet_count() const noexcept {
|
|
return tablet_map().tablet_count();
|
|
}
|
|
|
|
future<compaction::compaction_type_options::split> split_compaction_options() const noexcept;
|
|
|
|
// Called when the coordinator executes tablet splitting, i.e. commits the new tablet map with
// each tablet split into two, so this replica will remap all of its compaction groups
// that were previously split.
|
|
void handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
|
|
|
|
// Called when coordinator executes tablet merge. Tablet ids X and X+1 are merged into
|
|
// the new tablet id (X >> 1). In practice, that means storage groups for X and X+1
|
|
// are merged into a new storage group with id (X >> 1).
|
|
void handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
|
|
|
|
// When merge completes, compaction groups of sibling tablets are added to the same storage
// group, but they're not yet merged into one, since the merge completion handler runs
// inside the erm updater which must complete ASAP. Therefore, those groups are merged
// into a single (main) one in the background.
|
|
future<> merge_completion_fiber();
|
|
|
|
storage_group& storage_group_for_id(size_t i) const {
|
|
return storage_group_manager::storage_group_for_id(schema(), i);
|
|
}
|
|
|
|
size_t tablet_id_for_token(dht::token t) const noexcept {
|
|
return tablet_map().get_tablet_id(t).value();
|
|
}
|
|
|
|
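// Maps a token to its tablet index and range side (left/right half of the
// tablet range), with extra sanity checks in non-release builds.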
std::pair<size_t, locator::tablet_range_side> storage_group_of(dht::token t) const {
|
|
auto [id, side] = tablet_map().get_tablet_id_and_range_side(t);
|
|
auto idx = id.value();
|
|
#ifndef SCYLLA_BUILD_MODE_RELEASE
|
|
if (idx >= tablet_count()) {
|
|
on_fatal_internal_error(tlogger, format("storage_group_of: index out of range: idx={} size_log2={} size={} token={}",
|
|
idx, log2_storage_groups(), tablet_count(), t));
|
|
}
|
|
auto& sg = storage_group_for_id(idx);
|
|
if (!t.is_minimum() && !t.is_maximum() && !sg.token_range().contains(t, dht::token_comparator())) {
|
|
on_fatal_internal_error(tlogger, format("storage_group_of: storage_group idx={} range={} does not contain token={}",
|
|
idx, sg.token_range(), t));
|
|
}
|
|
#endif
|
|
return { idx, side };
|
|
}
|
|
|
|
repair_classifier_func make_repair_sstable_classifier_func() const {
|
|
// FIXME: implement it for incremental repair!
|
|
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
|
|
bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
|
|
if (is_repaired) {
|
|
return repair_sstable_classification::repaired;
|
|
} else {
|
|
if (!sst->being_repaired.uuid().is_null()) {
|
|
return repair_sstable_classification::repairing;
|
|
} else {
|
|
return repair_sstable_classification::unrepaired;
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
storage_group_ptr allocate_storage_group(const locator::tablet_map& tmap, locator::tablet_id tid, dht::token_range range) const {
|
|
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range), make_repair_sstable_classifier_func());
|
|
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
|
if (tmap.needs_split()) {
|
|
sg->set_split_mode();
|
|
}
|
|
return sg;
|
|
}
|
|
public:
|
|
tablet_storage_group_manager(table& t, const locator::effective_replication_map& erm)
|
|
: _t(t)
|
|
, _my_host_id(erm.get_token_metadata().get_my_id())
|
|
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
|
|
, _merge_completion_fiber(merge_completion_fiber())
|
|
{
|
|
storage_group_map ret;
|
|
|
|
auto& tmap = tablet_map();
|
|
auto local_replica = locator::tablet_replica{_my_host_id, this_shard_id()};
|
|
|
|
for (auto tid : tmap.tablet_ids()) {
|
|
if (!tmap.has_replica(tid, local_replica)) {
|
|
continue;
|
|
}
|
|
|
|
// if the tablet was cleaned up already on this replica, don't allocate a storage group for it.
|
|
auto trinfo = tmap.get_tablet_transition_info(tid);
|
|
if (trinfo && locator::is_post_cleanup(local_replica, tmap.get_tablet_info(tid), *trinfo)) {
|
|
continue;
|
|
}
|
|
|
|
auto range = tmap.get_token_range(tid);
|
|
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
|
|
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
|
|
}
|
|
_storage_groups = std::move(ret);
|
|
}
|
|
|
|
future<> stop() override {
|
|
_merge_completion_event.signal();
|
|
return when_all(std::exchange(_merge_completion_fiber, make_ready_future<>()),
|
|
std::exchange(_stop_fut, make_ready_future())).discard_result();
|
|
}
|
|
|
|
void update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override;
|
|
|
|
compaction_group& compaction_group_for_token(dht::token token) const override;
|
|
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const override;
|
|
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override;
|
|
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override;
|
|
|
|
size_t log2_storage_groups() const override {
|
|
return log2ceil(tablet_map().tablet_count());
|
|
}
|
|
storage_group& storage_group_for_token(dht::token token) const override {
|
|
return storage_group_for_id(storage_group_of(token).first);
|
|
}
|
|
|
|
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const override;
|
|
bool all_storage_groups_split() override;
|
|
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
|
|
future<> maybe_split_compaction_group_of(size_t idx) override;
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
|
|
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
|
|
return tablet_map().get_token_range_after_split(token);
|
|
}
|
|
|
|
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
|
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
|
|
return make_tablet_sstable_set(schema(), *this, *_tablet_map);
|
|
}
|
|
};
|
|
|
|
bool table::uses_tablets() const {
|
|
return _erm && _erm->get_replication_strategy().uses_tablets();
|
|
}
|
|
|
|
storage_group::storage_group(compaction_group_ptr cg)
|
|
: _main_cg(cg)
|
|
, _async_gate(format("[storage_group {}.{} {}]", cg->schema()->ks_name(), cg->schema()->cf_name(), cg->group_id()))
|
|
{
|
|
}
|
|
|
|
const dht::token_range& storage_group::token_range() const noexcept {
|
|
return _main_cg->token_range();
|
|
}
|
|
|
|
const compaction_group_ptr& storage_group::main_compaction_group() const noexcept {
|
|
return _main_cg;
|
|
}
|
|
|
|
const std::vector<compaction_group_ptr>& storage_group::split_ready_compaction_groups() const {
|
|
return _split_ready_groups;
|
|
}
|
|
|
|
size_t storage_group::to_idx(locator::tablet_range_side side) const {
|
|
return size_t(side);
|
|
}
|
|
|
|
compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_range_side side) noexcept {
|
|
if (splitting_mode()) {
|
|
return _split_ready_groups[to_idx(side)];
|
|
}
|
|
return _main_cg;
|
|
}
|
|
|
|
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
|
|
action(_main_cg);
|
|
for (auto& cg : _merging_groups) {
|
|
action(cg);
|
|
}
|
|
for (auto& cg : _split_ready_groups) {
|
|
action(cg);
|
|
}
|
|
}
|
|
|
|
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() {
|
|
utils::small_vector<compaction_group_ptr, 3> cgs;
|
|
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
|
|
cgs.push_back(cg);
|
|
});
|
|
return cgs;
|
|
}
|
|
|
|
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const {
|
|
utils::small_vector<const_compaction_group_ptr, 3> cgs;
|
|
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
|
|
cgs.push_back(cg);
|
|
});
|
|
return cgs;
|
|
}
|
|
|
|
utils::small_vector<compaction_group_ptr, 3> storage_group::split_unready_groups() const {
|
|
utils::small_vector<compaction_group_ptr, 3> cgs;
|
|
cgs.push_back(_main_cg);
|
|
std::copy(_merging_groups.begin(), _merging_groups.end(), std::back_inserter(cgs));
|
|
return cgs;
|
|
}
|
|
|
|
bool storage_group::split_unready_groups_are_empty() const {
|
|
return std::ranges::all_of(split_unready_groups(), std::mem_fn(&compaction_group::empty));
|
|
}
|
|
|
|
bool storage_group::set_split_mode() {
|
|
// A group being stopped (e.g. during migration cleanup) cannot satisfy split mode.
|
|
// Also, a race can happen if new groups are added while old ones are being stopped,
|
|
// so the new ones can be left unstopped, potentially resulting in use-after-free.
|
|
if (_async_gate.is_closed()) {
|
|
return false;
|
|
}
|
|
if (!splitting_mode()) {
|
|
auto create_cg = [this] () -> compaction_group_ptr {
|
|
// TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
|
|
return compaction_group::make_empty_group(*_main_cg);
|
|
};
|
|
tlogger.debug("storage_group::set_split_mode: Set sstables_repaired_at={} for split old_group={} old_range={}",
|
|
_main_cg->get_sstables_repaired_at(), _main_cg->group_id(), _main_cg->token_range());
|
|
std::vector<compaction_group_ptr> split_ready_groups(2);
|
|
split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg();
|
|
split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg();
|
|
_split_ready_groups = std::move(split_ready_groups);
|
|
}
|
|
|
|
// The storage group is considered "split ready" if all split unready groups (main + merging) are empty.
|
|
return split_unready_groups_are_empty();
|
|
}
|
|
|
|
void storage_group::add_merging_group(compaction_group_ptr cg) {
|
|
_merging_groups.push_back(std::move(cg));
|
|
}
|
|
|
|
const std::vector<compaction_group_ptr>& storage_group::merging_groups() const {
|
|
return _merging_groups;
|
|
}
|
|
|
|
future<> storage_group::remove_empty_merging_groups() {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
for (auto& group : _merging_groups | std::views::filter(std::mem_fn(&compaction_group::empty))) {
|
|
co_await group->stop("tablet merge");
|
|
}
|
|
std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty));
|
|
}
|
|
|
|
future<> compaction_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
|
|
auto& cm = get_compaction_manager();
|
|
|
|
for (auto view : all_views()) {
|
|
auto lock_holder = co_await cm.get_incremental_repair_read_lock(*view, "storage_group_split");
|
|
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
|
|
co_await cm.perform_offstrategy(*view, tablet_split_task_info);
|
|
co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info);
|
|
}
|
|
}
|
|
|
|
future<> storage_group::split(compaction::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
|
|
if (set_split_mode()) {
|
|
co_return;
|
|
}
|
|
co_await utils::get_local_injector().inject("delay_split_compaction", 5s);
|
|
|
|
if (split_unready_groups_are_empty()) {
|
|
co_return;
|
|
}
|
|
for (auto cg : split_unready_groups()) {
|
|
if (cg->async_gate().is_closed()) {
|
|
continue;
|
|
}
|
|
auto holder = cg->async_gate().hold();
|
|
co_await cg->flush();
|
|
co_await cg->split(opt, tablet_split_task_info);
|
|
}
|
|
}
|
|
|
|
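// Returns the main group's set directly when there are no merging or
// split-ready groups; otherwise builds a compound set over all of them.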
lw_shared_ptr<const sstables::sstable_set> storage_group::make_sstable_set() const {
|
|
if (_split_ready_groups.empty() && _merging_groups.empty()) {
|
|
return _main_cg->make_sstable_set();
|
|
}
|
|
const auto& schema = _main_cg->_t.schema();
|
|
std::vector<lw_shared_ptr<sstables::sstable_set>> underlying;
|
|
underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size());
|
|
underlying.emplace_back(_main_cg->make_sstable_set());
|
|
for (const auto& cg : _merging_groups) {
|
|
if (!cg->empty()) {
|
|
underlying.emplace_back(cg->make_sstable_set());
|
|
}
|
|
}
|
|
for (const auto& cg : _split_ready_groups) {
|
|
underlying.emplace_back(cg->make_sstable_set());
|
|
}
|
|
return make_lw_shared(sstables::make_compound_sstable_set(schema, std::move(underlying)));
|
|
}
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> table::sstable_set_for_tombstone_gc(const compaction_group& cg) const {
|
|
auto& sg = storage_group_for_id(cg.group_id());
|
|
return sg.make_sstable_set();
|
|
}
|
|
|
|
bool tablet_storage_group_manager::all_storage_groups_split() {
|
|
auto& tmap = tablet_map();
|
|
if (_split_ready_seq_number == tmap.resize_decision().sequence_number) {
|
|
return true;
|
|
}
|
|
|
|
bool split_ready = true;
|
|
for (const storage_group_ptr& sg : _storage_groups | std::views::values) {
|
|
split_ready &= sg->set_split_mode();
|
|
}
|
|
|
|
// The table replica tells the coordinator that its split status is ready by
// mirroring the sequence number from tablet metadata into its local state,
// which is pulled periodically by the coordinator.
|
|
if (split_ready) {
|
|
_split_ready_seq_number = tmap.resize_decision().sequence_number;
|
|
tlogger.info0("Setting split ready sequence number to {} for table {}.{}",
|
|
_split_ready_seq_number, schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
return split_ready;
|
|
}
|
|
|
|
bool table::all_storage_groups_split() {
|
|
return _sg_manager->all_storage_groups_split();
|
|
}
|
|
|
|
future<compaction::compaction_type_options::split> tablet_storage_group_manager::split_compaction_options() const noexcept {
|
|
// Split must work with a snapshot of tablet map, since it expects stability
|
|
// throughout its execution.
|
|
auto erm = _t.get_effective_replication_map();
|
|
auto tablet_map_ptr = co_await erm->get_token_metadata().tablets().get_tablet_map_ptr(schema()->id());
|
|
|
|
co_return compaction::compaction_type_options::split([tablet_map_ptr = make_lw_shared(std::move(tablet_map_ptr))] (dht::token t) {
|
|
// Classifies the input stream into either left or right side.
|
|
auto [_, side] = (*tablet_map_ptr)->get_tablet_id_and_range_side(t);
|
|
return mutation_writer::token_group_id(side);
|
|
});
|
|
}
|
|
|
|
future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
|
|
compaction::compaction_type_options::split opt = co_await split_compaction_options();
|
|
|
|
co_await utils::get_local_injector().inject("split_storage_groups_wait", [] (auto& handler) -> future<> {
|
|
dblog.info("split_storage_groups_wait: waiting");
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
|
dblog.info("split_storage_groups_wait: done");
|
|
}, false);
|
|
|
|
co_await for_each_storage_group_gently([opt, tablet_split_task_info] (storage_group& storage_group) {
|
|
return storage_group.split(opt, tablet_split_task_info);
|
|
});
|
|
}
|
|
|
|
future<> table::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
|
|
auto holder = async_gate().hold();
|
|
co_await _sg_manager->split_all_storage_groups(tablet_split_task_info);
|
|
}
|
|
|
|
future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t idx) {
|
|
if (!tablet_map().needs_split()) {
|
|
co_return;
|
|
}
|
|
tasks::task_info tablet_split_task_info{tasks::task_id{tablet_map().resize_task_info().tablet_task_id.uuid()}, 0};
|
|
|
|
auto sg = _storage_groups[idx];
|
|
if (!sg) {
|
|
on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard",
|
|
idx, schema()->ks_name(), schema()->cf_name()));
|
|
}
|
|
|
|
co_return co_await sg->split(co_await split_compaction_options(), tablet_split_task_info);
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>>
|
|
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
|
|
if (!tablet_map().needs_split()) {
|
|
co_return std::vector<sstables::shared_sstable>{sst};
|
|
}
|
|
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
auto holder = cg.async_gate().hold();
|
|
auto& view = cg.view_for_sstable(sst);
|
|
auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
|
|
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, co_await split_compaction_options());
|
|
}
|
|
|
|
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
|
auto holder = async_gate().hold();
|
|
co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value());
|
|
}
|
|
|
|
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
|
|
auto holder = async_gate().hold();
|
|
co_return co_await _sg_manager->maybe_split_sstable(sst);
|
|
}
|
|
|
|
dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
|
|
return _sg_manager->get_token_range_after_split(token);
|
|
}
|
|
|
|
std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
|
|
std::unique_ptr<storage_group_manager> ret;
|
|
if (uses_tablets()) {
|
|
ret = std::make_unique<tablet_storage_group_manager>(*this, *_erm);
|
|
} else {
|
|
ret = std::make_unique<single_storage_group_manager>(*this);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
compaction_group* table::get_compaction_group(size_t id) const {
|
|
return storage_group_for_id(id).main_compaction_group().get();
|
|
}
|
|
|
|
storage_group& table::storage_group_for_token(dht::token token) const {
|
|
return _sg_manager->storage_group_for_token(token);
|
|
}
|
|
|
|
storage_group& table::storage_group_for_id(size_t i) const {
|
|
return _sg_manager->storage_group_for_id(_schema, i);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_token(dht::token token) const {
|
|
auto [idx, range_side] = storage_group_of(token);
|
|
auto& sg = storage_group_for_id(idx);
|
|
return *sg.select_compaction_group(range_side);
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_token(dht::token token) const {
|
|
return _sg_manager->compaction_group_for_token(token);
|
|
}
|
|
|
|
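// Collects the storage groups whose tablet token ranges overlap the given
// range; tablets not present on this shard are simply skipped.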
utils::chunked_vector<storage_group_ptr> tablet_storage_group_manager::storage_groups_for_token_range(dht::token_range tr) const {
|
|
utils::chunked_vector<storage_group_ptr> ret;
|
|
auto cmp = dht::token_comparator();
|
|
|
|
size_t candidate_start = tr.start() ? tablet_id_for_token(tr.start()->value()) : size_t(0);
|
|
size_t candidate_end = tr.end() ? tablet_id_for_token(tr.end()->value()) : (tablet_count() - 1);
|
|
|
|
while (candidate_start <= candidate_end) {
|
|
auto it = _storage_groups.find(candidate_start++);
|
|
if (it == _storage_groups.end()) {
|
|
continue;
|
|
}
|
|
auto& sg = it->second;
|
|
if (sg && tr.overlaps(sg->token_range(), cmp)) {
|
|
ret.push_back(sg);
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
utils::chunked_vector<storage_group_ptr> table::storage_groups_for_token_range(dht::token_range tr) const {
|
|
return _sg_manager->storage_groups_for_token_range(tr);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
|
|
return compaction_group_for_token(dht::get_token(*s, key));
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const {
|
|
return _sg_manager->compaction_group_for_key(key, s);
|
|
}
|
|
|
|
compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
|
|
auto [first_id, first_range_side] = storage_group_of(sst->get_first_decorated_key().token());
|
|
auto [last_id, last_range_side] = storage_group_of(sst->get_last_decorated_key().token());
|
|
|
|
if (first_id != last_id) {
|
|
on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}",
|
|
sst->get_filename(), first_id, last_id));
|
|
}
|
|
|
|
try {
|
|
auto& sg = storage_group_for_id(first_id);
|
|
|
|
if (first_range_side != last_range_side) {
|
|
return *sg.main_compaction_group();
|
|
}
|
|
|
|
return *sg.select_compaction_group(first_range_side);
|
|
} catch (std::out_of_range& e) {
|
|
on_internal_error(tlogger, format("Unable to load SSTable {} : {}", sst->get_filename(), e.what()));
|
|
}
|
|
}
|
|
|
|
compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
|
|
return _sg_manager->compaction_group_for_sstable(sst);
|
|
}
|
|
|
|
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
|
|
co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> {
|
|
co_await utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& handler) -> future<> {
|
|
tlogger.info("foreach_compaction_group_wait: waiting");
|
|
while (!handler.poll_for_message() && !_async_gate.is_closed() && !sg.async_gate().is_closed()) {
|
|
co_await sleep(std::chrono::milliseconds(5));
|
|
}
|
|
tlogger.info("foreach_compaction_group_wait: released");
|
|
});
|
|
|
|
co_await coroutine::parallel_for_each(sg.compaction_groups(), [&] (compaction_group_ptr cg) -> future<> {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
co_await action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
void table::for_each_compaction_group(std::function<void(compaction_group&)> action) {
|
|
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
void table::for_each_compaction_group(std::function<void(const compaction_group&)> action) const {
|
|
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
|
|
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
|
|
if (auto holder = try_hold_gate(cg->async_gate())) {
|
|
action(*cg);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
const storage_group_map& table::storage_groups() const {
|
|
return _sg_manager->storage_groups();
|
|
}
|
|
|
|
future<utils::chunked_vector<sstables::sstable_files_snapshot>> table::take_storage_snapshot(dht::token_range tr) {
|
|
utils::chunked_vector<sstables::sstable_files_snapshot> ret;
|
|
|
|
for (auto& sg : storage_groups_for_token_range(tr)) {
|
|
co_await utils::get_local_injector().inject("take_storage_snapshot", utils::wait_for_message(60s));
|
|
|
|
// We don't care about sstables in the snapshot being unlinked, as the file
// descriptors remain open until the last reference to them is gone.
|
|
// Also, we should be careful with taking a deletion lock here as a
|
|
// deadlock might occur due to memtable flush backpressure waiting on
|
|
// compaction to reduce the backlog.
|
|
|
|
co_await sg->flush();
|
|
|
|
// The sstable set must be obtained *after* the deletion lock is taken,
|
|
// otherwise components of sstables in the set might be unlinked from the filesystem
|
|
// by compaction while we are waiting for the lock.
|
|
auto deletion_guard = co_await get_sstable_list_permit();
|
|
// It's vital that we build a set on the storage group level, since sstables
|
|
// might move across compaction groups in the background.
|
|
co_await sg->make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
|
ret.push_back({
|
|
.sst = sst,
|
|
.files = co_await sst->readable_file_for_all_components(),
|
|
});
|
|
});
|
|
}
|
|
|
|
co_return std::move(ret);
|
|
}
|
|
|
|
future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_snapshot() {
auto deletion_guard = co_await get_sstable_list_permit();
|
|
utils::chunked_vector<sstables::shared_sstable> result;
|
|
co_await get_sstable_set().for_each_sstable_gently([&] (sstables::shared_sstable sst) {
|
|
result.push_back(sst);
|
|
});
|
|
co_return result;
|
|
}
|
|
|
|
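// Clones the on-disk storage of a single tablet: the tablet's storage group is
// flushed, and every sstable in its set is cloned under a freshly allocated
// generation. The entry descriptors of the clones are returned to the caller.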
future<utils::chunked_vector<sstables::entry_descriptor>>
|
|
table::clone_tablet_storage(locator::tablet_id tid) {
|
|
utils::chunked_vector<sstables::entry_descriptor> ret;
|
|
auto holder = async_gate().hold();
|
|
|
|
auto& sg = storage_group_for_id(tid.value());
|
|
auto sg_holder = sg.async_gate().hold();
|
|
co_await sg.flush();
|
|
// The sstable set must be obtained *after* the deletion lock is taken,
|
|
// otherwise components of sstables in the set might be unlinked from the filesystem
|
|
// by compaction while we are waiting for the lock.
|
|
auto deletion_guard = co_await get_sstable_list_permit();
|
|
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
|
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
|
|
});
|
|
co_return ret;
|
|
}
|
|
|
|
void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept {
|
|
_stats.live_disk_space_used += sst->get_file_size_stats();
|
|
_stats.total_disk_space_used += sst->get_file_size_stats();
|
|
_stats.live_sstable_count++;
|
|
}
|
|
|
|
future<>
|
|
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
|
|
bool trigger_compaction) {
|
|
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
|
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
|
|
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
|
// atomically load all opened sstables into column family.
|
|
if (!offstrategy) {
|
|
add_sstable(cg, sst);
|
|
} else {
|
|
add_maintenance_sstable(cg, sst);
|
|
}
|
|
update_stats_for_new_sstable(sst);
|
|
if (trigger_compaction) {
|
|
try_trigger_compaction(cg);
|
|
}
|
|
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
|
|
return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
|
|
});
|
|
}
|
|
|
|
future<>
|
|
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
|
|
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
|
|
auto& cg = compaction_group_for_sstable(sst);
|
|
        // Hold the gate to make sure the compaction group stays alive.
|
|
auto holder = cg.async_gate().hold();
|
|
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
|
|
}
|
|
}
|
|
|
|
future<>
|
|
table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
|
|
bool do_trigger_compaction = offstrategy == sstables::offstrategy::no;
|
|
co_await do_add_sstable_and_update_cache(std::move(sst), offstrategy, do_trigger_compaction);
|
|
}
|
|
|
|
future<>
|
|
table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts) {
|
|
constexpr bool do_not_trigger_compaction = false;
|
|
for (auto& sst : ssts) {
|
|
co_await do_add_sstable_and_update_cache(sst, sstables::offstrategy::no, do_not_trigger_compaction);
|
|
}
|
|
trigger_compaction();
|
|
}
|
|
|
|
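// Merges a flushed memtable into the row cache (or invalidates the affected range
// when the cache is disabled), adding the newly written sstables to the compaction
// group and marking the memtable as flushed so readers switch to the sstables.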
future<>
|
|
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
|
|
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
|
mutation_source_opt ms_opt;
|
|
if (ssts.size() == 1) {
|
|
ms_opt = ssts.front()->as_mutation_source();
|
|
} else {
|
|
std::vector<mutation_source> sources;
|
|
sources.reserve(ssts.size());
|
|
for (auto& sst : ssts) {
|
|
sources.push_back(sst->as_mutation_source());
|
|
}
|
|
ms_opt = make_combined_mutation_source(std::move(sources));
|
|
}
|
|
auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(*ms_opt), &cg] () mutable {
|
|
// FIXME: the following isn't exception safe.
|
|
for (auto& sst : ssts) {
|
|
add_sstable(cg, sst);
|
|
update_stats_for_new_sstable(sst);
|
|
}
|
|
m->mark_flushed(std::move(new_ssts_ms));
|
|
try_trigger_compaction(cg);
|
|
});
|
|
if (cache_enabled()) {
|
|
co_return co_await _cache.update(std::move(adder), *m);
|
|
} else {
|
|
co_return co_await _cache.invalidate(std::move(adder)).then([m] { return m->clear_gently(); });
|
|
}
|
|
}
|
|
|
|
// Handles permit management only, used for situations where we don't want to inform
// the compaction manager about backlogs (i.e., tests)
|
|
class permit_monitor : public sstables::write_monitor {
|
|
lw_shared_ptr<sstable_write_permit> _permit;
|
|
public:
|
|
permit_monitor(lw_shared_ptr<sstable_write_permit> permit)
|
|
: _permit(std::move(permit)) {
|
|
}
|
|
|
|
virtual void on_write_started(const sstables::writer_offset_tracker& t) override { }
|
|
virtual void on_data_write_completed() override {
|
|
// We need to start a flush before the current one finishes, otherwise
|
|
// we'll have a period without significant disk activity when the current
|
|
// SSTable is being sealed, the caches are being updated, etc. To do that,
|
|
// we ensure the permit doesn't outlive this continuation.
|
|
*_permit = sstable_write_permit::unconditional();
|
|
}
|
|
};
|
|
|
|
// Handles all tasks related to sstable writing: permit management, compaction backlog updates, etc
|
|
class database_sstable_write_monitor : public permit_monitor, public compaction::backlog_write_progress_manager {
|
|
sstables::shared_sstable _sst;
|
|
compaction_group& _cg;
|
|
const sstables::writer_offset_tracker* _tracker = nullptr;
|
|
uint64_t _progress_seen = 0;
|
|
api::timestamp_type _maximum_timestamp;
|
|
public:
|
|
database_sstable_write_monitor(lw_shared_ptr<sstable_write_permit> permit, sstables::shared_sstable sst,
|
|
compaction_group& cg, api::timestamp_type max_timestamp)
|
|
: permit_monitor(std::move(permit))
|
|
, _sst(std::move(sst))
|
|
, _cg(cg)
|
|
, _maximum_timestamp(max_timestamp)
|
|
{}
|
|
|
|
database_sstable_write_monitor(const database_sstable_write_monitor&) = delete;
|
|
database_sstable_write_monitor(database_sstable_write_monitor&& x) = default;
|
|
|
|
~database_sstable_write_monitor() {
|
|
// We failed to finish handling this SSTable, so we have to update the backlog_tracker
|
|
// about it.
|
|
if (_sst) {
|
|
_cg.get_backlog_tracker().revert_charges(_sst);
|
|
}
|
|
}
|
|
|
|
virtual void on_write_started(const sstables::writer_offset_tracker& t) override {
|
|
_tracker = &t;
|
|
_cg.get_backlog_tracker().register_partially_written_sstable(_sst, *this);
|
|
}
|
|
|
|
virtual void on_data_write_completed() override {
|
|
permit_monitor::on_data_write_completed();
|
|
_progress_seen = _tracker->offset;
|
|
_tracker = nullptr;
|
|
}
|
|
|
|
virtual uint64_t written() const override {
|
|
if (_tracker) {
|
|
return _tracker->offset;
|
|
}
|
|
return _progress_seen;
|
|
}
|
|
|
|
api::timestamp_type maximum_timestamp() const override {
|
|
return _maximum_timestamp;
|
|
}
|
|
|
|
unsigned level() const override {
|
|
return 0;
|
|
}
|
|
};
|
|
|
|
// The function never fails.
|
|
// It either succeeds eventually after retrying or aborts.
|
|
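// Flushing is retried with exponential backoff (100ms up to 10s). Allocation
// failures and disk-space/permission errors are treated as transient; any other
// error, or exhausting the retry budget (roughly half an hour), aborts the node so
// the commitlog can replay the data on restart. If the table is shutting down,
// the flush is dropped instead of retried.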
future<>
|
|
table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) noexcept {
|
|
auto old = cg.memtables()->back();
|
|
tlogger.debug("Sealing active memtable of {}.{}, partitions: {}, occupancy: {}", _schema->ks_name(), _schema->cf_name(), old->partition_count(), old->occupancy());
|
|
|
|
if (old->empty()) {
|
|
tlogger.debug("Memtable is empty");
|
|
co_return co_await _flush_barrier.advance_and_await();
|
|
}
|
|
|
|
auto permit = std::move(flush_permit);
|
|
auto r = exponential_backoff_retry(100ms, 10s);
|
|
// Try flushing for around half an hour (30 minutes every 10 seconds)
|
|
int default_retries = 30 * 60 / 10;
|
|
int allowed_retries = default_retries;
|
|
std::optional<utils::phased_barrier::operation> op;
|
|
size_t memtable_size;
|
|
future<> previous_flush = make_ready_future<>();
|
|
|
|
auto with_retry = [&] (std::function<future<>()> func) -> future<> {
|
|
for (;;) {
|
|
std::exception_ptr ex;
|
|
try {
|
|
co_return co_await func();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
_config.cf_stats->failed_memtables_flushes_count++;
|
|
|
|
auto should_retry = [](auto* ep) {
|
|
int ec = ep->code().value();
|
|
return ec == ENOSPC || ec == EDQUOT || ec == EPERM || ec == EACCES;
|
|
};
|
|
if (try_catch<std::bad_alloc>(ex)) {
|
|
// There is a chance something else will free the memory, so we can try again
|
|
allowed_retries--;
|
|
} else if (auto ep = try_catch<std::system_error>(ex)) {
|
|
allowed_retries = should_retry(ep) ? default_retries : 0;
|
|
} else if (auto ep = try_catch<storage_io_error>(ex)) {
|
|
allowed_retries = should_retry(ep) ? default_retries : 0;
|
|
} else if (try_catch<db::extension_storage_exception>(ex)) {
|
|
allowed_retries = default_retries;
|
|
} else {
|
|
allowed_retries = 0;
|
|
}
|
|
|
|
if (allowed_retries <= 0) {
|
|
                    // At this point we don't know what has happened and it's better to potentially
                    // take the node down and rely on commitlog to replay.
                    //
                    // FIXME: enter maintenance mode when available,
                    // since replaying the commitlog with a corrupt mutation
                    // may end up in an infinite crash loop.
|
|
tlogger.error("Memtable flush failed due to: {}. Aborting, at {}", ex, current_backtrace());
|
|
std::abort();
|
|
}
|
|
}
|
|
if (_async_gate.is_closed()) {
|
|
tlogger.warn("Memtable flush failed due to: {}. Dropped due to shutdown", ex);
|
|
co_await std::move(previous_flush);
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
tlogger.warn("Memtable flush failed due to: {}. Will retry in {}ms", ex, r.sleep_time().count());
|
|
co_await r.retry();
|
|
}
|
|
};
|
|
|
|
co_await with_retry([&] {
|
|
tlogger.debug("seal_active_memtable: adding memtable");
|
|
utils::get_local_injector().inject("table_seal_active_memtable_add_memtable", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
|
|
cg.memtables()->add_memtable();
|
|
_highest_flushed_rp = std::max(_highest_flushed_rp, old->replay_position());
|
|
|
|
// no exceptions allowed (nor expected) from this point on
|
|
_stats.memtable_switch_count++;
|
|
[&] () noexcept {
|
|
// This will set evictable occupancy of the old memtable region to zero, so that
|
|
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
|
|
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
|
|
// the permit is released even though there is not much to flush in the active memtable of this list.
|
|
old->region().ground_evictable_occupancy();
|
|
memtable_size = old->occupancy().total_space();
|
|
}();
|
|
return make_ready_future<>();
|
|
});
|
|
|
|
co_await with_retry([&] {
|
|
previous_flush = _flush_barrier.advance_and_await();
|
|
utils::get_local_injector().inject("table_seal_active_memtable_start_op", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
op = _flush_barrier.start();
|
|
|
|
// no exceptions allowed (nor expected) from this point on
|
|
_stats.pending_flushes++;
|
|
_config.cf_stats->pending_memtables_flushes_count++;
|
|
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
|
|
return make_ready_future<>();
|
|
});
|
|
|
|
auto undo_stats = std::make_optional(deferred_action([this, memtable_size] () noexcept {
|
|
_stats.pending_flushes--;
|
|
_config.cf_stats->pending_memtables_flushes_count--;
|
|
_config.cf_stats->pending_memtables_flushes_bytes -= memtable_size;
|
|
}));
|
|
|
|
co_await with_retry([&] () -> future<> {
|
|
// Reacquiring the write permit might be needed if retrying flush
|
|
if (!permit.has_sstable_write_permit()) {
|
|
tlogger.debug("seal_active_memtable: reacquiring write permit");
|
|
utils::get_local_injector().inject("table_seal_active_memtable_reacquire_write_permit", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
permit = co_await std::move(permit).reacquire_sstable_write_permit();
|
|
}
|
|
auto write_permit = permit.release_sstable_write_permit();
|
|
|
|
utils::get_local_injector().inject("table_seal_active_memtable_try_flush", []() {
|
|
throw std::system_error(ENOSPC, std::system_category(), "Injected error");
|
|
});
|
|
co_return co_await this->try_flush_memtable_to_sstable(cg, old, std::move(write_permit));
|
|
});
|
|
|
|
undo_stats.reset();
|
|
|
|
if (_commitlog) {
|
|
_commitlog->discard_completed_segments(_schema->id(), old->get_and_discard_rp_set());
|
|
}
|
|
co_await std::move(previous_flush);
|
|
// keep `op` alive until after previous_flush resolves
|
|
|
|
// FIXME: release commit log
|
|
// FIXME: provide back-pressure to upper layers
|
|
}
|
|
|
|
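// Writes the sealed memtable out under the memtable scheduling group, letting the
// compaction strategy's interposer consumer split the output into one or more
// sstables. Post-flush work (opening the new sstables, updating the row cache and
// erasing the memtable) runs under the default scheduling group.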
future<>
|
|
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
|
|
auto try_flush = [this, old = std::move(old), permit = make_lw_shared(std::move(permit)), &cg] () mutable -> future<> {
|
|
// Note that due to our sharded architecture, it is possible that
|
|
// in the face of a value change some shards will backup sstables
|
|
// while others won't.
|
|
//
|
|
// This is, in theory, possible to mitigate through a rwlock.
|
|
// However, this doesn't differ from the situation where all tables
|
|
// are coming from a single shard and the toggle happens in the
|
|
// middle of them.
|
|
//
|
|
// The code as is guarantees that we'll never partially backup a
|
|
// single sstable, so that is enough of a guarantee.
|
|
|
|
auto newtabs = std::vector<sstables::shared_sstable>();
|
|
auto metadata = mutation_source_metadata{};
|
|
metadata.min_timestamp = old->get_min_timestamp();
|
|
metadata.max_timestamp = old->get_max_timestamp();
|
|
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
|
|
|
|
if (!cg.async_gate().is_closed()) {
|
|
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
|
|
}
|
|
|
|
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
|
|
std::exception_ptr ex;
|
|
try {
|
|
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
|
|
cfg.backup = incremental_backups_enabled();
|
|
|
|
auto newtab = make_sstable();
|
|
newtabs.push_back(newtab);
|
|
tlogger.debug("Flushing to {}", newtab->get_filename());
|
|
|
|
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
|
|
old->get_max_timestamp());
|
|
|
|
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await reader.close();
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
});
|
|
|
|
auto f = consumer(old->make_flush_reader(
|
|
old->schema(),
|
|
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
|
|
|
        // Switch back to the default scheduling group for post-flush actions, to avoid them being starved by the memtable flush
        // controller. Cache update does not affect the input of the memtable CPU controller, so it can be subject to
        // priority inversion.
|
|
auto post_flush = [this, old = std::move(old), &newtabs, f = std::move(f), &cg] () mutable -> future<> {
|
|
try {
|
|
co_await std::move(f);
|
|
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
|
|
co_await newtab->open_data();
|
|
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
|
});
|
|
|
|
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
|
|
return update_cache(cg, old, newtabs);
|
|
});
|
|
|
|
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
|
|
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
|
|
if (this_table_name == handler.get("table_name")) {
|
|
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
|
|
handler.set("suspended", true);
|
|
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
|
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
|
|
}
|
|
});
|
|
|
|
cg.memtables()->erase(old);
|
|
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
|
co_return;
|
|
} catch (const std::exception& e) {
|
|
for (auto& newtab : newtabs) {
|
|
newtab->mark_for_deletion();
|
|
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
|
}
|
|
_config.cf_stats->failed_memtables_flushes_count++;
|
|
// If we failed this write we will try the write again and that will create a new flush reader
|
|
// that will decrease dirty memory again. So we need to reset the accounting.
|
|
old->revert_flushed_memory();
|
|
throw;
|
|
}
|
|
};
|
|
co_return co_await with_scheduling_group(default_scheduling_group(), std::ref(post_flush));
|
|
};
|
|
co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush));
|
|
}
|
|
|
|
void
|
|
table::start() {
|
|
start_compaction();
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.arm(std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
}
|
|
|
|
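// Shutdown sequence: cancel the periodic flush timer, close the table gate, drain
// pending reads/writes/streams, stop the storage groups (while still letting
// commitlog-path flushes synchronize via _pending_flushes_phaser), and finally
// drop all sstable sets while invalidating the row cache.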
future<>
|
|
table::stop() {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
_flush_timer.cancel();
|
|
// Allow `compaction_group::stop` to stop ongoing compactions
|
|
// while they may still hold the table _async_gate
|
|
auto gate_closed_fut = _async_gate.close();
|
|
co_await when_all(
|
|
_pending_reads_phaser.close(),
|
|
_pending_writes_phaser.close(),
|
|
_pending_streams_phaser.close());
|
|
// Allow parallel flushes from the commitlog path
|
|
// to synchronize with table::stop
|
|
{
|
|
auto op = _pending_flushes_phaser.start();
|
|
co_await _sg_manager->stop_storage_groups();
|
|
}
|
|
co_await _pending_flushes_phaser.close();
|
|
co_await _sstable_deletion_gate.close();
|
|
co_await std::move(gate_closed_fut);
|
|
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
|
_sg_manager->clear_storage_groups();
|
|
_sstables = make_compound_sstable_set();
|
|
}));
|
|
_cache.refresh_snapshot();
|
|
}
|
|
static seastar::metrics::label_instance node_table_metrics("__per_table", "node");
|
|
void table::set_metrics() {
|
|
auto cf = column_family_label(_schema->cf_name());
|
|
auto ks = keyspace_label(_schema->ks_name());
|
|
namespace ms = seastar::metrics;
|
|
if (_config.enable_metrics_reporting) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"), _stats.memtable_switch_count)(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_hits", _stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_rows_dropped_by_tombstones", _stats.memtable_app_stats.rows_dropped_by_tombstones, ms::description("Number of rows dropped in memtables by a tombstone write"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_rows_compacted_with_tombstones", _stats.memtable_app_stats.rows_compacted_with_tombstones, ms::description("Number of rows scanned during write of a tombstone for the purpose of compaction in memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_range_tombstone_reads", _stats.memtable_range_tombstone_reads, ms::description("Number of range tombstones read from memtables"))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_tombstone_reads", _stats.memtable_row_tombstone_reads, ms::description("Number of row tombstones read from memtables"))(cf)(ks),
|
|
ms::make_gauge("pending_tasks", ms::description("Estimated number of tasks pending for this column family"), _stats.pending_flushes)(cf)(ks),
|
|
ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks),
|
|
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks),
|
|
ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks),
|
|
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
|
|
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks),
|
|
ms::make_gauge("pending_sstable_deletions",
|
|
ms::description("Number of tasks waiting to delete sstables from a table"),
|
|
[this] { return _stats.pending_sstable_deletions; })(cf)(ks)
|
|
});
|
|
|
|
// Metrics related to row locking
|
|
auto add_row_lock_metrics = [this, ks, cf] (row_locker::single_lock_stats& stats, sstring stat_name) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_total_operations(format("row_lock_{}_acquisitions", stat_name), stats.lock_acquisitions, ms::description(format("Row lock acquisitions for {} lock", stat_name)))(cf)(ks).set_skip_when_empty(),
|
|
ms::make_queue_length(format("row_lock_{}_operations_currently_waiting_for_lock", stat_name), stats.operations_currently_waiting_for_lock, ms::description(format("Operations currently waiting for {} lock", stat_name)))(cf)(ks),
|
|
ms::make_histogram(format("row_lock_{}_waiting_time", stat_name), ms::description(format("Histogram representing time that operations spent on waiting for {} lock", stat_name)),
|
|
[&stats] {return to_metrics_histogram(stats.estimated_waiting_for_lock);})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
});
|
|
};
|
|
add_row_lock_metrics(_row_locker_stats.exclusive_row, "exclusive_row");
|
|
add_row_lock_metrics(_row_locker_stats.shared_row, "shared_row");
|
|
add_row_lock_metrics(_row_locker_stats.exclusive_partition, "exclusive_partition");
|
|
add_row_lock_metrics(_row_locker_stats.shared_partition, "shared_partition");
|
|
|
|
// View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views)
|
|
if (!_schema->is_view()) {
|
|
_view_stats.register_stats();
|
|
}
|
|
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks)
|
|
});
|
|
}
|
|
|
|
if (!is_internal_keyspace(_schema->ks_name())) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_summary("read_latency_summary", ms::description("Read latency summary"), [this] {return to_metrics_summary(_stats.reads.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("write_latency_summary", ms::description("Write latency summary"), [this] {return to_metrics_summary(_stats.writes.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_prepare_latency_summary", ms::description("CAS prepare round latency summary"), [this] {return to_metrics_summary(_stats.cas_prepare.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_propose_latency_summary", ms::description("CAS accept round latency summary"), [this] {return to_metrics_summary(_stats.cas_accept.summary());})(cf)(ks).set_skip_when_empty(),
|
|
ms::make_summary("cas_commit_latency_summary", ms::description("CAS learn round latency summary"), [this] {return to_metrics_summary(_stats.cas_learn.summary());})(cf)(ks).set_skip_when_empty(),
|
|
|
|
ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_prepare_latency", ms::description("CAS prepare round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_prepare.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_propose_latency", ms::description("CAS accept round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_accept.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("cas_commit_latency", ms::description("CAS learn round latency histogram"), [this] {return to_metrics_histogram(_stats.cas_learn.histogram());})(cf)(ks).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)
|
|
});
|
|
}
|
|
} else {
|
|
if (_config.enable_node_aggregated_table_metrics && !is_internal_keyspace(_schema->ks_name())) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_counter("memtable_switch", ms::description("Number of times flush has resulted in the memtable being switched out"), _stats.memtable_switch_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_writes", [this] () { return _stats.memtable_partition_insertions + _stats.memtable_partition_hits; }, ms::description("Number of write operations performed on partitions in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_partition_hits", _stats.memtable_partition_hits, ms::description("Number of times a write operation was issued on an existing partition in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_writes", _stats.memtable_app_stats.row_writes, ms::description("Number of row writes performed in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_counter("memtable_row_hits", _stats.memtable_app_stats.row_hits, ms::description("Number of rows overwritten by write operations in memtables"))(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("total_disk_space_before_compression", ms::description("Hypothetical total disk space used if data files weren't compressed"), _stats.total_disk_space_used.before_compression)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}),
|
|
ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used.on_disk)(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}),
|
|
ms::make_histogram("read_latency", ms::description("Read latency histogram"), [this] {return to_metrics_histogram(_stats.reads.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
|
ms::make_histogram("write_latency", ms::description("Write latency histogram"), [this] {return to_metrics_histogram(_stats.writes.histogram());})(cf)(ks)(node_table_metrics).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
});
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("tablet_count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({seastar::metrics::shard_label})
|
|
});
|
|
}
|
|
if (this_shard_id() == 0) {
|
|
_metrics.add_group("column_family", {
|
|
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)(ms::shard_label(""))
|
|
});
|
|
}
|
|
}
|
|
}
|
|
if (uses_tablets()) {
|
|
_metrics.add_group("tablets", {
|
|
ms::make_gauge("count", ms::description("Tablet count"), _stats.tablet_count)(cf)(ks).aggregate({column_family_label, keyspace_label})
|
|
});
|
|
}
|
|
}
|
|
|
|
void table::deregister_metrics() {
    _metrics.clear();
    _view_stats._metrics.clear();
}
|
|
|
|
size_t compaction_group::live_sstable_count() const noexcept {
    return _main_sstables->size() + _maintenance_sstables->size();
}

uint64_t compaction_group::live_disk_space_used() const noexcept {
    return _main_sstables->bytes_on_disk() + _maintenance_sstables->bytes_on_disk();
}

sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() const noexcept {
    return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}
|
|
|
|
uint64_t storage_group::live_disk_space_used() const {
|
|
auto cgs = const_cast<storage_group&>(*this).compaction_groups();
|
|
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
|
|
}
|
|
|
|
uint64_t compaction_group::total_disk_space_used() const noexcept {
|
|
return live_disk_space_used() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0), std::plus{});
|
|
}
|
|
|
|
sstables::file_size_stats compaction_group::total_disk_space_used_full_stats() const noexcept {
|
|
return live_disk_space_used_full_stats() + std::ranges::fold_left(_sstables_compacted_but_not_deleted | std::views::transform(std::mem_fn(&sstables::sstable::get_file_size_stats)), sstables::file_size_stats{}, std::plus{});
|
|
}
|
|
|
|
void table::rebuild_statistics() {
|
|
_stats.live_disk_space_used = {};
|
|
_stats.live_sstable_count = 0;
|
|
_stats.total_disk_space_used = {};
|
|
|
|
for_each_compaction_group([this] (const compaction_group& cg) {
|
|
_stats.live_disk_space_used += cg.live_disk_space_used_full_stats();
|
|
_stats.total_disk_space_used += cg.total_disk_space_used_full_stats();
|
|
_stats.live_sstable_count += cg.live_sstable_count();
|
|
});
|
|
}
|
|
|
|
void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept {
|
|
_stats.live_disk_space_used -= cg.live_disk_space_used_full_stats();
|
|
_stats.total_disk_space_used -= cg.total_disk_space_used_full_stats();
|
|
_stats.live_sstable_count -= cg.live_sstable_count();
|
|
}
|
|
|
|
future<table::sstable_list_permit>
|
|
table::get_sstable_list_permit() {
|
|
co_return sstable_list_permit(co_await seastar::get_units(_sstable_set_mutation_sem, 1));
|
|
}
|
|
|
|
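// Builds a replacement sstable set: sstables from the current set that are not in
// old_sstables are carried over, new_sstables are added, and the old sstables that
// were actually found in the current set are reported back as removed_sstables.
// The current set itself is not modified, so concurrent readers keep using it.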
future<table::sstable_list_builder::result>
|
|
table::sstable_list_builder::build_new_list(const sstables::sstable_set& current_sstables,
|
|
sstables::sstable_set new_sstable_list,
|
|
const std::vector<sstables::shared_sstable>& new_sstables,
|
|
const std::vector<sstables::shared_sstable>& old_sstables) {
|
|
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
|
|
|
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
|
|
|
|
// add sstables from the current list into the new list except the ones that are in the old list
|
|
std::vector<sstables::shared_sstable> removed_sstables;
|
|
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
|
|
if (s.contains(tab)) {
|
|
removed_sstables.push_back(tab);
|
|
} else {
|
|
new_sstable_list.insert(tab);
|
|
}
|
|
});
|
|
|
|
// add new sstables into the new list
|
|
for (auto& tab : new_sstables) {
|
|
new_sstable_list.insert(tab);
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
co_return table::sstable_list_builder::result {
|
|
make_lw_shared<sstables::sstable_set>(std::move(new_sstable_list)), std::move(removed_sstables)};
|
|
}
|
|
|
|
future<>
|
|
table::delete_sstables_atomically(const sstable_list_permit&, std::vector<sstables::shared_sstable> sstables_to_remove) {
|
|
try {
|
|
auto gh = _sstable_deletion_gate.hold();
|
|
co_await get_sstables_manager().delete_atomically(std::move(sstables_to_remove));
|
|
} catch (...) {
|
|
// There is nothing more we can do here.
|
|
// Any remaining SSTables will eventually be re-compacted and re-deleted.
|
|
tlogger.error("Compacted SSTables deletion failed: {}. Ignored.", std::current_exception());
|
|
}
|
|
}
|
|
|
|
future<>
|
|
table::sstable_list_builder::delete_sstables_atomically(std::vector<sstables::shared_sstable> sstables_to_remove) {
|
|
return _t->delete_sstables_atomically(_permit, std::move(sstables_to_remove));
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable>
|
|
compaction_group::unused_sstables_for_deletion(compaction::compaction_completion_desc desc) const {
|
|
std::unordered_set<sstables::shared_sstable> output(desc.new_sstables.begin(), desc.new_sstables.end());
|
|
|
|
return std::ranges::to<std::vector<sstables::shared_sstable>>(desc.old_sstables
|
|
| std::views::filter([&output] (const sstables::shared_sstable& input_sst) {
|
|
return !output.contains(input_sst);
|
|
}));
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable> compaction_group::all_sstables() const {
|
|
std::vector<sstables::shared_sstable> all;
|
|
auto main_sstables = _main_sstables->all();
|
|
auto maintenance_sstables = _maintenance_sstables->all();
|
|
all.reserve(main_sstables->size() + maintenance_sstables->size());
|
|
std::ranges::copy(*main_sstables, std::back_inserter(all));
|
|
std::ranges::copy(*maintenance_sstables, std::back_inserter(all));
|
|
return all;
|
|
}
|
|
|
|
future<>
|
|
compaction_group::update_repaired_at_for_merge() {
|
|
auto sstables = all_sstables();
|
|
auto sstables_repaired_at = get_sstables_repaired_at();
|
|
co_await seastar::async([&] {
|
|
for (auto& sst : sstables) {
|
|
thread::maybe_yield();
|
|
auto& stats = sst->get_stats_metadata();
|
|
if (stats.repaired_at > sstables_repaired_at) {
|
|
auto neww = 0;
|
|
auto old = sst->update_repaired_at(neww);
|
|
tlogger.info("Finished repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
|
|
sst->get_filename(), old, neww, sstables_repaired_at, group_id(), token_range());
|
|
} else {
|
|
auto old = stats.repaired_at;
|
|
tlogger.debug("Skipped repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
|
|
sst->get_filename(), old, old, sstables_repaired_at, group_id(), token_range());
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
future<std::vector<compaction::compaction_group_view*>> table::get_compaction_group_views_for_repair(dht::token_range range) {
|
|
std::vector<compaction::compaction_group_view*> ret;
|
|
auto sgs = storage_groups_for_token_range(range);
|
|
for (auto& sg : sgs) {
|
|
co_await coroutine::maybe_yield();
|
|
auto cgs = sg->compaction_groups();
|
|
for (auto& cg : cgs) {
|
|
ret.push_back(&cg->view_for_unrepaired_data());
|
|
}
|
|
}
|
|
co_return ret;
|
|
}
|
|
|
|
future<compaction_reenablers_and_lock_holders> table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
|
const service::frozen_topology_guard& guard, dht::token_range range) {
|
|
auto ret = compaction_reenablers_and_lock_holders();
|
|
auto views = co_await get_compaction_group_views_for_repair(range);
|
|
for (auto view : views) {
|
|
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(*view);
|
|
tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
|
|
ret.cres.push_back(std::make_unique<compaction::compaction_reenabler>(std::move(cre)));
|
|
|
|
        // This lock prevents the unrepaired compaction started by major compaction from running in parallel with repair.
        // The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
        // sstables being repaired, so it can run in parallel with repair.
|
|
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(*view, "row_level_repair");
|
|
tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
|
|
ret.lock_holders.push_back(std::move(lock_holder));
|
|
}
|
|
co_return ret;
|
|
}
|
|
|
|
future<> table::clear_being_repaired_for_range(dht::token_range range) {
|
|
auto sgs = storage_groups_for_token_range(range);
|
|
for (auto& sg : sgs) {
|
|
auto cgs = sg->compaction_groups();
|
|
for (auto& cg : cgs) {
|
|
auto sstables = cg->all_sstables();
|
|
co_await coroutine::maybe_yield();
|
|
for (auto& sst : sstables) {
|
|
co_await coroutine::maybe_yield();
|
|
if (!sst->being_repaired.uuid().is_null()) {
|
|
sst->being_repaired = service::session_id(utils::UUID());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
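// Adopts all sstables of another compaction group into this group's main set
// (e.g. when tablet compaction groups are merged), clears the donor group, and
// charges this group's backlog tracker for the adopted sstables.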
future<>
|
|
compaction_group::merge_sstables_from(compaction_group& group) {
|
|
auto permit = co_await _t.get_sstable_list_permit();
|
|
table::sstable_list_builder builder(_t, std::move(permit));
|
|
|
|
auto sstables_to_merge = group.all_sstables();
|
|
// re-build new list for this group with sstables of the group being merged.
|
|
auto res = co_await builder.build_new_list(*main_sstables(), make_main_sstable_set(), sstables_to_merge, {});
|
|
// execute:
|
|
std::invoke([&] noexcept {
|
|
set_main_sstables(std::move(res.new_sstable_set));
|
|
group.clear_sstables();
|
|
// FIXME: backlog adjustment is not exception safe.
|
|
backlog_tracker_adjust_charges({}, sstables_to_merge);
|
|
});
|
|
_t.rebuild_statistics();
|
|
}
|
|
|
|
future<>
|
|
compaction_group::update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc) {
|
|
// Build a new list of _sstables: We remove from the existing list the
|
|
// tables we compacted (by now, there might be more sstables flushed
|
|
// later), and we add the new tables generated by the compaction.
|
|
// We create a new list rather than modifying it in-place, so that
|
|
// on-going reads can continue to use the old list.
|
|
//
|
|
// We only remove old sstables after they are successfully deleted,
|
|
    // to prevent a new compaction from ignoring data in the old sstables
|
|
// if the deletion fails (note deletion of shared sstables can take
|
|
// unbounded time, because all shards must agree on the deletion).
|
|
|
|
// make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
|
|
for (auto& sst : desc.old_sstables) {
|
|
auto shards = sst->get_shards_for_this_sstable();
|
|
auto& schema = _t.schema();
|
|
if (shards.size() > 1) {
|
|
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
|
|
schema->ks_name(), schema->cf_name(), sst->toc_filename()));
|
|
}
|
|
if (!belongs_to_current_shard(shards)) {
|
|
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used sstable {} which doesn't belong to this shard!",
|
|
schema->ks_name(), schema->cf_name(), sst->toc_filename()));
|
|
}
|
|
}
|
|
|
|
// Precompute before so undo_compacted_but_not_deleted can be sure not to throw
|
|
std::unordered_set<sstables::shared_sstable> s(
|
|
desc.old_sstables.begin(), desc.old_sstables.end());
|
|
auto& sstables_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
|
sstables_compacted_but_not_deleted.insert(sstables_compacted_but_not_deleted.end(), desc.old_sstables.begin(), desc.old_sstables.end());
|
|
// After we are done, unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
|
|
    // or they could stay forever in the set, resulting in deleted files remaining
    // open and disk space not being released until shutdown.
|
|
auto undo_compacted_but_not_deleted = defer([&] {
|
|
std::erase_if(sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) {
|
|
return s.contains(sst);
|
|
});
|
|
_t.rebuild_statistics();
|
|
});
|
|
|
|
_t.get_stats().pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_t.get_stats().pending_sstable_deletions--;
|
|
});
|
|
|
|
class sstable_list_updater : public row_cache::external_updater_impl {
|
|
table& _t;
|
|
compaction_group& _cg;
|
|
table::sstable_list_builder& _builder;
|
|
const compaction::compaction_completion_desc& _desc;
|
|
struct replacement_desc {
|
|
compaction::compaction_completion_desc desc;
|
|
table::sstable_list_builder::result main_sstable_set_builder_result;
|
|
std::optional<lw_shared_ptr<sstables::sstable_set>> new_maintenance_sstables;
|
|
};
|
|
std::unordered_map<compaction_group*, replacement_desc> _cg_desc;
|
|
public:
|
|
explicit sstable_list_updater(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d)
|
|
: _t(cg._t), _cg(cg), _builder(builder), _desc(d) {}
|
|
virtual future<> prepare() override {
|
|
// Segregate output sstables according to their owner compaction group.
|
|
// If not in splitting mode, then all output sstables will belong to the same group.
|
|
for (auto& sst : _desc.new_sstables) {
|
|
auto& cg = _t.compaction_group_for_sstable(sst);
|
|
_cg_desc[&cg].desc.new_sstables.push_back(sst);
|
|
}
|
|
// The group that triggered compaction is the only one to have sstables removed from it.
|
|
_cg_desc[&_cg].desc.old_sstables = _desc.old_sstables;
|
|
for (auto& [cg, d] : _cg_desc) {
|
|
d.main_sstable_set_builder_result = co_await _builder.build_new_list(*cg->main_sstables(), cg->make_main_sstable_set(),
|
|
d.desc.new_sstables, d.desc.old_sstables);
|
|
|
|
if (!d.desc.old_sstables.empty()
|
|
&& d.main_sstable_set_builder_result.removed_sstables.size() != d.desc.old_sstables.size()) {
|
|
// Not all old_sstables were removed from the main sstable set, which implies that
|
|
// they don't exist there. This can happen if the input sstables were picked up from
|
|
// the maintenance set during an offstrategy or scrub compaction. So, remove the old
|
|
// sstables from the maintenance set. No need to add any new sstables to the maintenance
|
|
// set though, as they are always added to the main set.
|
|
auto builder_result = co_await _builder.build_new_list(
|
|
*cg->maintenance_sstables(), std::move(*cg->make_maintenance_sstable_set()), {}, d.desc.old_sstables);
|
|
d.new_maintenance_sstables = std::move(builder_result.new_sstable_set);
|
|
}
|
|
}
|
|
}
|
|
virtual void execute() override {
|
|
for (auto&& [cg, d] : _cg_desc) {
|
|
cg->set_main_sstables(std::move(d.main_sstable_set_builder_result.new_sstable_set));
|
|
if (d.new_maintenance_sstables) {
|
|
// offstrategy or scrub compaction - replace the maintenance set
|
|
cg->set_maintenance_sstables(std::move(d.new_maintenance_sstables.value()));
|
|
}
|
|
}
|
|
// FIXME: the following is not exception safe
|
|
_t.refresh_compound_sstable_set();
|
|
for (auto& [cg, d] : _cg_desc) {
|
|
cg->backlog_tracker_adjust_charges(d.main_sstable_set_builder_result.removed_sstables, d.desc.new_sstables);
|
|
}
|
|
}
|
|
static std::unique_ptr<row_cache::external_updater_impl> make(compaction_group& cg, table::sstable_list_builder& builder, compaction::compaction_completion_desc& d) {
|
|
return std::make_unique<sstable_list_updater>(cg, builder, d);
|
|
}
|
|
};
|
|
table::sstable_list_builder builder(_t, co_await _t.get_sstable_list_permit());
|
|
auto updater = row_cache::external_updater(sstable_list_updater::make(*this, builder, desc));
|
|
auto& cache = _t.get_row_cache();
|
|
|
|
co_await cache.invalidate(std::move(updater), std::move(desc.ranges_for_cache_invalidation));
|
|
|
|
// refresh underlying data source in row cache to prevent it from holding reference
|
|
// to sstables files that are about to be deleted.
|
|
cache.refresh_snapshot();
|
|
|
|
_t.rebuild_statistics();
|
|
|
|
co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc)));
|
|
}
|
|
|
|
future<>
|
|
table::compact_all_sstables(tasks::task_info info, do_flush do_flush, bool consider_only_existing_data) {
|
|
if (do_flush) {
|
|
co_await flush();
|
|
}
|
|
    // Force off-strategy compaction before the major one, so sstables previously sitting in the maintenance set will be included
    // in the compaction's input set, providing the same semantics as before the maintenance set came into existence.
|
|
co_await perform_offstrategy_compaction(info);
|
|
co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) -> future<> {
|
|
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
|
co_await _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data);
|
|
});
|
|
}
|
|
|
|
void table::start_compaction() {
|
|
set_compaction_strategy(_schema->compaction_strategy());
|
|
}
|
|
|
|
void table::trigger_compaction() {
    for_each_compaction_group([] (compaction_group& cg) {
        cg.trigger_compaction();
    });
}

void table::try_trigger_compaction(compaction_group& cg) noexcept {
    try {
        cg.trigger_compaction();
    } catch (...) {
        tlogger.error("Failed to trigger compaction: {}", std::current_exception());
    }
}
|
|
|
|
void compaction_group::trigger_compaction() {
|
|
// But not if we're locked out or stopping
|
|
if (!_async_gate.is_closed()) {
|
|
// FIXME: indentation
|
|
for (auto view : all_views()) {
|
|
_t._compaction_manager.submit(*view);
|
|
}
|
|
}
|
|
}
|
|
|
|
void table::trigger_offstrategy_compaction() {
|
|
// Run in background.
|
|
    // This is safe since the compaction task is tracked
    // by the compaction_manager until stop().
|
|
(void)perform_offstrategy_compaction(tasks::task_info{}).then_wrapped([this] (future<bool> f) {
|
|
if (f.failed()) {
|
|
auto ex = f.get_exception();
|
|
tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex);
|
|
}
|
|
});
|
|
}
|
|
|
|
future<bool> table::perform_offstrategy_compaction(tasks::task_info info) {
|
|
// If the user calls trigger_offstrategy_compaction() to trigger
|
|
// off-strategy explicitly, cancel the timeout based automatic trigger.
|
|
_off_strategy_trigger.cancel();
|
|
bool performed = false;
|
|
co_await parallel_foreach_compaction_group_view([this, &performed, info] (compaction::compaction_group_view& view) -> future<> {
|
|
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
|
performed |= co_await _compaction_manager.perform_offstrategy(view, info);
|
|
});
|
|
co_return performed;
|
|
}
|
|
|
|
future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges,
|
|
tasks::task_info info,
|
|
do_flush do_flush) {
|
|
auto* cg = try_get_compaction_group_with_static_sharding();
|
|
if (!cg) {
|
|
co_return;
|
|
}
|
|
|
|
if (do_flush) {
|
|
co_await flush();
|
|
}
|
|
|
|
auto lock_holder = co_await get_compaction_manager().get_incremental_repair_read_lock(cg->as_view_for_static_sharding(), "perform_cleanup_compaction");
|
|
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg->as_view_for_static_sharding(), info);
|
|
}
|
|
|
|
future<unsigned> compaction_group::estimate_pending_compactions() const {
|
|
unsigned ret = 0;
|
|
for (auto& view : all_views()) {
|
|
ret += co_await _t.get_compaction_strategy().estimated_pending_compactions(*view);
|
|
}
|
|
co_return ret;
|
|
}
|
|
|
|
future<unsigned> table::estimate_pending_compactions() const {
|
|
unsigned ret = 0;
|
|
co_await const_cast<table*>(this)->parallel_foreach_compaction_group([&ret] (const compaction_group& cg) -> future<> {
|
|
ret += co_await cg.estimate_pending_compactions();
|
|
});
|
|
co_return ret;
|
|
}
|
|
|
|
void compaction_group::set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept {
|
|
_compaction_strategy_state = std::move(compaction_strategy_state);
|
|
}
|
|
|
|
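// Strategy switch happens in two phases per compaction group: prepare() builds a
// new backlog tracker and strategy state (and may fail safely), then execute()
// installs them noexcept, so the table is never left half-updated.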
void table::set_compaction_strategy(compaction::compaction_strategy_type strategy) {
|
|
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), compaction::compaction_strategy::name(strategy));
|
|
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
|
|
|
|
struct compaction_group_strategy_updater {
|
|
table& t;
|
|
compaction_group& cg;
|
|
compaction::compaction_backlog_tracker new_bt;
|
|
compaction::compaction_strategy_state new_cs_state;
|
|
|
|
compaction_group_strategy_updater(table& t, compaction_group& cg, compaction::compaction_strategy& new_cs)
|
|
: t(t)
|
|
, cg(cg)
|
|
, new_bt(new_cs.make_backlog_tracker())
|
|
, new_cs_state(compaction::compaction_strategy_state::make(new_cs)) {
|
|
}
|
|
|
|
void prepare(compaction::compaction_strategy& new_cs) {
|
|
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
|
|
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
|
|
|
|
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
|
|
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
|
|
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
|
new_sstables_for_backlog_tracker.push_back(s);
|
|
});
|
|
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
|
|
}
|
|
|
|
void execute() noexcept {
|
|
// Update strategy state and backlog tracker according to new strategy. SSTable set update
|
|
// is delayed until new compaction, which is triggered on strategy change. SSTable set
|
|
// cannot be updated here since it must happen under the set update lock.
|
|
cg.register_backlog_tracker(std::move(new_bt));
|
|
cg.set_compaction_strategy_state(std::move(new_cs_state));
|
|
}
|
|
};
|
|
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
|
|
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
compaction_group_strategy_updater updater(*this, cg, new_cs);
|
|
updater.prepare(new_cs);
|
|
cg_sstable_set_updaters.push_back(std::move(updater));
|
|
});
|
|
// now exception safe:
|
|
_compaction_strategy = std::move(new_cs);
|
|
for (auto& updater : cg_sstable_set_updaters) {
|
|
updater.execute();
|
|
}
|
|
}
|
|
|
|
size_t table::sstables_count() const {
|
|
return _sstables->size();
|
|
}
|
|
|
|
std::vector<uint64_t> table::sstable_count_per_level() const {
|
|
std::vector<uint64_t> count_per_level;
|
|
_sstables->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
auto level = sst->get_sstable_level();
|
|
|
|
if (level + 1 > count_per_level.size()) {
|
|
count_per_level.resize(level + 1, 0UL);
|
|
}
|
|
count_per_level[level]++;
|
|
});
|
|
return count_per_level;
|
|
}
|
|
|
|
int64_t table::get_unleveled_sstables() const {
|
|
// TODO: when we support leveled compaction, we should return the number of
|
|
// SSTables in L0. If leveled compaction is enabled in this column family,
|
|
// then we should return zero, as we currently do.
|
|
return 0;
|
|
}
|
|
|
|
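// Returns the sstables that contain the given partition key (nodetool-style string).
// Candidates are narrowed down with the sstable set's incremental selector, and each
// candidate is then probed for the key before being included in the result.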
future<std::unordered_set<sstables::shared_sstable>> table::get_sstables_by_partition_key(const sstring& key) const {
|
|
auto pk = partition_key::from_nodetool_style_string(_schema, key);
|
|
auto dk = dht::decorate_key(*_schema, pk);
|
|
auto hk = sstables::sstable::make_hashed_key(*_schema, dk.key());
|
|
|
|
auto sel = std::make_unique<sstables::sstable_set::incremental_selector>(get_sstable_set().make_incremental_selector());
|
|
const auto& sst = sel->select(dk).sstables;
|
|
|
|
std::unordered_set<sstables::shared_sstable> ssts;
|
|
for (auto s : sst) {
|
|
if (co_await s->has_partition_key(hk, dk)) {
|
|
ssts.insert(s);
|
|
}
|
|
}
|
|
co_return ssts;
|
|
}
|
|
|
|
const sstables::sstable_set& table::get_sstable_set() const {
|
|
return *_sstables;
|
|
}
|
|
|
|
lw_shared_ptr<const sstable_list> table::get_sstables() const {
|
|
return _sstables->all();
|
|
}
|
|
|
|
std::vector<sstables::shared_sstable> table::select_sstables(const dht::partition_range& range) const {
|
|
return _sstables->select(range);
|
|
}
|
|
|
|
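// Drops all quarantined sstables from both the main and maintenance sets of every
// compaction group, invalidates the row cache accordingly, and then deletes the
// removed sstables atomically.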
future<> table::drop_quarantined_sstables() {
|
|
class quarantine_removal_updater : public row_cache::external_updater_impl {
|
|
table& _t;
|
|
std::vector<sstables::shared_sstable>& _removed;
|
|
struct compaction_group_update {
|
|
lw_shared_ptr<sstables::sstable_set> new_main_sstables;
|
|
lw_shared_ptr<sstables::sstable_set> new_maintenance_sstables;
|
|
std::vector<sstables::shared_sstable> removed_main_sstables;
|
|
};
|
|
std::unordered_map<compaction_group*, compaction_group_update> _cg_updates;
|
|
|
|
public:
|
|
explicit quarantine_removal_updater(table& t, std::vector<sstables::shared_sstable>& removed)
|
|
: _t(t), _removed(removed) {}
|
|
|
|
virtual future<> prepare() override {
|
|
_t.for_each_compaction_group([&] (compaction_group& cg) {
|
|
auto new_main = make_lw_shared<sstables::sstable_set>(cg.make_main_sstable_set());
|
|
auto new_maintenance = cg.make_maintenance_sstable_set();
|
|
std::vector<sstables::shared_sstable> removed_main;
|
|
|
|
cg.main_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
if (sst->is_quarantined()) {
|
|
_removed.emplace_back(sst);
|
|
removed_main.emplace_back(sst);
|
|
|
|
} else {
|
|
new_main->insert(sst);
|
|
}
|
|
});
|
|
|
|
|
|
cg.maintenance_sstables()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
|
if (sst->is_quarantined()) {
|
|
_removed.emplace_back(sst);
|
|
} else {
|
|
new_maintenance->insert(sst);
|
|
}
|
|
});
|
|
|
|
_cg_updates[&cg] = compaction_group_update{
|
|
.new_main_sstables = std::move(new_main),
|
|
.new_maintenance_sstables = std::move(new_maintenance),
|
|
.removed_main_sstables = std::move(removed_main)
|
|
};
|
|
});
|
|
co_return;
|
|
}
|
|
|
|
virtual void execute() override {
|
|
for (auto& [cg, update] : _cg_updates) {
|
|
cg->set_main_sstables(std::move(update.new_main_sstables));
|
|
cg->set_maintenance_sstables(std::move(update.new_maintenance_sstables));
|
|
}
|
|
_t.refresh_compound_sstable_set();
|
|
for (auto& [cg, d] : _cg_updates) {
|
|
cg->get_backlog_tracker().replace_sstables(d.removed_main_sstables, {});
|
|
}
|
|
}
|
|
|
|
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, std::vector<sstables::shared_sstable>& removed) {
|
|
return std::make_unique<quarantine_removal_updater>(t, removed);
|
|
}
|
|
};
|
|
|
|
|
|
_stats.pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_stats.pending_sstable_deletions--;
|
|
});
|
|
|
|
auto permit = co_await get_sstable_list_permit();
|
|
|
|
std::vector<sstables::shared_sstable> removed;
|
|
auto updater = row_cache::external_updater(quarantine_removal_updater::make(*this, removed));
|
|
co_await _cache.invalidate(std::move(updater));
|
|
|
|
_cache.refresh_snapshot();
|
|
rebuild_statistics();
|
|
|
|
co_await delete_sstables_atomically(permit, std::move(removed));
|
|
}
|
|
|
|
bool storage_group::no_compacted_sstable_undeleted() const {
|
|
return std::ranges::all_of(compaction_groups(), [] (const_compaction_group_ptr& cg) {
|
|
return cg->compacted_undeleted_sstables().empty();
|
|
});
|
|
}
|
|
|
|
// Gets the list of all sstables in the column family, including ones that are
|
|
// not used for active queries because they have already been compacted, but are
|
|
// waiting for delete_atomically() to return.
|
|
//
|
|
// As long as we haven't deleted them, compaction needs to ensure it doesn't
|
|
// garbage-collect a tombstone that covers data in an sstable that may not be
|
|
// successfully deleted.
|
|
lw_shared_ptr<const sstable_list> table::get_sstables_including_compacted_undeleted() const {
|
|
bool no_compacted_undeleted_sstable = std::ranges::all_of(storage_groups() | std::views::values,
|
|
std::mem_fn(&storage_group::no_compacted_sstable_undeleted));
|
|
if (no_compacted_undeleted_sstable) {
|
|
return get_sstables();
|
|
}
|
|
auto ret = make_lw_shared<sstable_list>(*_sstables->all());
|
|
for_each_compaction_group([&ret] (const compaction_group& cg) {
|
|
for (auto&& s: cg.compacted_undeleted_sstables()) {
|
|
ret->insert(s);
|
|
}
|
|
});
|
|
return ret;
|
|
}
|
|
|
|
const std::vector<sstables::shared_sstable>& compaction_group::compacted_undeleted_sstables() const noexcept {
|
|
return _sstables_compacted_but_not_deleted;
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>
|
|
table::make_memory_only_memtable_list() {
|
|
auto get_schema = [this] { return schema(); };
|
|
return make_lw_shared<memtable_list>(std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats, _config.memory_compaction_scheduling_group,
|
|
&get_compaction_manager().get_shared_tombstone_gc_state());
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>
|
|
table::make_memtable_list(compaction_group& cg) {
|
|
auto seal = [this, &cg] (flush_permit&& permit) -> future<> {
|
|
gate::holder holder = cg.flush_gate().hold();
|
|
co_await seal_active_memtable(cg, std::move(permit));
|
|
};
|
|
auto get_schema = [this] { return schema(); };
|
|
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _memtable_shared_data, _stats, _config.memory_compaction_scheduling_group,
|
|
&get_compaction_manager().get_shared_tombstone_gc_state());
|
|
}
|
|
|
|
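// Adapts a compaction_group to the compaction manager's compaction_group_view interface.
// A compaction group exposes several such views (for unrepaired, repairing and repaired data),
// and each view only sees the sstables that compaction_group::view_for_sstable() maps to it.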
class compaction_group::compaction_group_view : public compaction::compaction_group_view {
|
|
table& _t;
|
|
compaction_group& _cg;
|
|
// When engaged, compaction is disabled altogether on this view.
|
|
std::optional<compaction::compaction_reenabler> _compaction_reenabler;
|
|
private:
|
|
bool belongs_to_this_view(const sstables::shared_sstable& sst) const {
|
|
return &_cg.view_for_sstable(sst) == this;
|
|
}
|
|
|
|
future<lw_shared_ptr<const sstables::sstable_set>> make_sstable_set_for_this_view(lw_shared_ptr<const sstables::sstable_set> sstables, auto make_sstable_set_func) const {
|
|
sstables::sstable_set ret = make_sstable_set_func();
|
|
auto all_sstables = sstables->all();
|
|
auto belongs_to_this_view_func = [this] (const sstables::shared_sstable& sst) { return belongs_to_this_view(sst); };
|
|
for (auto sst : *all_sstables | std::views::filter(belongs_to_this_view_func)) {
|
|
ret.insert(sst);
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
co_return make_lw_shared<const sstables::sstable_set>(std::move(ret));
|
|
}
|
|
public:
|
|
explicit compaction_group_view(table& t, compaction_group& cg) : _t(t), _cg(cg) {}
|
|
|
|
dht::token_range token_range() const noexcept override {
|
|
return _cg.token_range();
|
|
}
|
|
|
|
const schema_ptr& schema() const noexcept override {
|
|
return _t.schema();
|
|
}
|
|
unsigned min_compaction_threshold() const noexcept override {
|
|
// During receiving stream operations, the less we compact the faster streaming is. For
// bootstrap and replace there are no readers, so it is fine to be less aggressive with
// compactions, as long as we don't ignore them completely (this could become a problem
// when streaming ends).
|
|
if (_t._is_bootstrap_or_replace) {
|
|
auto target = std::min(_t.schema()->max_compaction_threshold(), 16);
|
|
return std::max(_t.schema()->min_compaction_threshold(), target);
|
|
} else {
|
|
return _t.schema()->min_compaction_threshold();
|
|
}
|
|
}
|
|
bool compaction_enforce_min_threshold() const noexcept override {
|
|
return _t.get_config().compaction_enforce_min_threshold || _t._is_bootstrap_or_replace;
|
|
}
|
|
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
|
|
return make_sstable_set_for_this_view(_cg.main_sstables(), [this] { return _cg.make_main_sstable_set(); });
|
|
}
|
|
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
|
|
return make_sstable_set_for_this_view(_cg.maintenance_sstables(), [this] { return *_cg.make_maintenance_sstable_set(); });
|
|
}
|
|
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
|
return _t.sstable_set_for_tombstone_gc(_cg);
|
|
}
|
|
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
|
|
return compaction::get_fully_expired_sstables(*this, sstables, query_time);
|
|
}
|
|
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
|
|
return _cg.compacted_undeleted_sstables();
|
|
}
|
|
compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
|
|
return _t.get_compaction_strategy();
|
|
}
|
|
compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
|
|
return _cg._compaction_strategy_state;
|
|
}
|
|
reader_permit make_compaction_reader_permit() const override {
|
|
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {});
|
|
}
|
|
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
|
return _t.get_sstables_manager();
|
|
}
|
|
sstables::shared_sstable make_sstable() const override {
|
|
return _t.make_sstable();
|
|
}
|
|
sstables::sstable_writer_config configure_writer(sstring origin) const override {
|
|
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
|
|
return cfg;
|
|
}
|
|
api::timestamp_type min_memtable_timestamp() const override {
|
|
return _cg.min_memtable_timestamp();
|
|
}
|
|
api::timestamp_type min_memtable_live_timestamp() const override {
|
|
return _cg.min_memtable_live_timestamp();
|
|
}
|
|
api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
|
|
return _cg.min_memtable_live_row_marker_timestamp();
|
|
}
|
|
bool memtable_has_key(const dht::decorated_key& key) const override {
|
|
return _cg.memtable_has_key(key);
|
|
}
|
|
future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
|
|
co_await _cg.update_sstable_sets_on_compaction_completion(std::move(desc));
|
|
if (offstrategy) {
|
|
_cg.trigger_compaction();
|
|
}
|
|
}
|
|
bool is_auto_compaction_disabled_by_user() const noexcept override {
|
|
return _t.is_auto_compaction_disabled_by_user();
|
|
}
|
|
bool tombstone_gc_enabled() const noexcept override {
|
|
return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
|
|
}
|
|
const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
|
|
return _t.get_compaction_manager().get_tombstone_gc_state();
|
|
}
|
|
compaction::compaction_backlog_tracker& get_backlog_tracker() override {
|
|
return _cg.get_backlog_tracker();
|
|
}
|
|
const std::string get_group_id() const noexcept override {
|
|
return fmt::format("{}", _cg.group_id());
|
|
}
|
|
|
|
seastar::condition_variable& get_staging_done_condition() noexcept override {
|
|
return _cg.get_staging_done_condition();
|
|
}
|
|
|
|
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
|
|
return _t.get_token_range_after_split(t);
|
|
}
|
|
|
|
int64_t get_sstables_repaired_at() const noexcept override {
|
|
return _cg.get_sstables_repaired_at();
|
|
}
|
|
};
|
|
|
|
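// Creates a view that is registered with the compaction manager for regular compaction.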
std::unique_ptr<compaction_group::compaction_group_view> compaction_group::make_compacting_view() {
|
|
auto view = std::make_unique<compaction_group_view>(_t, *this);
|
|
_t._compaction_manager.add(*view);
|
|
return view;
|
|
}
|
|
|
|
std::unique_ptr<compaction_group::compaction_group_view> compaction_group::make_non_compacting_view() {
|
|
auto view = std::make_unique<compaction_group_view>(_t, *this);
|
|
auto reenabler = _t._compaction_manager.add_with_compaction_disabled(*view);
|
|
// Attach the compaction reenabler, so this non-compacting view will not be compacted.
|
|
_compaction_disabler_for_views.push_back(std::move(reenabler));
|
|
return view;
|
|
}
|
|
|
|
compaction_group::compaction_group(table& t, size_t group_id, dht::token_range token_range, repair_classifier_func repair_classifier)
|
|
: _t(t)
|
|
, _unrepaired_view(make_compacting_view())
|
|
, _repairing_view(make_non_compacting_view())
|
|
, _repaired_view(make_compacting_view())
|
|
, _group_id(group_id)
|
|
, _token_range(std::move(token_range))
|
|
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy))
|
|
, _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list())
|
|
, _main_sstables(make_lw_shared<sstables::sstable_set>(make_main_sstable_set()))
|
|
, _maintenance_sstables(make_maintenance_sstable_set())
|
|
, _async_gate(format("[compaction_group {}.{} {}]", t.schema()->ks_name(), t.schema()->cf_name(), group_id))
|
|
, _backlog_tracker(t.get_compaction_strategy().make_backlog_tracker())
|
|
, _repair_sstable_classifier(std::move(repair_classifier))
|
|
{
|
|
}
|
|
|
|
compaction_group_ptr compaction_group::make_empty_group(const compaction_group& base) {
|
|
return make_lw_shared<compaction_group>(base._t, base._group_id, base._token_range, base._repair_sstable_classifier);
|
|
}
|
|
|
|
bool compaction_group::stopped() const noexcept {
|
|
return _async_gate.is_closed();
|
|
}
|
|
|
|
bool compaction_group::compaction_disabled() const {
|
|
return std::ranges::all_of(all_views(), [this] (compaction::compaction_group_view* view) {
|
|
return _t._compaction_manager.compaction_disabled(*view);
|
|
});
|
|
}
|
|
|
|
compaction_group::~compaction_group() {
|
|
// An unclosed group is not tolerated since it might result in a use-after-free.
|
|
if (!compaction_disabled()) {
|
|
on_fatal_internal_error(tlogger, format("Compaction group of id {} that belongs to {}.{} was not disabled.",
|
|
_group_id, _t.schema()->ks_name(), _t.schema()->cf_name()));
|
|
}
|
|
}
|
|
|
|
future<> compaction_group::stop(sstring reason) noexcept {
|
|
if (_async_gate.is_closed()) {
|
|
co_return;
|
|
}
|
|
// FIXME: indentation
|
|
for (auto view : all_views()) {
|
|
co_await _t._compaction_manager.stop_ongoing_compactions(reason, view);
|
|
}
|
|
co_await _async_gate.close();
|
|
auto flush_future = co_await seastar::coroutine::as_future(flush());
|
|
|
|
co_await _flush_gate.close();
|
|
// FIXME: indentation
|
|
_compaction_disabler_for_views.clear();
|
|
co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
|
|
for (auto view : all_views()) {
|
|
co_await _t._compaction_manager.remove(*view, reason);
|
|
}
|
|
|
|
if (flush_future.failed()) {
|
|
co_await seastar::coroutine::return_exception_ptr(flush_future.get_exception());
|
|
}
|
|
}
|
|
|
|
bool compaction_group::empty() const noexcept {
|
|
return _memtables->empty() && live_sstable_count() == 0;
|
|
}
|
|
|
|
const schema_ptr& compaction_group::schema() const {
|
|
return _t.schema();
|
|
}
|
|
|
|
void compaction_group::clear_sstables() {
|
|
_main_sstables = make_lw_shared<sstables::sstable_set>(make_main_sstable_set());
|
|
_maintenance_sstables = make_maintenance_sstable_set();
|
|
}
|
|
|
|
void storage_group::clear_sstables() {
|
|
for (auto cg : compaction_groups()) {
|
|
cg->clear_sstables();
|
|
}
|
|
}
|
|
|
|
table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_options> sopts, compaction::compaction_manager& compaction_manager,
|
|
sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker,
|
|
locator::effective_replication_map_ptr erm)
|
|
: _schema(std::move(schema))
|
|
, _config(std::move(config))
|
|
, _erm(std::move(erm))
|
|
, _storage_opts(std::move(sopts))
|
|
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()),
|
|
keyspace_label(_schema->ks_name()),
|
|
column_family_label(_schema->cf_name())
|
|
)
|
|
, _compaction_manager(compaction_manager)
|
|
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
|
, _sg_manager(make_storage_group_manager())
|
|
, _sstables(make_compound_sstable_set())
|
|
, _sstable_deletion_gate(format("[table {}.{}] sstable_deletion_gate", _schema->ks_name(), _schema->cf_name()))
|
|
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
|
|
, _commitlog(nullptr)
|
|
, _readonly(true)
|
|
, _durable_writes(true)
|
|
, _sstables_manager(sst_manager)
|
|
, _index_manager(this->as_data_dictionary())
|
|
, _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name()))
|
|
, _counter_cell_locks(_schema->is_counter() ? std::make_unique<cell_locker>(_schema, cl_stats) : nullptr)
|
|
, _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_streams_phaser(format("[table {}.{}] pending_streams", _schema->ks_name(), _schema->cf_name()))
|
|
, _pending_flushes_phaser(format("[table {}.{}] pending_flushes", _schema->ks_name(), _schema->cf_name()))
|
|
, _row_locker(_schema)
|
|
, _flush_timer([this]{ on_flush_timer(); })
|
|
, _off_strategy_trigger([this] { trigger_offstrategy_compaction(); })
|
|
{
|
|
if (!_config.enable_disk_writes) {
|
|
tlogger.warn("Writes disabled, column family not durable.");
|
|
}
|
|
|
|
recalculate_tablet_count_stats();
|
|
set_metrics();
|
|
}
|
|
|
|
void table::on_flush_timer() {
|
|
tlogger.debug("on table {}.{} flush timer, period {}ms", _schema->ks_name(), _schema->cf_name(), _schema->memtable_flush_period());
|
|
(void)with_gate(_async_gate, [this] {
|
|
return flush().finally([this] {
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.rearm(timer<lowres_clock>::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
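// Aggregates per-tablet disk usage for this table: for every storage group accepted by the
// filter, its size is added to the table-wide total and recorded against the tablet's token
// range, alongside the current split-ready sequence number.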
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
|
|
locator::table_load_stats table_stats;
|
|
table_stats.split_ready_seq_number = _split_ready_seq_number;
|
|
|
|
locator::tablet_load_stats tablet_stats;
|
|
|
|
for_each_storage_group([&] (size_t id, storage_group& sg) {
|
|
locator::global_tablet_id gid { _t.schema()->id(), locator::tablet_id(id) };
|
|
if (tablet_filter(*_tablet_map, gid)) {
|
|
const uint64_t tablet_size = sg.live_disk_space_used();
|
|
table_stats.size_in_bytes += tablet_size;
|
|
const dht::token_range trange = _tablet_map->get_token_range(gid.tablet);
|
|
// Make sure the token range is in the form (a, b]
|
|
SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());
|
|
tablet_stats.tablet_sizes[gid.table][trange] = tablet_size;
|
|
}
|
|
});
|
|
return locator::combined_load_stats{
|
|
.table_ls = std::move(table_stats),
|
|
.tablet_ls = std::move(tablet_stats)
|
|
};
|
|
}
|
|
|
|
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
|
|
return _sg_manager->table_load_stats(std::move(tablet_filter));
|
|
}
|
|
|
|
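// Remaps storage groups after a tablet split. Each old tablet id maps to the contiguous
// range of new ids [id << growth_factor, (id + 1) << growth_factor), and the old group's
// split-ready compaction groups become the main groups of the new, narrower storage groups,
// while the emptied pre-split groups are stopped in the background.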
void tablet_storage_group_manager::handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
|
|
auto table_id = schema()->id();
|
|
size_t old_tablet_count = old_tmap.tablet_count();
|
|
size_t new_tablet_count = new_tmap.tablet_count();
|
|
storage_group_map new_storage_groups;
|
|
|
|
if (!old_tablet_count) {
|
|
on_internal_error(tlogger, format("Table {} had zero tablets; this should never happen when splitting.", table_id));
|
|
}
|
|
|
|
// NOTE: exception when applying replica changes to reflect token metadata will abort for obvious reasons,
|
|
// so exception safety is not required here.
|
|
|
|
unsigned growth_factor = log2ceil(new_tablet_count / old_tablet_count);
|
|
unsigned split_size = 1 << growth_factor;
|
|
tlogger.debug("Growth factor: {}, split size {}", growth_factor, split_size);
|
|
|
|
if (old_tablet_count * split_size != new_tablet_count) {
|
|
on_internal_error(tlogger, format("New tablet count for table {} is unexpected, actual: {}, expected {}.",
|
|
table_id, new_tablet_count, old_tablet_count * split_size));
|
|
}
|
|
|
|
// Stop the released main compaction groups asynchronously
|
|
for (auto& [id, sg] : _storage_groups) {
|
|
if (!sg->split_unready_groups_are_empty()) {
|
|
on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \
|
|
"therefore groups cannot be remapped with the new tablet count.",
|
|
id, table_id));
|
|
}
|
|
// Remove old empty groups, they're unused, but they need to be deregistered properly
|
|
// FIXME: indent.
|
|
for (auto cg_ptr : sg->split_unready_groups()) {
|
|
auto f = cg_ptr->stop("tablet split");
|
|
if (!f.available() || f.failed()) [[unlikely]] {
|
|
_stop_fut = _stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
|
|
return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) {
|
|
tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex));
|
|
});
|
|
});
|
|
}
|
|
}
|
|
unsigned first_new_id = id << growth_factor;
|
|
auto split_ready_groups = sg->split_ready_compaction_groups();
|
|
if (split_ready_groups.size() != split_size) {
|
|
on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.", split_ready_groups.size(), split_size));
|
|
}
|
|
for (unsigned i = 0; i < split_size; i++) {
|
|
auto group_id = first_new_id + i;
|
|
auto old_range = old_tmap.get_token_range(locator::tablet_id(id));
|
|
auto new_range = new_tmap.get_token_range(locator::tablet_id(group_id));
|
|
auto sstables_repaired_at = new_tmap.get_tablet_info(locator::tablet_id(group_id)).sstables_repaired_at;
|
|
tlogger.debug("Setting sstables_repaired_at={} for split tablet_id={} old_tid={} new_tid={} old_range={} new_range={} idx={}",
|
|
sstables_repaired_at, table_id, id, group_id, old_range, new_range, i);
|
|
split_ready_groups[i]->update_id_and_range(group_id, new_range);
|
|
new_storage_groups[group_id] = make_lw_shared<storage_group>(std::move(split_ready_groups[i]));
|
|
}
|
|
|
|
tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].",
|
|
id, table_id, fmt::join(std::views::iota(first_new_id, first_new_id+split_size), ", "));
|
|
}
|
|
|
|
_storage_groups = std::move(new_storage_groups);
|
|
}
|
|
|
|
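// Background fiber that completes tablet merges: for each storage group it stops every
// merging group (flushing its memtables), moves the group's sstables into the main group,
// and drops the compaction reenablers taken while merging. The fiber then sleeps on
// _merge_completion_event until handle_tablet_merge_completion() signals it.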
future<> tablet_storage_group_manager::merge_completion_fiber() {
|
|
co_await coroutine::switch_to(_t.get_config().streaming_scheduling_group);
|
|
|
|
while (!_t.async_gate().is_closed()) {
|
|
try {
|
|
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
|
|
auto ks_name = schema()->ks_name();
|
|
auto cf_name = schema()->cf_name();
|
|
// Enable compaction after merge is done.
|
|
auto cres = std::exchange(_compaction_reenablers_for_merging, {});
|
|
co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> {
|
|
auto main_group = sg.main_compaction_group();
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
|
int nr = 0;
|
|
int sz = sg.merging_groups().size();
|
|
for (auto& group : sg.merging_groups()) {
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} merging {} out of {} groups",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range(), ++nr, sz);
|
|
// Synchronize with ongoing writes that might be blocked waiting for memory.
|
|
// Also, disabling compaction provides stability on the sstable set.
|
|
// Flushes memtable, so all the data can be moved.
|
|
co_await group->stop("tablet merge");
|
|
if (utils::get_local_injector().enter("merge_completion_fiber_error")) {
|
|
tlogger.info("Got merge_completion_fiber_error");
|
|
co_await sleep(std::chrono::seconds(60));
|
|
}
|
|
co_await main_group->merge_sstables_from(*group);
|
|
}
|
|
co_await sg.remove_empty_merging_groups();
|
|
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished",
|
|
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
|
});
|
|
} catch (...) {
|
|
tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
|
|
tlogger.info("Merge completion fiber finished, about to sleep");
|
|
});
|
|
co_await _merge_completion_event.wait();
|
|
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
|
|
}
|
|
}
|
|
|
|
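// Remaps storage groups after a tablet merge. Each sibling pair of old tablets is folded into
// a single storage group for the merged tablet; the old compaction groups are attached as
// merging groups, to be drained later by merge_completion_fiber(), and compaction on the new
// group stays disabled until that fiber finishes.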
void tablet_storage_group_manager::handle_tablet_merge_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
|
|
auto table_id = schema()->id();
|
|
size_t old_tablet_count = old_tmap.tablet_count();
|
|
size_t new_tablet_count = new_tmap.tablet_count();
|
|
storage_group_map new_storage_groups;
|
|
|
|
unsigned log2_reduce_factor = log2ceil(old_tablet_count / new_tablet_count);
|
|
unsigned merge_size = 1 << log2_reduce_factor;
|
|
|
|
if (merge_size != 2) {
|
|
throw std::runtime_error(format("Tablet count was not reduced by a factor of 2 (old: {}, new {}) for table {}",
|
|
old_tablet_count, new_tablet_count, table_id));
|
|
}
|
|
|
|
for (auto& [id, sg] : _storage_groups) {
|
|
// Pick first (even) tablet of each sibling pair.
|
|
if (id % merge_size != 0) {
|
|
continue;
|
|
}
|
|
auto new_tid = id >> log2_reduce_factor;
|
|
auto new_range = new_tmap.get_token_range(locator::tablet_id(new_tid));
|
|
auto new_cg = make_lw_shared<compaction_group>(_t, new_tid, new_range, make_repair_sstable_classifier_func());
|
|
for (auto& view : new_cg->all_views()) {
|
|
auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging");
|
|
_compaction_reenablers_for_merging.push_back(std::move(cre));
|
|
}
|
|
auto new_sg = make_lw_shared<storage_group>(std::move(new_cg));
|
|
|
|
for (unsigned i = 0; i < merge_size; i++) {
|
|
auto group_id = id + i;
|
|
|
|
auto it = _storage_groups.find(group_id);
|
|
if (it == _storage_groups.end()) {
|
|
throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id));
|
|
}
|
|
auto& sg = it->second;
|
|
sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
|
|
cg->update_id(new_tid);
|
|
tlogger.debug("Adding merging_group: sstables_repaired_at={} old_range={} new_range={} old_tid={} new_tid={} old_group_id={}",
|
|
cg->get_sstables_repaired_at(), cg->token_range(), new_range, cg->group_id(), new_tid, group_id);
|
|
new_sg->add_merging_group(cg);
|
|
});
|
|
// Cannot wait for group to be closed, since it can only return after some long-running operation
|
|
// is done with it, and old erm is still held at this point.
|
|
(void) with_gate(_t.async_gate(), [sg] {
|
|
return sg->close().handle_exception([sg] (std::exception_ptr ex) {
|
|
tlogger.warn("Failed to close storage group: {}. Ignored", std::move(ex));
|
|
});
|
|
});
|
|
}
|
|
new_storage_groups[new_tid] = std::move(new_sg);
|
|
}
|
|
_storage_groups = std::move(new_storage_groups);
|
|
_merge_completion_event.signal();
|
|
}
|
|
|
|
void tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) {
|
|
auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id());
|
|
auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map);
|
|
|
|
size_t old_tablet_count = old_tablet_map->tablet_count();
|
|
size_t new_tablet_count = new_tablet_map->tablet_count();
|
|
if (new_tablet_count > old_tablet_count) {
|
|
tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets",
|
|
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
|
handle_tablet_split_completion(*old_tablet_map, *new_tablet_map);
|
|
} else if (new_tablet_count < old_tablet_count) {
|
|
tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
|
|
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
|
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
|
|
}
|
|
|
|
// Allocate storage group if tablet is migrating in, or deallocate if it's migrating out.
|
|
auto this_replica = locator::tablet_replica{
|
|
.host = erm.get_token_metadata().get_my_id(),
|
|
.shard = this_shard_id()
|
|
};
|
|
auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) {
|
|
return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica;
|
|
};
|
|
bool tablet_migrating_in = false;
|
|
for (auto& transition : new_tablet_map->transitions()) {
|
|
auto tid = transition.first;
|
|
auto transition_info = transition.second;
|
|
if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
|
|
auto range = new_tablet_map->get_token_range(tid);
|
|
_storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
|
|
tablet_migrating_in = true;
|
|
} else if (_storage_groups.contains(tid.value()) && locator::is_post_cleanup(this_replica, new_tablet_map->get_tablet_info(tid), transition_info)) {
|
|
// The storage group should be cleaned up and stopped at this point usually by the tablet cleanup stage,
|
|
// unless the storage group was allocated after tablet cleanup was completed for this node. This could
|
|
// happen if the node was restarted after tablet cleanup was run but before moving to the next stage. To
|
|
// handle this case we stop the storage group here if it's not stopped already.
|
|
auto sg = _storage_groups[tid.value()];
|
|
|
|
remove_storage_group(tid.value());
|
|
|
|
(void) with_gate(_t.async_gate(), [sg] {
|
|
return sg->stop("tablet post-cleanup").then([sg] {});
|
|
});
|
|
}
|
|
}
|
|
|
|
// update the per compaction group tombstone GC enabled flag
|
|
for_each_storage_group([&] (size_t group_id, storage_group& sg) {
|
|
const locator::tablet_id tid = static_cast<locator::tablet_id>(group_id);
|
|
const locator::tablet_info& tinfo = new_tablet_map->get_tablet_info(tid);
|
|
const bool tombstone_gc_enabled = std::ranges::contains(tinfo.replicas, this_replica);
|
|
|
|
sg.for_each_compaction_group([tombstone_gc_enabled] (const compaction_group_ptr& cg_ptr) {
|
|
cg_ptr->set_tombstone_gc_enabled(tombstone_gc_enabled);
|
|
});
|
|
});
|
|
|
|
// TODO: possibly use row_cache::invalidate(external_updater) instead on all ranges of new replicas,
|
|
// as underlying source will be refreshed and external_updater::execute can refresh the sstable set.
|
|
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
|
|
// problem as fresh node won't have any data in new range and migration cleanup invalidates the
|
|
// range being moved away.
|
|
if (tablet_migrating_in || old_tablet_count != new_tablet_count) {
|
|
refresh_mutation_source();
|
|
}
|
|
}
|
|
|
|
// This function is called in the topology::transition_state::tablet_resize_finalization transition which
|
|
// guarantees there is no tablet repair. The cm.stop_and_disable_compaction() ensures no compaction is running.
|
|
future<> table::update_repaired_at_for_merge() {
|
|
if (!uses_tablets()) {
|
|
co_return;
|
|
}
|
|
if (utils::get_local_injector().enter("skip_update_repaired_at_for_merge")) {
|
|
co_return;
|
|
}
|
|
auto sgs = storage_groups();
|
|
for (auto& x : sgs) {
|
|
auto sg = x.second;
|
|
if (sg) {
|
|
auto cgs = sg->compaction_groups();
|
|
for (auto& cg : cgs) {
|
|
auto cre = co_await cg->get_compaction_manager().stop_and_disable_compaction("update_repaired_at_for_merge", cg->view_for_unrepaired_data());
|
|
co_await cg->update_repaired_at_for_merge();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
|
auto old_erm = std::exchange(_erm, std::move(erm));
|
|
|
|
auto refresh_mutation_source = [this] {
|
|
refresh_compound_sstable_set();
|
|
_cache.refresh_snapshot();
|
|
};
|
|
|
|
if (uses_tablets()) {
|
|
_sg_manager->update_effective_replication_map(*_erm, refresh_mutation_source);
|
|
}
|
|
if (old_erm) {
|
|
old_erm->invalidate();
|
|
}
|
|
|
|
recalculate_tablet_count_stats();
|
|
}
|
|
|
|
void table::recalculate_tablet_count_stats() {
|
|
_stats.tablet_count = calculate_tablet_count();
|
|
}
|
|
|
|
int64_t table::calculate_tablet_count() const {
|
|
if (!uses_tablets()) {
|
|
return 0;
|
|
}
|
|
|
|
return _sg_manager->storage_groups().size();
|
|
}
|
|
|
|
partition_presence_checker
|
|
table::make_partition_presence_checker(lw_shared_ptr<const sstables::sstable_set> sstables) {
|
|
auto sel = make_lw_shared<sstables::sstable_set::incremental_selector>(sstables->make_incremental_selector());
|
|
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
|
|
auto& sst = sel->select(key).sstables;
|
|
if (sst.empty()) {
|
|
return partition_presence_checker_result::definitely_doesnt_exist;
|
|
}
|
|
auto hk = sstables::sstable::make_hashed_key(*_schema, key.key());
|
|
for (auto&& s : sst) {
|
|
if (s->filter_has_key(hk)) {
|
|
return partition_presence_checker_result::maybe_exists;
|
|
}
|
|
}
|
|
return partition_presence_checker_result::definitely_doesnt_exist;
|
|
};
|
|
}
|
|
|
|
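// Returns the max_purgeable function used by the compacting reader that backs the row cache:
// for a given partition it combines the limits reported by the memtables of the owning
// storage group, so that tombstones covering data still present in a memtable are not purged.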
max_purgeable_fn table::get_max_purgeable_fn_for_cache_underlying_reader() const {
|
|
return [this](const dht::decorated_key& dk, ::is_shadowable is_shadowable) -> max_purgeable {
|
|
auto& sg = storage_group_for_token(dk.token());
|
|
max_purgeable mp;
|
|
|
|
sg.for_each_compaction_group([&dk, is_shadowable, &mp] (const compaction_group_ptr& cg) {
|
|
mp.combine(cg->memtables()->get_max_purgeable(dk, is_shadowable, cg->max_seen_timestamp()));
|
|
});
|
|
|
|
return mp;
|
|
};
|
|
}
|
|
|
|
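// Exposes the on-disk sstable set as the snapshot_source backing the row cache. Reads go
// through a compacting reader so that already-purgeable data is dropped before it gets
// populated into the cache, and a partition presence checker is provided for fast negative
// lookups.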
snapshot_source
|
|
table::sstables_as_snapshot_source() {
|
|
return snapshot_source([this] () {
|
|
auto sst_set = _sstables;
|
|
return mutation_source([this, sst_set] (schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& r,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) {
|
|
auto reader = make_sstable_reader(std::move(s), std::move(permit), sst_set, r, slice, std::move(trace_state), fwd, fwd_mr);
|
|
return make_compacting_reader(
|
|
std::move(reader),
|
|
gc_clock::now(),
|
|
get_max_purgeable_fn_for_cache_underlying_reader(),
|
|
_compaction_manager.get_tombstone_gc_state().with_commitlog_check_disabled(),
|
|
fwd);
|
|
}, [this, sst_set] {
|
|
return make_partition_presence_checker(sst_set);
|
|
});
|
|
});
|
|
}
|
|
|
|
// define in .cc, since sstable is forward-declared in .hh
|
|
table::~table() {
|
|
}
|
|
|
|
|
|
logalloc::occupancy_stats table::occupancy() const {
|
|
logalloc::occupancy_stats res;
|
|
for_each_compaction_group([&] (const compaction_group& cg) {
|
|
for (auto& m : *const_cast<compaction_group&>(cg).memtables()) {
|
|
res += m->region().occupancy();
|
|
}
|
|
});
|
|
return res;
|
|
}
|
|
|
|
db::replay_position table::highest_flushed_replay_position() const {
|
|
return _highest_flushed_rp;
|
|
}
|
|
|
|
struct manifest_json : public json::json_base {
|
|
json::json_chunked_list<sstring> files;
|
|
|
|
manifest_json() {
|
|
register_params();
|
|
}
|
|
manifest_json(manifest_json&& e) {
|
|
register_params();
|
|
files = std::move(e.files);
|
|
}
|
|
manifest_json& operator=(manifest_json&& e) {
|
|
files = std::move(e.files);
|
|
return *this;
|
|
}
|
|
private:
|
|
void register_params() {
|
|
add(&files, "files");
|
|
}
|
|
};
|
|
|
|
future<>
|
|
table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets) {
|
|
manifest_json manifest;
|
|
for (const auto& fsp : file_sets) {
|
|
for (auto& rf : *fsp) {
|
|
manifest.files.push(std::move(rf));
|
|
}
|
|
}
|
|
auto streamer = json::stream_object(std::move(manifest));
|
|
auto jsonfile = jsondir + "/manifest.json";
|
|
|
|
tlogger.debug("Storing manifest {}", jsonfile);
|
|
|
|
co_await io_check([jsondir] { return recursive_touch_directory(jsondir); });
|
|
auto f = co_await open_checked_file_dma(general_disk_error_handler, jsonfile, open_flags::wo | open_flags::create | open_flags::truncate);
|
|
auto out = co_await make_file_output_stream(std::move(f));
|
|
std::exception_ptr ex;
|
|
try {
|
|
co_await streamer(std::move(out));
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
|
|
co_await io_check(sync_directory, std::move(jsondir));
|
|
}
|
|
|
|
future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstring dir) const {
|
|
auto schema_desc = schema()->describe(
|
|
replica::make_schema_describe_helper(table_shards),
|
|
cql3::describe_option::STMTS);
|
|
|
|
auto schema_description = std::move(*schema_desc.create_statement);
|
|
auto schema_file_name = dir + "/schema.cql";
|
|
auto f = co_await open_checked_file_dma(general_disk_error_handler, schema_file_name, open_flags::wo | open_flags::create | open_flags::truncate);
|
|
auto out = co_await make_file_output_stream(std::move(f));
|
|
std::exception_ptr ex;
|
|
|
|
auto view = managed_bytes_view(schema_description.as_managed_bytes());
|
|
|
|
try {
|
|
for (auto&& fragment : fragment_range(view)) {
|
|
auto sv = to_string_view(fragment);
|
|
co_await out.write(sv.data(), sv.size());
|
|
}
|
|
co_await out.flush();
|
|
} catch (...) {
|
|
ex = std::current_exception();
|
|
}
|
|
co_await out.close();
|
|
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
}
|
|
|
|
// Runs the orchestration code on an arbitrary shard to balance the load.
|
|
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
|
|
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
|
|
if (so == nullptr) {
|
|
throw std::runtime_error("Snapshotting non-local tables is not implemented");
|
|
}
|
|
if (so->dir.empty()) { // virtual tables don't have initialized local storage
|
|
co_return;
|
|
}
|
|
|
|
auto jsondir = (so->dir / sstables::snapshots_dir / name).native();
|
|
auto orchestrator = std::hash<sstring>()(jsondir) % smp::count;
|
|
|
|
co_await smp::submit_to(orchestrator, [&] () -> future<> {
|
|
auto& t = *table_shards;
|
|
auto s = t.schema();
|
|
tlogger.debug("Taking snapshot of {}.{}: directory={}", s->ks_name(), s->cf_name(), jsondir);
|
|
|
|
std::vector<table::snapshot_file_set> file_sets;
|
|
file_sets.reserve(smp::count);
|
|
|
|
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
|
|
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
|
|
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
|
|
return table_shards->take_snapshot(jsondir);
|
|
}));
|
|
});
|
|
co_await io_check(sync_directory, jsondir);
|
|
|
|
co_await t.finalize_snapshot(table_shards, std::move(jsondir), std::move(file_sets));
|
|
});
|
|
}
|
|
|
|
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
|
|
tlogger.trace("take_snapshot {}", jsondir);
|
|
|
|
auto sstable_deletion_guard = co_await get_sstable_list_permit();
|
|
|
|
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
|
|
auto table_names = std::make_unique<std::unordered_set<sstring>>();
|
|
|
|
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
|
|
table_names->insert(sstable->component_basename(sstables::component_type::Data));
|
|
return io_check([sstable, &dir = jsondir] {
|
|
return sstable->snapshot(dir);
|
|
});
|
|
});
|
|
co_return make_foreign(std::move(table_names));
|
|
}
|
|
|
|
future<> table::finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets) {
|
|
std::exception_ptr ex;
|
|
|
|
tlogger.debug("snapshot {}: writing schema.cql", jsondir);
|
|
co_await write_schema_as_cql(table_shards, jsondir).handle_exception([&] (std::exception_ptr ptr) {
|
|
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
|
|
ex = std::move(ptr);
|
|
});
|
|
tlogger.debug("snapshot {}: seal_snapshot", jsondir);
|
|
co_await seal_snapshot(jsondir, std::move(file_sets)).handle_exception([&] (std::exception_ptr ptr) {
|
|
tlogger.error("Failed to seal snapshot in {}: {}.", jsondir, ptr);
|
|
ex = std::move(ptr);
|
|
});
|
|
|
|
if (ex) {
|
|
co_await coroutine::return_exception_ptr(std::move(ex));
|
|
}
|
|
}
|
|
|
|
future<bool> table::snapshot_exists(sstring tag) {
|
|
auto* so = std::get_if<storage_options::local>(&_storage_opts->value);
|
|
if (so == nullptr || so->dir.empty()) {
|
|
co_return false; // Technically it doesn't as snapshots only work for local storage
|
|
}
|
|
|
|
sstring jsondir = (so->dir / sstables::snapshots_dir / tag).native();
|
|
bool exists = false;
|
|
try {
|
|
auto sd = co_await io_check(file_stat, jsondir, follow_symlink::no);
|
|
if (sd.type != directory_entry_type::directory) {
|
|
throw std::error_code(ENOTDIR, std::system_category());
|
|
}
|
|
exists = true;
|
|
} catch (std::system_error& e) {
|
|
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
|
throw;
|
|
}
|
|
}
|
|
co_return exists;
|
|
}
|
|
|
|
future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot_details() {
|
|
return seastar::async([this] {
|
|
std::unordered_map<sstring, snapshot_details> all_snapshots;
|
|
auto* so = std::get_if<storage_options::local>(&_storage_opts->value);
|
|
if (so == nullptr || so->dir.empty()) {
|
|
return all_snapshots;
|
|
}
|
|
|
|
auto datadirs = _sstables_manager.get_local_directories(*so);
|
|
for (auto& datadir : datadirs) {
|
|
fs::path snapshots_dir = datadir / sstables::snapshots_dir;
|
|
auto file_exists = io_check([&snapshots_dir] { return seastar::file_exists(snapshots_dir.native()); }).get();
|
|
if (!file_exists) {
|
|
continue;
|
|
}
|
|
|
|
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
|
while (auto de = lister.get().get()) {
|
|
auto snapshot_name = de->name;
|
|
all_snapshots.emplace(snapshot_name, snapshot_details());
|
|
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
|
|
auto& sd = all_snapshots.at(snapshot_name);
|
|
sd.total += details.total;
|
|
sd.live += details.live;
|
|
}
|
|
}
|
|
return all_snapshots;
|
|
});
|
|
}
|
|
|
|
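// Computes the total and "live" sizes of a single snapshot directory. A file's size counts
// as live only when the snapshot is the last owner of the data, i.e. it is not hardlinked
// from the staging or main sstable directory.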
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
|
|
table::snapshot_details details{};
|
|
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
|
|
if (!co_await file_exists(staging_dir->native())) {
|
|
staging_dir.reset();
|
|
}
|
|
|
|
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
|
|
while (auto de = co_await lister.get()) {
|
|
const auto& name = de->name;
|
|
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
|
|
// See https://github.com/scylladb/seastar/pull/3163
|
|
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
|
|
auto size = sd.allocated_size;
|
|
|
|
// The manifest.json and schema.cql files are the only files expected in this directory that do not belong to an SSTable.
|
|
//
|
|
// All the others should just generate an exception: there is something wrong, so don't blindly
|
|
// add it to the size.
|
|
if (name != "manifest.json" && name != "schema.cql") {
|
|
details.total += size;
|
|
if (sd.number_of_links == 1) {
|
|
// File exists only in the snapshot directory.
|
|
details.live += size;
|
|
continue;
|
|
}
|
|
// If the number of links is greater than 1, it is still possible that the file is only
// linked to another snapshot, so check the staging and data directories for the file too.
|
|
} else {
|
|
continue;
|
|
}
|
|
|
|
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
|
|
try {
|
|
// File exists in the main SSTable directory. Snapshots are not contributing to size
|
|
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
|
|
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
|
|
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
|
|
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
|
|
path.native(), psd.device_id, psd.inode_number, psd.size,
|
|
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
|
|
co_return false;
|
|
}
|
|
co_return true;
|
|
} catch (std::system_error& e) {
|
|
if (e.code() != std::error_code(ENOENT, std::system_category())) {
|
|
throw;
|
|
}
|
|
co_return false;
|
|
}
|
|
};
|
|
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
|
|
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
|
|
!co_await exists_in_dir(datadir / name)) {
|
|
details.live += size;
|
|
}
|
|
}
|
|
|
|
co_return details;
|
|
}
|
|
|
|
future<> compaction_group::flush() noexcept {
|
|
try {
|
|
return _memtables->flush();
|
|
} catch (...) {
|
|
return current_exception_as_future<>();
|
|
}
|
|
}
|
|
|
|
future<> storage_group::flush() noexcept {
|
|
for (auto& cg : compaction_groups()) {
|
|
co_await cg->flush();
|
|
}
|
|
}
|
|
|
|
bool compaction_group::can_flush() const {
|
|
return _memtables->can_flush();
|
|
}
|
|
|
|
lw_shared_ptr<memtable_list>& compaction_group::memtables() noexcept {
|
|
return _memtables;
|
|
}
|
|
|
|
size_t compaction_group::memtable_count() const noexcept {
|
|
return _memtables->size();
|
|
}
|
|
|
|
size_t storage_group::memtable_count() const {
|
|
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
|
|
}
|
|
|
|
future<> table::flush(std::optional<db::replay_position> pos) {
|
|
if (pos && *pos < _flush_rp) {
|
|
co_return;
|
|
}
|
|
// There is nothing to flush if the table was stopped.
|
|
if (_pending_flushes_phaser.is_closed()) {
|
|
co_return;
|
|
}
|
|
auto op = _pending_flushes_phaser.start();
|
|
auto fp = _highest_rp;
|
|
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::flush));
|
|
_flush_rp = std::max(_flush_rp, fp);
|
|
}
|
|
|
|
bool storage_group::can_flush() const {
|
|
return std::ranges::any_of(compaction_groups(), std::mem_fn(&compaction_group::can_flush));
|
|
}
|
|
|
|
bool table::can_flush() const {
|
|
return std::ranges::any_of(storage_groups() | std::views::values, std::mem_fn(&storage_group::can_flush));
|
|
}
|
|
|
|
future<> compaction_group::clear_memtables() {
|
|
if (_t.commitlog()) {
|
|
for (auto& t : *_memtables) {
|
|
_t.commitlog()->discard_completed_segments(_t.schema()->id(), t->get_and_discard_rp_set());
|
|
}
|
|
}
|
|
auto old_memtables = _memtables->clear_and_add();
|
|
for (auto& smt : old_memtables) {
|
|
co_await smt->clear_gently();
|
|
}
|
|
}
|
|
|
|
future<> table::clear() {
|
|
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
|
|
|
|
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::clear_memtables));
|
|
|
|
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
|
}
|
|
|
|
bool storage_group::compaction_disabled() const {
|
|
// A compaction group that has been stopped is excluded, since the group is no longer available for a caller
// to disable compaction on explicitly (e.g. on truncate), and the caller might want to check
// that compaction was disabled on all groups. Stopping a group is equivalent to disabling compaction on it.
|
|
return std::ranges::all_of(compaction_groups()
|
|
| std::views::filter(std::not_fn(&compaction_group::stopped)), [] (const_compaction_group_ptr& cg) {
|
|
return cg->compaction_disabled(); });
|
|
}
|
|
|
|
// NOTE: does not need to be futurized, but might eventually, depending on
|
|
// if we implement notifications, whatnot.
|
|
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
|
|
// truncate_table_on_all_shards() disables compaction for the truncated
|
|
// tables and views, so we normally expect compaction to be disabled on
|
|
// this table. But as shown in issue #17543, it is possible that a new
|
|
// materialized view was created right after truncation started, and it
|
|
// would not have compaction disabled when this function is called on it.
|
|
if (!schema()->is_view()) {
|
|
// Check if the storage groups have compaction disabled, but also check if they have been stopped.
|
|
// This is to avoid races with tablet cleanup which stops the storage group, and then stops the
|
|
// compaction groups. We could have a situation where compaction couldn't have been disabled by
|
|
// truncate because the storage group has been stopped, but the compaction groups have not yet been stopped.
|
|
auto compaction_disabled = std::ranges::all_of(storage_groups() | std::views::values, [] (const storage_group_ptr& sgp) {
|
|
return sgp->async_gate().is_closed() || sgp->compaction_disabled();
|
|
});
|
|
if (!compaction_disabled) {
|
|
utils::on_internal_error(fmt::format("compaction not disabled on table {}.{} during TRUNCATE",
|
|
schema()->ks_name(), schema()->cf_name()));
|
|
}
|
|
}
|
|
|
|
db::replay_position rp;
|
|
struct removed_sstable {
|
|
compaction_group& cg;
|
|
sstables::shared_sstable sst;
|
|
replica::enable_backlog_tracker enable_backlog_tracker;
|
|
};
|
|
std::vector<removed_sstable> remove;
|
|
|
|
_stats.pending_sstable_deletions++;
|
|
auto undo_stats = defer([this] {
|
|
_stats.pending_sstable_deletions--;
|
|
});
|
|
|
|
auto permit = co_await get_sstable_list_permit();
|
|
|
|
co_await _cache.invalidate(row_cache::external_updater([this, &rp, &remove, truncated_at] {
|
|
// FIXME: the following isn't exception safe.
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
|
|
auto pruned = make_lw_shared<sstables::sstable_set>(cg.make_main_sstable_set());
|
|
auto maintenance_pruned = cg.make_maintenance_sstable_set();
|
|
|
|
auto prune = [&] (lw_shared_ptr<sstables::sstable_set>& pruned,
|
|
const lw_shared_ptr<sstables::sstable_set>& pruning,
|
|
replica::enable_backlog_tracker enable_backlog_tracker) mutable {
|
|
pruning->for_each_sstable([&] (const sstables::shared_sstable& p) mutable {
|
|
if (p->max_data_age() <= truncated_at) {
|
|
if (p->originated_on_this_node().value_or(false) && p->get_stats_metadata().position.shard_id() == this_shard_id()) {
|
|
rp = std::max(p->get_stats_metadata().position, rp);
|
|
}
|
|
remove.emplace_back(removed_sstable{cg, p, enable_backlog_tracker});
|
|
return;
|
|
}
|
|
pruned->insert(p);
|
|
});
|
|
};
|
|
prune(pruned, cg.main_sstables(), enable_backlog_tracker::yes);
|
|
prune(maintenance_pruned, cg.maintenance_sstables(), enable_backlog_tracker::no);
|
|
|
|
cg.set_main_sstables(std::move(pruned));
|
|
cg.set_maintenance_sstables(std::move(maintenance_pruned));
|
|
});
|
|
refresh_compound_sstable_set();
|
|
tlogger.debug("cleaning out row cache");
|
|
}));
|
|
rebuild_statistics();
|
|
|
|
std::vector<sstables::shared_sstable> del;
|
|
del.reserve(remove.size());
|
|
for (auto& r : remove) {
|
|
if (r.enable_backlog_tracker) {
|
|
remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst);
|
|
}
|
|
erase_sstable_cleanup_state(r.sst);
|
|
del.emplace_back(r.sst);
|
|
}
|
|
co_await delete_sstables_atomically(permit, std::move(del));
|
|
co_return rp;
|
|
}
|
|
|
|
void table::mark_ready_for_writes(db::commitlog* cl) {
|
|
if (!_readonly) {
|
|
on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
if (_config.enable_commitlog) {
|
|
_commitlog = cl;
|
|
}
|
|
_readonly = false;
|
|
}
|
|
|
|
db::commitlog* table::commitlog() const {
|
|
if (_readonly) [[unlikely]] {
|
|
on_internal_error(dblog, ::format("table {}.{} is readonly", _schema->ks_name(), _schema->cf_name()));
|
|
}
|
|
return _commitlog;
|
|
}
|
|
|
|
void table::set_schema(schema_ptr s) {
|
|
SCYLLA_ASSERT(s->is_counter() == _schema->is_counter());
|
|
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
|
|
_schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version());
|
|
|
|
_flush_timer.cancel();
|
|
|
|
for_each_compaction_group([&] (compaction_group& cg) {
|
|
for (auto& m: *cg.memtables()) {
|
|
m->set_schema(s);
|
|
}
|
|
});
|
|
|
|
_cache.set_schema(s);
|
|
if (_counter_cell_locks) {
|
|
_counter_cell_locks->set_schema(s);
|
|
}
|
|
_schema = std::move(s);
|
|
|
|
for (auto&& v : _views) {
|
|
v->view_info()->reset_view_info();
|
|
if (auto reverse_schema = local_schema_registry().get_or_null(reversed(v->version()))) {
|
|
reverse_schema->view_info()->reset_view_info();
|
|
}
|
|
}
|
|
|
|
set_compaction_strategy(_schema->compaction_strategy());
|
|
trigger_compaction();
|
|
|
|
if (_schema->memtable_flush_period() > 0) {
|
|
_flush_timer.rearm(timer<lowres_clock>::clock::now() + std::chrono::milliseconds(_schema->memtable_flush_period()));
|
|
}
|
|
}
|
|
|
|
static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, const view_ptr& v) {
|
|
return std::find_if(views.begin(), views.end(), [&v] (auto&& e) {
|
|
return e->id() == v->id();
|
|
});
|
|
}
|
|
|
|
void table::add_or_update_view(view_ptr v) {
|
|
auto existing = find_view(_views, v);
|
|
if (existing != _views.end()) {
|
|
*existing = std::move(v);
|
|
} else {
|
|
_views.push_back(std::move(v));
|
|
}
|
|
}
|
|
|
|
void table::remove_view(view_ptr v) {
|
|
auto existing = find_view(_views, v);
|
|
if (existing != _views.end()) {
|
|
_views.erase(existing);
|
|
}
|
|
}
|
|
|
|
void table::clear_views() {
|
|
_views.clear();
|
|
}
|
|
|
|
const std::vector<view_ptr>& table::views() const {
|
|
return _views;
|
|
}
|
|
|
|
std::vector<view_ptr> table::affected_views(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base, const mutation& update) const {
|
|
// FIXME: Avoid allocating a vector here; consider returning the filtered range instead.
|
|
return std::ranges::to<std::vector<view_ptr>>(_views | std::views::filter([&] (auto&& view) {
|
|
return db::view::partition_key_matches(gen->get_db().as_data_dictionary(), *base, *view->view_info(), update.decorated_key());
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Shard-local locking of clustering rows or entire partitions of the base
|
|
* table during a Materialized-View read-modify-update:
|
|
*
|
|
* Consider that two concurrent base-table updates set column C, a column
|
|
* added to a view's primary key, to two different values - V1 and V2.
|
|
* Say that that before the updates, C's value was V0. Both updates may remove
|
|
* from the view the old row with V0, one will add a view row with V1 and the
|
|
* second will add a view row with V2, and we end up with two rows, with the
|
|
* two different values, instead of just one row with the last value.
|
|
*
|
|
* The solution is to lock the base row which we read to ensure atomic read-
|
|
* modify-write to the view table: Under one locked section, the row with V0
|
|
* is deleted and a new one with V1 is created, and then under a second locked
|
|
* section the row with V1 is deleted and a new one with V2 is created.
|
|
* Note that the lock is node-local (and in fact shard-local) and the locked
|
|
* section doesn't include the view table modifications - it includes just the
|
|
* read and the creation of the update commands - commands which will
|
|
* eventually be sent to the view replicas.
|
|
*
|
|
* We need to lock a base-table row even if an update does not modify the
|
|
* view's new key column C: Consider an update that only updates a non-key
|
|
* column (but also in the view) D. We still need to read the current base row
|
|
* to retrieve the view row's current key (column C), and then write the
|
|
* modification to *that* view row. Having several such modifications in
|
|
* parallel is fine. What is not fine is to have in parallel a modification
|
|
* of the value of C. So basically we need a reader-writer lock (a.k.a.
|
|
* shared-exclusive lock) on base rows:
|
|
* 1. Updates which do not modify the view's key column take a reader lock
|
|
* on the base row.
|
|
* 2. Updates which do modify the view's key column take a writer lock.
|
|
*
|
|
* Further complicating matters is that some operations involve multiple
|
|
* base rows - such as a deletion of an entire partition or a range of rows.
|
|
* In that case, we should lock the entire partition, and forbid parallel
|
|
* work on the same partition or one of its rows. We can do this with a
|
|
* read-writer lock on base partitions:
|
|
* 1. Before we lock a row (as described above), we lock its partition key
|
|
* with the reader lock.
|
|
* 2. When an operation involves an entire partition (or range of rows),
|
|
* we lock the partition key with a writer lock.
|
|
*
|
|
* If an operation involves only a range of rows, not an entire partition,
|
|
* we could in theory lock only this range and not an entire partition.
|
|
* However, we expect this case to be rare enough to not care about and we
|
|
* currently just lock the entire partition.
|
|
*
|
|
* If a base table has *multiple* views, we still read the base table row
|
|
* only once, and have to keep a lock around this read and all the view
|
|
* updates generation. This lock needs to be the strictest of the above -
|
|
* i.e., if a column is modified which is not part of one view's key but is
|
|
* part of a second view's key - we should lock the base row with the
|
|
* stricter writer lock, not a reader lock.
|
|
*/
|
|
future<row_locker::lock_holder>
|
|
table::local_base_lock(
|
|
const schema_ptr& s,
|
|
const dht::decorated_key& pk,
|
|
const query::clustering_row_ranges& rows,
|
|
db::timeout_clock::time_point timeout) const {
|
|
// FIXME: Optimization:
|
|
// Below we always pass "true" to the lock functions and take an exclusive
|
|
// lock on the affected row or partition. But as explained above, if all
|
|
// the modified columns are not key columns in *any* of the views, a
|
|
// shared lock is enough. We should test for this case and pass false.
|
|
// This will allow more parallelism in concurrent modifications to the
|
|
// same row - probably not a very urgent case.
|
|
_row_locker.upgrade(s);
|
|
if (rows.size() == 1 && rows[0].is_singular() && rows[0].start() && !rows[0].start()->value().is_empty(*s)) {
|
|
// A single clustering row is involved.
|
|
return _row_locker.lock_ck(pk, rows[0].start()->value(), true, timeout, _row_locker_stats);
|
|
} else {
|
|
// More than a single clustering row is involved. Most commonly it's
|
|
// the entire partition, so let's lock the entire partition. We could
|
|
// lock less than the entire partition in more elaborate cases where
|
|
// just a few individual rows are involved, or row ranges, but we
|
|
// don't think this will make a practical difference.
|
|
return _row_locker.lock_pk(pk, true, timeout, _row_locker_stats);
|
|
}
|
|
}
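
// Illustrative usage sketch (not part of the original code): how a caller is
// expected to combine the shared/exclusive rules described in the comment
// above. The surrounding coroutine and its name are hypothetical; note that
// today local_base_lock() always takes the exclusive lock, as the FIXME says.
//
//   future<> apply_base_update_example(table& t, schema_ptr s,
//                                      const dht::decorated_key& dk,
//                                      const query::clustering_row_ranges& rows,
//                                      db::timeout_clock::time_point timeout) {
//       // Takes the row lock for a singular clustering range, or the
//       // partition lock otherwise (rules 1/2 above).
//       auto lock = co_await t.local_base_lock(s, dk, rows, timeout);
//       // ... read the base row, build the view updates, write the base row ...
//       // The lock is released when `lock` goes out of scope.
//   }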

const ssize_t new_reader_base_cost{16 * 1024};

size_t table::estimate_read_memory_cost() const {
    return new_reader_base_cost;
}

void table::set_hit_rate(locator::host_id addr, cache_temperature rate) {
    auto& e = _cluster_cache_hit_rates[addr];
    e.rate = rate;
    e.last_updated = lowres_clock::now();
}

table::cache_hit_rate table::get_my_hit_rate() const {
    return cache_hit_rate { _global_cache_hit_rate, lowres_clock::now()};
}

table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, locator::host_id addr) {
    if (gossiper.my_host_id() == addr) {
        return get_my_hit_rate();
    }
    auto it = _cluster_cache_hit_rates.find(addr);
    if (it == _cluster_cache_hit_rates.end()) {
        // no data yet, get it from the gossiper
        auto eps = gossiper.get_endpoint_state_ptr(addr);
        if (eps) {
            auto* state = eps->get_application_state_ptr(gms::application_state::CACHE_HITRATES);
            float f = -1.0f; // missing state means old node
            if (state) {
                const auto me = format("{}.{}", _schema->ks_name(), _schema->cf_name());
                const auto& value = state->value();
                const auto i = value.find(me);
                if (i != sstring::npos) {
                    f = strtof(&value[i + me.size() + 1], nullptr);
                } else {
                    f = 0.0f; // empty state means that node has rebooted
                }
                set_hit_rate(addr, cache_temperature(f));
                return cache_hit_rate{cache_temperature(f), lowres_clock::now()};
            }
        }
        return cache_hit_rate {cache_temperature(0.0f), lowres_clock::now()};
    } else {
        return it->second;
    }
}
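
// Illustrative note (an assumption, not taken from the original code): the
// parsing above relies on the CACHE_HITRATES application state value being a
// concatenation of "<ks>.<cf><separator><rate>" entries, e.g. something like
//
//   "ks1.t1:0.95;ks1.t2:0.42;"
//
// value.find(me) locates "ks1.t1", and strtof() then starts reading right
// after the single separator character that follows the table name. The
// exact separator characters shown here are an assumption for illustration.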

void table::drop_hit_rate(locator::host_id addr) {
    _cluster_cache_hit_rates.erase(addr);
}

void
table::check_valid_rp(const db::replay_position& rp) const {
    if (rp != db::replay_position() && rp < _lowest_allowed_rp) {
        throw mutation_reordered_with_truncate_exception();
    }
}

db::replay_position table::set_low_replay_position_mark() {
    _lowest_allowed_rp = _highest_rp;
    return _lowest_allowed_rp;
}

template<typename... Args>
void table::do_apply(compaction_group& cg, db::rp_handle&& h, Args&&... args) {
    utils::latency_counter lc;
    _stats.writes.set_latency(lc);
    db::replay_position rp = h;
    check_valid_rp(rp);
    try {
        cg.memtables()->active_memtable().apply(std::forward<Args>(args)..., std::move(h));
        _highest_rp = std::max(_highest_rp, rp);
    } catch (...) {
        _failed_counter_applies_to_memtable++;
        throw;
    }
    _stats.writes.mark(lc);
}

future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
    if (_virtual_writer) [[unlikely]] {
        return (*_virtual_writer)(freeze(m));
    }

    auto& cg = compaction_group_for_token(m.token());
    auto holder = cg.async_gate().hold();
    return dirty_memory_region_group().run_when_memory_available([this, &m, h = std::move(h), &cg, holder = std::move(holder)] () mutable {
        do_apply(cg, std::move(h), m);
    }, timeout);
}

template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&);

future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
    if (_virtual_writer) [[unlikely]] {
        return (*_virtual_writer)(m);
    }

    auto& cg = compaction_group_for_key(m.key(), m_schema);
    auto holder = cg.async_gate().hold();

    return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable {
        do_apply(cg, std::move(h), m, m_schema);
    }, timeout);
}

template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&);

future<>
write_memtable_to_sstable(mutation_reader reader,
                          memtable& mt, sstables::shared_sstable sst,
                          size_t estimated_partitions,
                          sstables::write_monitor& monitor,
                          sstables::sstable_writer_config& cfg) {
    cfg.replay_position = mt.replay_position();
    cfg.monitor = &monitor;
    cfg.origin = "memtable";
    schema_ptr s = reader.schema();
    return sst->write_components(std::move(reader), estimated_partitions, s, cfg, mt.get_encoding_stats());
}

future<>
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) {
    auto cfg = sst->manager().configure_writer("memtable");
    auto monitor = replica::permit_monitor(make_lw_shared(sstable_write_permit::unconditional()));
    auto semaphore = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable",
            reader_concurrency_semaphore::register_metrics::no);
    std::exception_ptr ex;

    try {
        auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {});
        auto reader = mt.make_flush_reader(mt.schema(), std::move(permit));
        co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg);
    } catch (...) {
        ex = std::current_exception();
    }
    co_await semaphore.stop();
    if (ex) {
        std::rethrow_exception(std::move(ex));
    }
}
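
// Illustrative usage sketch (not part of the original code; how `mt` and
// `sst` are obtained is assumed, e.g. from a test fixture): flushing a
// memtable into a writable sstable with the convenience overload above.
//
//   future<> flush_example(replica::memtable& mt, sstables::shared_sstable sst) {
//       // Writes all partitions currently in `mt` into `sst`, creating a
//       // throwaway reader concurrency semaphore internally.
//       co_await replica::write_memtable_to_sstable(mt, std::move(sst));
//   }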

future<lw_shared_ptr<query::result>>
table::query(schema_ptr query_schema,
        reader_permit permit,
        const query::read_command& cmd,
        query::result_options opts,
        const dht::partition_range_vector& partition_ranges,
        tracing::trace_state_ptr trace_state,
        query::result_memory_limiter& memory_limiter,
        db::timeout_clock::time_point timeout,
        std::optional<querier>* saved_querier) {
    if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
        co_return make_lw_shared<query::result>();
    }

    const auto table_async_gate_holder = _async_gate.hold();
    utils::latency_counter lc;
    _stats.reads.set_latency(lc);

    auto finally = defer([&] () noexcept {
        _stats.reads.mark(lc);
    });

    const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
    auto accounter = co_await (opts.request == query::result_request::only_digest
            ? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed)
            : memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed));

    query_state qs(query_schema, cmd, opts, partition_ranges, std::move(accounter));

    std::optional<querier> querier_opt;
    if (saved_querier) {
        querier_opt = std::move(*saved_querier);
    }

    co_await utils::get_local_injector().inject("replica_query_wait", [&] (auto& handler) -> future<> {
        auto table_name = handler.template get<std::string_view>("table");
        if (table_name && *table_name == _schema->cf_name()) {
            tlogger.info("replica_query_wait: waiting");
            co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes(5));
        }
    });

    while (!qs.done()) {
        auto&& range = *qs.current_partition_range++;

        if (!querier_opt) {
            querier_base::querier_config conf(_config.tombstone_warn_threshold);
            querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
        }
        auto& q = *querier_opt;

        std::exception_ptr ex;
        try {
            co_await q.consume_page(query_result_builder(*query_schema, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, trace_state);
        } catch (...) {
            ex = std::current_exception();
        }
        if (ex || !qs.done()) {
            co_await q.close();
            querier_opt = {};
        }
        if (ex) {
            co_return coroutine::exception(std::move(ex));
        }
    }

    std::optional<full_position> last_pos;
    if (querier_opt && querier_opt->current_position()) {
        last_pos.emplace(*querier_opt->current_position());
    }

    if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
        co_await querier_opt->close();
        querier_opt = {};
    }
    if (saved_querier) {
        *saved_querier = std::move(querier_opt);
    }

    co_return make_lw_shared<query::result>(qs.builder.build(std::move(last_pos)));
}

future<reconcilable_result>
table::mutation_query(schema_ptr query_schema,
        reader_permit permit,
        const query::read_command& cmd,
        const dht::partition_range& range,
        tracing::trace_state_ptr trace_state,
        query::result_memory_accounter accounter,
        db::timeout_clock::time_point timeout,
        bool tombstone_gc_enabled,
        std::optional<querier>* saved_querier) {
    if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
        co_return reconcilable_result();
    }

    const auto table_async_gate_holder = _async_gate.hold();

    std::optional<querier> querier_opt;
    if (saved_querier) {
        querier_opt = std::move(*saved_querier);
    }
    if (!querier_opt) {
        auto tombstone_gc_state = tombstone_gc_enabled ? get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
        querier_base::querier_config conf(_config.tombstone_warn_threshold);
        querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, tombstone_gc_state, conf);
    }
    auto& q = *querier_opt;

    std::exception_ptr ex;
    try {
        auto rrb = reconcilable_result_builder(*query_schema, cmd.slice, std::move(accounter));
        auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, trace_state);

        if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {
            co_await q.close();
            querier_opt = {};
        }
        if (saved_querier) {
            *saved_querier = std::move(querier_opt);
        }

        co_return r;
    } catch (...) {
        ex = std::current_exception();
    }
    co_await q.close();
    co_return coroutine::exception(std::move(ex));
}

mutation_source
table::as_mutation_source() const {
    return mutation_source([this] (schema_ptr s,
                                   reader_permit permit,
                                   const dht::partition_range& range,
                                   const query::partition_slice& slice,
                                   tracing::trace_state_ptr trace_state,
                                   streamed_mutation::forwarding fwd,
                                   mutation_reader::forwarding fwd_mr) {
        return this->make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
    });
}

void table::add_coordinator_read_latency(utils::estimated_histogram::duration latency) {
    _stats.estimated_coordinator_read.add(std::chrono::duration_cast<std::chrono::microseconds>(latency).count());
}

std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double percentile) {
    if (_cached_percentile != percentile || lowres_clock::now() - _percentile_cache_timestamp > 1s) {
        _percentile_cache_timestamp = lowres_clock::now();
        _cached_percentile = percentile;
        _percentile_cache_value = std::max(_stats.estimated_coordinator_read.percentile(percentile) / 1000, int64_t(1)) * 1ms;
        _stats.estimated_coordinator_read *= 0.9; // decay values a little to give new data points more weight
    }
    return _percentile_cache_value;
}
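
// Worked example (illustrative, not from the original code): suppose the
// histogram's p99 estimate is 12000 microseconds. The cached value becomes
// max(12000 / 1000, 1) * 1ms = 12ms; it is then reused for up to one second
// (or until a different percentile is requested), and the histogram is
// scaled by 0.9 on each refresh so that newer samples dominate the estimate.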

void
table::enable_auto_compaction() {
    // FIXME: unmute backlog. turn table backlog back on.
    // see table::disable_auto_compaction() notes.
    _compaction_disabled_by_user = false;
    trigger_compaction();
}

future<>
table::disable_auto_compaction() {
    // FIXME: mute backlog. When we disable background compactions
    // for the table, we must also disable the current backlog of the
    // table compaction strategy that contributes to the scheduling
    // group resources prioritization.
    //
    // There are two possibilities:
    // - there are no ongoing background compactions, and we can freely
    //   mute the table backlog.
    // - there are compactions happening. Then we must decide whether
    //   we want to allow them to finish while not allowing new compaction
    //   tasks to be submitted, or we may "suspend" them until background
    //   compactions are enabled back. The latter is not the worst option
    //   because it will allow bg compactions to finish if there are
    //   unused resources, and it will not lose any writer/reader stats.
    //
    // Besides that:
    // - there are major compactions that additionally use a constant-size
    //   backlog of shares,
    // - sstable rewrite tasks that do the same.
    //
    // Setting NullCompactionStrategy is not an option due to the
    // following reasons:
    // - it will zero the backlog if suspending current compactions is not an
    //   option
    // - it will break computation of the major compaction descriptor
    //   for new submissions
    _compaction_disabled_by_user = true;
    return with_gate(_async_gate, [this] {
        return parallel_foreach_compaction_group_view([this] (compaction::compaction_group_view& view) {
            return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &view, compaction::compaction_type::Compaction);
        });
    });
}

void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
    _tombstone_gc_enabled = tombstone_gc_enabled;
    tlogger.info0("Tombstone GC was {} for {}.{}", tombstone_gc_enabled ? "enabled" : "disabled", _schema->ks_name(), _schema->cf_name());
    if (_tombstone_gc_enabled) {
        trigger_compaction();
    }
}

mutation_reader
table::make_mutation_reader_excluding_staging(schema_ptr s,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr) const {
    std::vector<mutation_reader> readers;

    add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
        readers.reserve(memtable_count + 1);
    });

    static const sstables::sstable_predicate excl_staging_predicate = [] (const sstables::sstable& sst) {
        return !sst.requires_view_building();
    };

    readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate));
    return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}

future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
    auto permit = co_await get_sstable_list_permit();
    sstables::delayed_commit_changes delay_commit;
    std::unordered_set<compaction_group*> compaction_groups_to_notify;
    for (auto sst : sstables) {
        try {
            // Off-strategy can happen in parallel to view building, so the SSTable may be deleted already if the former
            // completed first.
            // The sstable list permit prevents list update on off-strategy completion and move_sstables_from_staging()
            // from stepping on each other's toes.
            co_await sst->change_state(sstables::sstable_state::normal, &delay_commit);
            auto& cg = compaction_group_for_sstable(sst);
            if (get_compaction_manager().requires_cleanup(cg.view_for_sstable(sst), sst)) {
                compaction_groups_to_notify.insert(&cg);
            }
            // If view building finished first, the SSTable with repair origin still exists.
            // It can also happen that the SSTable is not going through reshape, so it doesn't have a repair origin.
            // That being said, we'll only add this SSTable to the tracker if its origin is other than repair.
            // Otherwise, we can count on off-strategy completion to add it when updating the lists.
            if (sst->get_origin() != sstables::repair_origin) {
                add_sstable_to_backlog_tracker(cg.get_backlog_tracker(), sst);
            }
        } catch (...) {
            tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
            throw;
        }
    }

    co_await delay_commit.commit();

    for (auto* cg : compaction_groups_to_notify) {
        cg->get_staging_done_condition().broadcast();
    }

    // The off-strategy timer will be rearmed, so if there's more incoming data through repair / streaming,
    // the timer can be updated once again. In practice, this allows off-strategy compaction to kick off
    // at the end of the node operation on behalf of this table, which brings more efficiency in terms
    // of write amplification.
    do_update_off_strategy_trigger();
}

/**
 * Given an update for the base table, calculates the set of potentially affected views,
 * generates the relevant updates, and sends them to the paired view replicas.
 */
future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, const frozen_mutation& fm,
        db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
    //FIXME: Avoid unfreezing here.
    auto m = fm.unfreeze(s);
    return push_view_replica_updates(std::move(gen), s, std::move(m), timeout, std::move(tr_state), sem);
}

future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
        tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, query::partition_slice::option_set custom_opts) const {
    schema_ptr base = schema();
    m.upgrade(base);
    gc_clock::time_point now = gc_clock::now();
    utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] {
        now -= 10s;
    });

    if (!db::view::should_generate_view_updates_on_this_shard(base, get_effective_replication_map(), m.token())) {
        // This could happen if we are a pending replica.
        // A pending replica may have incomplete data, and building view updates could result
        // in wrong updates. Therefore we don't send updates from a pending replica. Instead, the
        // base replicas send the updates to the view replicas, including pending replicas.
        co_return row_locker::lock_holder();
    }

    auto views = affected_views(gen, base, m);
    if (views.empty()) {
        co_return row_locker::lock_holder();
    }
    auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(gen->get_db().as_data_dictionary(), *base, m.decorated_key(), m.partition(), views);
    const bool need_regular = !cr_ranges.empty();
    const bool need_static = db::view::needs_static_row(m.partition(), views);
    if (!need_regular && !need_static) {
        tracing::trace(tr_state, "View updates do not require read-before-write");
        co_await gen->generate_and_propagate_view_updates(*this, base, sem.make_tracking_only_permit(s, "push-view-updates-no-read-before-write", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now, timeout);
        // In this case we are not doing a read-before-write, just a
        // write, so no lock is needed.
        co_return row_locker::lock_holder();
    }
    // We read whole sets of regular and/or static columns in case the update now causes a base row to pass
    // a view's filters, and a view happens to include columns that have no value in this update.
    // Also, one of those columns can determine the lifetime of the base row, if it has a TTL.
    query::column_id_vector static_columns;
    query::column_id_vector regular_columns;
    if (need_regular) {
        std::ranges::copy(base->regular_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(regular_columns));
    }
    if (need_static) {
        std::ranges::copy(base->static_columns() | std::views::transform(std::mem_fn(&column_definition::id)), std::back_inserter(static_columns));
    }
    query::partition_slice::option_set opts;
    opts.set(query::partition_slice::option::send_partition_key);
    opts.set_if<query::partition_slice::option::send_clustering_key>(need_regular);
    opts.set_if<query::partition_slice::option::distinct>(need_static && !need_regular);
    opts.set_if<query::partition_slice::option::always_return_static_content>(need_static);
    opts.set(query::partition_slice::option::send_timestamp);
    opts.set(query::partition_slice::option::send_ttl);
    opts.add(custom_opts);
    auto slice = query::partition_slice(
            std::move(cr_ranges), std::move(static_columns), std::move(regular_columns), std::move(opts), { }, query::max_rows);
    // Take the shard-local lock on the base-table row or partition as needed.
    // We'll return this lock to the caller, which will release it after
    // writing the base-table update.
    future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
    co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout);
    auto lock = co_await std::move(lockf);
    auto pk = dht::partition_range::make_singular(m.decorated_key());
    auto permit = co_await sem.obtain_permit(base, "push-view-updates-read-before-write", estimate_read_memory_cost(), timeout, tr_state);
    auto reader = source.make_mutation_reader(base, permit, pk, slice, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    co_await gen->generate_and_propagate_view_updates(*this, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now, timeout);
    tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
    // return the local partition/row lock we have taken so it
    // remains locked until the caller is done modifying this
    // partition/row and destroys the lock object.
    co_return std::move(lock);
}
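
// Summary sketch of the read-before-write path above (a descriptive comment,
// not code from the original change): when some view needs the pre-update
// values, do_push_view_replica_updates() roughly does the following:
//
//   1. take the shard-local row/partition lock via local_base_lock();
//   2. read the existing base row(s) through the given mutation_source,
//      restricted to the affected clustering ranges;
//   3. let the view_update_generator build and send the view updates;
//   4. return the lock holder so the caller keeps the lock until the base
//      write itself has been applied.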

future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
        tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
    return do_push_view_replica_updates(std::move(gen), s, std::move(m), timeout, as_mutation_source(),
            std::move(tr_state), sem, {});
}

future<row_locker::lock_holder>
table::stream_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
        std::vector<sstables::shared_sstable>& excluded_sstables) const {
    return do_push_view_replica_updates(
            std::move(gen),
            s,
            std::move(m),
            timeout,
            as_mutation_source_excluding_staging(),
            tracing::trace_state_ptr(),
            *_config.streaming_read_concurrency_semaphore,
            query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
}

mutation_source
table::as_mutation_source_excluding_staging() const {
    return mutation_source([this] (schema_ptr s,
                                   reader_permit permit,
                                   const dht::partition_range& range,
                                   const query::partition_slice& slice,
                                   tracing::trace_state_ptr trace_state,
                                   streamed_mutation::forwarding fwd,
                                   mutation_reader::forwarding fwd_mr) {
        return this->make_mutation_reader_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
    });
}

std::vector<mutation_source> table::select_memtables_as_mutation_sources(dht::token token) const {
    auto& sg = storage_group_for_token(token);
    std::vector<mutation_source> mss;
    mss.reserve(sg.memtable_count());
    for (auto& cg : sg.compaction_groups()) {
        for (auto& mt : *cg->memtables()) {
            mss.emplace_back(mt->as_data_source());
        }
    }
    return mss;
}

compaction::compaction_backlog_tracker& compaction_group::get_backlog_tracker() {
    return *_backlog_tracker;
}

void compaction_group::register_backlog_tracker(compaction::compaction_backlog_tracker new_backlog_tracker) {
    _backlog_tracker.emplace(std::move(new_backlog_tracker));
    get_compaction_manager().register_backlog_tracker(*_backlog_tracker);
}

compaction::compaction_manager& compaction_group::get_compaction_manager() noexcept {
    return _t.get_compaction_manager();
}

const compaction::compaction_manager& compaction_group::get_compaction_manager() const noexcept {
    return _t.get_compaction_manager();
}

compaction::compaction_group_view& compaction_group::as_view_for_static_sharding() const {
    return view_for_unrepaired_data();
}

compaction::compaction_group_view& compaction_group::view_for_unrepaired_data() const {
    return *_unrepaired_view;
}

compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const {
    switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) {
    case repair_sstable_classification::unrepaired: return *_unrepaired_view;
    case repair_sstable_classification::repairing: return *_repairing_view;
    case repair_sstable_classification::repaired: return *_repaired_view;
    }
    std::unreachable();
}

utils::small_vector<compaction::compaction_group_view*, 3> compaction_group::all_views() const {
    utils::small_vector<compaction::compaction_group_view*, 3> ret;
    ret.push_back(_unrepaired_view.get());
    ret.push_back(_repairing_view.get());
    ret.push_back(_repaired_view.get());
    return ret;
}

compaction_group* table::try_get_compaction_group_with_static_sharding() const {
    if (!uses_static_sharding()) {
        return nullptr;
    }
    return get_compaction_group(0);
}

compaction::compaction_group_view& table::try_get_compaction_group_view_with_static_sharding() const {
    auto* cg = try_get_compaction_group_with_static_sharding();
    if (!cg) {
        throw std::runtime_error("Getting table state is allowed only with static sharding");
    }
    return cg->as_view_for_static_sharding();
}

future<> table::parallel_foreach_compaction_group_view(std::function<future<>(compaction::compaction_group_view&)> action) {
    return parallel_foreach_compaction_group([action = std::move(action)] (compaction_group& cg) -> future<> {
        for (auto view : cg.all_views()) {
            co_await action(*view);
        }
    });
}

compaction::compaction_group_view& table::compaction_group_view_for_sstable(const sstables::shared_sstable& sst) const {
    auto& cg = compaction_group_for_sstable(sst);
    return cg.view_for_sstable(sst);
}

data_dictionary::table
table::as_data_dictionary() const {
    static constinit data_dictionary_impl _impl;
    return _impl.wrap(*this);
}

bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) {
    auto& cg = compaction_group_for_sstable(sst);
    return get_compaction_manager().erase_sstable_cleanup_state(cg.as_view_for_static_sharding(), sst);
}

bool table::requires_cleanup(const sstables::shared_sstable& sst) const {
    auto& cg = compaction_group_for_sstable(sst);
    return get_compaction_manager().requires_cleanup(cg.as_view_for_static_sharding(), sst);
}

bool table::requires_cleanup(const sstables::sstable_set& set) const {
    return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) {
        auto& cg = compaction_group_for_sstable(sst);
        return stop_iteration(_compaction_manager.requires_cleanup(cg.as_view_for_static_sharding(), sst));
    }));
}

future<> compaction_group::cleanup() {
    class compaction_group_cleaner : public row_cache::external_updater_impl {
        table& _t;
        compaction_group& _cg;
        const lw_shared_ptr<sstables::sstable_set> _empty_main_set;
        const lw_shared_ptr<sstables::sstable_set> _empty_maintenance_set;
    private:
        lw_shared_ptr<sstables::sstable_set> empty_sstable_set() const {
            return make_lw_shared<sstables::sstable_set>(_cg.make_main_sstable_set());
        }
    public:
        explicit compaction_group_cleaner(compaction_group& cg)
                : _t(cg._t)
                , _cg(cg)
                , _empty_main_set(empty_sstable_set())
                , _empty_maintenance_set(empty_sstable_set())
        {
        }
        virtual future<> prepare() override {
            // Capture SSTables after flush, and with compaction disabled, to avoid missing any.
            auto set = _cg.make_sstable_set();
            auto& sstables_compacted_but_not_deleted = _cg._sstables_compacted_but_not_deleted;
            sstables_compacted_but_not_deleted.reserve(set->size() + sstables_compacted_but_not_deleted.size());
            set->for_each_sstable([&] (const sstables::shared_sstable& sst) mutable {
                sstables_compacted_but_not_deleted.push_back(sst);
            });
            if (utils::get_local_injector().enter("tablet_cleanup_failure")) {
                co_await sleep(std::chrono::seconds(1));
                tlogger.info("Cleanup failed for tablet {}", _cg.group_id());
                throw std::runtime_error("tablet cleanup failure");
            }
        }

        virtual void execute() override {
            _t.subtract_compaction_group_from_stats(_cg);
            _cg.set_main_sstables(std::move(_empty_main_set));
            _cg.set_maintenance_sstables(std::move(_empty_maintenance_set));
            _t.refresh_compound_sstable_set();
        }
    };

    auto permit = co_await _t.get_sstable_list_permit();
    auto updater = row_cache::external_updater(std::make_unique<compaction_group_cleaner>(*this));

    auto p_range = to_partition_range(token_range());
    tlogger.debug("Invalidating range {} for compaction group {} of table {}.{} during cleanup.",
            p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name());
    // Since the permit is still held, all actions below will be executed atomically:
    co_await _t._cache.invalidate(std::move(updater), p_range);
    _t._cache.refresh_snapshot();

    co_await _t.delete_sstables_atomically(permit, _sstables_compacted_but_not_deleted);
    // Clearing sstables_compacted_but_not_deleted only on success allows a retry caused
    // by a failure during deletion to still find the sstables, even though they were removed
    // from the sstable sets.
    _sstables_compacted_but_not_deleted.clear();
    if (utils::get_local_injector().enter("tablet_cleanup_failure_post_deletion")) {
        tlogger.info("Cleanup failed for tablet {}", group_id());
        throw std::runtime_error("tablet cleanup failure");
    }
}

future<> table::clear_inactive_reads_for_tablet(database& db, storage_group& sg) {
    for (auto& cg_ptr : sg.compaction_groups()) {
        co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range());
    }
}

future<> storage_group::stop(sstring reason) noexcept {
    if (_async_gate.is_closed()) {
        co_return;
    }
    // Carefully waits for close of the gate after stopping compaction groups, since we don't want
    // to wait on an ongoing compaction, *but* start it earlier to prevent iterations from
    // picking this group that is being stopped.
    auto closed_gate_fut = _async_gate.close();

    co_await utils::get_local_injector().inject("wait_before_stop_compaction_groups", [] (auto& handler) -> future<> {
        dblog.info("wait_before_stop_compaction_groups: wait");
        co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
        dblog.info("wait_before_stop_compaction_groups: done");
    }, false);

    // Synchronizes with in-flight writes if any, and also takes care of flushing if needed.

    // The reason we have to stop the main cg first is that an ongoing split always runs in the main cg
    // and its output is written to the left and right groups. If either left or right is stopped before
    // main, split completion will add an sstable to a closed group, and that might in turn trigger an
    // exception while running under row_cache::external_updater::execute, resulting in a node crash.
    co_await _main_cg->stop(reason);
    co_await coroutine::parallel_for_each(_split_ready_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
        return cg_ptr->stop(reason);
    });
    co_await coroutine::parallel_for_each(_merging_groups, [&reason] (const compaction_group_ptr& cg_ptr) {
        return cg_ptr->stop(reason);
    });
    co_await std::move(closed_gate_fut);
}

future<> table::stop_compaction_groups(storage_group& sg) {
    return sg.stop("tablet cleanup");
}

future<> table::flush_compaction_groups(storage_group& sg) {
    for (auto& cg_ptr : sg.compaction_groups()) {
        co_await cg_ptr->flush();
    }
}

future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group& sg) {
    for (auto& cg_ptr : sg.compaction_groups()) {
        co_await cg_ptr->cleanup();
        // FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation,
        // and can cover some mutations which weren't cleaned, causing them to be lost during replay.
        //
        // This should be okay, because writes are not supposed to race with cleanups
        // in the first place, but it would be better to extract the exact replay_position from
        // the actually flushed/deleted sstables, like discard_sstable() does, so that the mutations
        // cleaned from sstables are exactly the same as the ones cleaned from commitlog.
        co_await sys_ks.save_commitlog_cleanup_record(schema()->id(), sg.token_range(), _highest_rp);
        // This is the only place (outside of reboot) where we delete unneeded commitlog cleanup
        // records. This isn't ideal -- it would be more natural if the unneeded records
        // were deleted as soon as they become unneeded -- but this gets the job done with a
        // minimal amount of code.
        co_await sys_ks.drop_old_commitlog_cleanup_records(db.commitlog()->min_position());
    }

    tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
}

future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
    auto holder = async_gate().hold();

    auto sgp = _sg_manager->maybe_storage_group_for_id(_schema, tid.value());
    if (!sgp) {
        tlogger.warn("Storage group for tablet {} is deallocated. Ignore cleanup.", tid);
        co_return;
    }

    auto& sg = *sgp;
    co_await clear_inactive_reads_for_tablet(db, sg);
    // compaction_group::stop takes care of flushing.
    co_await stop_compaction_groups(sg);
    co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
    co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
    co_await utils::get_local_injector().inject("tablet_cleanup_completion_wait", utils::wait_for_message(std::chrono::seconds(5)));
}

future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
    auto holder = async_gate().hold();
    auto& sg = storage_group_for_id(tid.value());

    co_await clear_inactive_reads_for_tablet(db, sg);
    co_await flush_compaction_groups(sg);
    co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
}

shard_id table::shard_for_reads(dht::token t) const {
    return _erm ? _erm->shard_for_reads(*_schema, t)
                : dht::static_shard_of(*_schema, t); // for tests.
}

dht::shard_replica_set table::shard_for_writes(dht::token t) const {
    return _erm ? _erm->shard_for_writes(*_schema, t)
                : dht::shard_replica_set{dht::static_shard_of(*_schema, t)}; // for tests.
}

future<uint64_t> table::estimated_partitions_in_range(dht::token_range tr) const {
    // FIXME: use a better estimation for the set than a simple sum of individual estimations for each sstable.
    //
    // If sstables can be grouped by token range,
    // and tokens within each sstable are uniformly distributed
    // over the token range (that's the usual case, and it's always
    // the case with tablets), then we could use an alternative calculation.
    //
    // The result would be the sum of sub-results for each compaction group.
    // To get an estimate for a compaction group,
    // we could first use the sstable cardinality sketches (hyperloglog)
    // to get a decent post-compaction estimate of total cardinality.
    // And then, we could multiply the total estimate by the fraction of the group's
    // range which is covered by `tr`.
    auto sstables = select_sstables(dht::to_partition_range(tr));
    uint64_t partition_count = 0;
    co_await seastar::max_concurrent_for_each(sstables, 10, [&partition_count, &tr] (sstables::shared_sstable sst) -> future<> {
        partition_count += co_await sst->estimated_keys_for_range(tr);
    });
    co_return partition_count;
}
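
// Illustrative sketch of the alternative estimate described in the FIXME
// above (an assumption about how it could look, not an implemented API):
// for each compaction group g whose token range overlaps `tr`,
//
//   estimate(g) = merged_hll_cardinality(g.sstables())
//                 * overlap_fraction(tr, g.token_range())
//
// and the final result would be the sum of estimate(g) over all such groups,
// where merged_hll_cardinality() and overlap_fraction() are hypothetical
// helpers (HyperLogLog sketch union, and the covered fraction of the group's
// token range, respectively).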

} // namespace replica