A compaction group has a separator buffer that keeps the mixed segments alive until the separator buffer is flushed. A mixed segment can be freed only after all separator buffers that hold writes from the segment are flushed. Typically a separator buffer is flushed when it becomes full. However, it's possible, for example, that one compaction group is filled more slowly than others and therefore holds many segments. To fix this we trigger a separator flush periodically for separator buffers that hold old segments. We track the active segment sequence number and, for each separator buffer, the oldest sequence number it holds.
2330 lines
106 KiB
C++
2330 lines
106 KiB
C++
/*
|
|
* Copyright (C) 2014-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "index/secondary_index_manager.hh"
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/execution_stage.hh>
|
|
#include <seastar/core/when_all.hh>
|
|
#include "replica/global_table_ptr.hh"
|
|
#include "replica/logstor/compaction.hh"
|
|
#include "types/user.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/hash.hh"
|
|
#include "cell_locking.hh"
|
|
#include "db_clock.hh"
|
|
#include "gc_clock.hh"
|
|
#include <chrono>
|
|
#include <seastar/core/sharded.hh>
|
|
#include <functional>
|
|
#include <unordered_map>
|
|
#include <set>
|
|
#include <boost/functional/hash.hpp>
|
|
#include <optional>
|
|
#include <string.h>
|
|
#include "types/types.hh"
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/gate.hh>
|
|
#include "db/commitlog/replay_position.hh"
|
|
#include "db/commitlog/commitlog_types.hh"
|
|
#include "logstor/logstor.hh"
|
|
#include "schema/schema_fwd.hh"
|
|
#include "db/view/view.hh"
|
|
#include "db/snapshot-ctl.hh"
|
|
#include "memtable.hh"
|
|
#include "db/row_cache.hh"
|
|
#include "query/query-result.hh"
|
|
#include "compaction/compaction_strategy.hh"
|
|
#include "utils/estimated_histogram.hh"
|
|
#include <seastar/core/metrics_registration.hh>
|
|
#include "db/view/view_stats.hh"
|
|
#include "db/view/view_update_backlog.hh"
|
|
#include "db/view/row_locking.hh"
|
|
#include "utils/phased_barrier.hh"
|
|
#include "backlog_controller.hh"
|
|
#include "dirty_memory_manager.hh"
|
|
#include "reader_concurrency_semaphore_group.hh"
|
|
#include "db/timeout_clock.hh"
|
|
#include "replica/querier.hh"
|
|
#include "cache_temperature.hh"
|
|
#include <unordered_set>
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/updateable_value.hh"
|
|
#include "readers/multishard.hh"
|
|
#include "data_dictionary/user_types_metadata.hh"
|
|
#include "data_dictionary/keyspace_metadata.hh"
|
|
#include "data_dictionary/data_dictionary.hh"
|
|
#include "absl-flat_hash_map.hh"
|
|
#include "utils/cross-shard-barrier.hh"
|
|
#include "sstables/generation_type.hh"
|
|
#include "db/rate_limiter.hh"
|
|
#include "db/operation_type.hh"
|
|
#include "locator/tablets.hh"
|
|
#include "utils/serialized_action.hh"
|
|
#include "compaction/compaction_fwd.hh"
|
|
#include "compaction_group.hh"
|
|
#include "service/qos/qos_configuration_change_subscriber.hh"
|
|
#include "replica/tables_metadata_lock.hh"
|
|
#include "service/topology_guard.hh"
|
|
#include "utils/disk_space_monitor.hh"
|
|
|
|
class cell_locker;
|
|
class cell_locker_stats;
|
|
class locked_cell;
|
|
class mutation;
|
|
|
|
namespace compaction {
|
|
class compaction_completion_desc;
|
|
class compaction_data;
|
|
class compaction_descriptor;
|
|
class compaction_manager;
|
|
class compaction_reenabler;
|
|
class compaction_task_impl;
|
|
}
|
|
|
|
class frozen_mutation;
|
|
class reconcilable_result;
|
|
|
|
namespace bi = boost::intrusive;
|
|
|
|
namespace tracing { class trace_state_ptr; }
|
|
namespace s3 { struct endpoint_config; }
|
|
|
|
namespace lang { class manager; }
|
|
|
|
namespace service {
|
|
class storage_proxy;
|
|
class storage_service;
|
|
class migration_notifier;
|
|
class raft_group_registry;
|
|
}
|
|
|
|
namespace gms {
|
|
class feature_service;
|
|
}
|
|
|
|
namespace alternator {
|
|
class table_stats;
|
|
}
|
|
namespace sstables {
|
|
|
|
enum class sstable_state;
|
|
class sstable;
|
|
class storage_manager;
|
|
class sstables_manager;
|
|
class sstable_set;
|
|
class directory_semaphore;
|
|
struct sstable_files_snapshot;
|
|
struct entry_descriptor;
|
|
|
|
}
|
|
|
|
class sstable_compressor_factory;
|
|
|
|
namespace ser {
|
|
template<typename T>
|
|
class serializer;
|
|
}
|
|
|
|
namespace gms {
|
|
class gossiper;
|
|
}
|
|
|
|
namespace api {
|
|
class autocompaction_toggle_guard;
|
|
}
|
|
|
|
namespace db {
|
|
class commitlog;
|
|
class config;
|
|
class extensions;
|
|
class rp_handle;
|
|
class data_listeners;
|
|
class large_data_handler;
|
|
class system_table_corrupt_data_handler;
|
|
class nop_corrupt_data_handler;
|
|
class system_keyspace;
|
|
|
|
namespace view {
|
|
class view_update_generator;
|
|
}
|
|
|
|
}
|
|
|
|
namespace qos {
|
|
class service_level_controller;
|
|
}
|
|
|
|
// Thrown to signal that a mutation was applied out of order with respect to a
// truncate operation. NOTE(review): the throw sites are outside this header —
// semantics inferred from the name; confirm against the implementation.
class mutation_reordered_with_truncate_exception : public std::exception {};
|
|
|
|
class column_family_test;
|
|
class table_for_tests;
|
|
class database_test_wrapper;
|
|
using sstable_list = sstables::sstable_list;
|
|
|
|
class sigquit_handler;
|
|
|
|
extern logging::logger dblog;
|
|
|
|
namespace replica {
|
|
|
|
// Aggregates the RAII guards collected while stopping compaction and locking a
// set of tables for some operation:
//  - `cres`: compaction re-enablers (NOTE(review): presumably each one
//    re-enables compaction on its table when destroyed — confirm in
//    replica/logstor/compaction.hh);
//  - `lock_holders`: rwlock holders that must stay alive for the duration of
//    the operation.
struct compaction_reenablers_and_lock_holders {
    std::vector<std::unique_ptr<compaction::compaction_reenabler>> cres;
    std::vector<seastar::rwlock::holder> lock_holders;
};
|
|
|
|
using shared_memtable = lw_shared_ptr<memtable>;
|
|
class global_table_ptr;
|
|
|
|
// We could just add all memtables, regardless of types, to a single list, and
|
|
// then filter them out when we read them. Here's why I have chosen not to do
|
|
// it:
|
|
//
|
|
// First, some of the methods in which a memtable is involved (like seal)
// assume a commitlog, and take great care to update the replay
// position, flush the log, etc. We want to bypass those, and that has to
// be done either by sprinkling the seal code with conditionals, or having a
// separate method for each seal.
|
|
//
|
|
// Also, if we ever want to put some of the memtables in as separate allocator
|
|
// region group to provide for extra QoS, having the classes properly wrapped
|
|
// will make that trivial: just pass a version of new_memtable() that puts it
|
|
// in a different region, while the list approach would require a lot of
|
|
// conditionals as well.
|
|
//
|
|
// If we are going to have different methods, better have different instances
|
|
// of a common class.
|
|
class memtable_list {
|
|
public:
|
|
using seal_immediate_fn_type = std::function<future<> (flush_permit&&)>;
|
|
using intrusive_memtable_list = bi::list<
|
|
memtable,
|
|
bi::base_hook<bi::list_base_hook<bi::link_mode<bi::auto_unlink>>>,
|
|
bi::constant_time_size<false>>;
|
|
private:
|
|
std::vector<shared_memtable> _memtables;
|
|
intrusive_memtable_list _flushed_memtables_with_active_reads;
|
|
seal_immediate_fn_type _seal_immediate_fn;
|
|
std::function<schema_ptr()> _current_schema;
|
|
replica::dirty_memory_manager* _dirty_memory_manager;
|
|
memtable_table_shared_data& _table_shared_data;
|
|
std::optional<shared_future<>> _flush_coalescing;
|
|
seastar::scheduling_group _compaction_scheduling_group;
|
|
replica::table_stats& _table_stats;
|
|
shared_tombstone_gc_state* _shared_gc_state = nullptr;
|
|
public:
|
|
using iterator = decltype(_memtables)::iterator;
|
|
using const_iterator = decltype(_memtables)::const_iterator;
|
|
public:
|
|
memtable_list(
|
|
seal_immediate_fn_type seal_immediate_fn,
|
|
std::function<schema_ptr()> cs,
|
|
dirty_memory_manager* dirty_memory_manager,
|
|
memtable_table_shared_data& table_shared_data,
|
|
replica::table_stats& table_stats,
|
|
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group(),
|
|
shared_tombstone_gc_state* shared_gc_state = nullptr)
|
|
: _memtables({})
|
|
, _seal_immediate_fn(seal_immediate_fn)
|
|
, _current_schema(cs)
|
|
, _dirty_memory_manager(dirty_memory_manager)
|
|
, _table_shared_data(table_shared_data)
|
|
, _compaction_scheduling_group(compaction_scheduling_group)
|
|
, _table_stats(table_stats)
|
|
, _shared_gc_state(shared_gc_state)
|
|
{
|
|
add_memtable();
|
|
}
|
|
|
|
memtable_list(std::function<schema_ptr()> cs, dirty_memory_manager* dirty_memory_manager,
|
|
memtable_table_shared_data& table_shared_data,
|
|
replica::table_stats& table_stats,
|
|
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group(),
|
|
shared_tombstone_gc_state* shared_gc_state = nullptr)
|
|
: memtable_list({}, std::move(cs), dirty_memory_manager, table_shared_data, table_stats, compaction_scheduling_group, shared_gc_state) {
|
|
}
|
|
|
|
bool can_flush() const noexcept {
|
|
return bool(_seal_immediate_fn);
|
|
}
|
|
|
|
bool needs_flush() const noexcept {
|
|
return !empty();
|
|
}
|
|
|
|
bool empty() const noexcept {
|
|
for (auto& m : _memtables) {
|
|
if (!m->empty()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
shared_memtable back() const noexcept {
|
|
return _memtables.back();
|
|
}
|
|
|
|
// Returns the minimum live timestamp. Considers all memtables, even
|
|
// those that were flushed and removed with erase(), but an
|
|
// in-progress read is still using them.
|
|
// Memtables whose min live timestamp > max_seen_timestamp are ignored as we
|
|
// consider that their content is more recent than any potential tombstone in
|
|
// other mutation sources.
|
|
// Returns api::max_timestamp if the key is not in any of the memtables.
|
|
max_purgeable get_max_purgeable(const dht::decorated_key& dk, is_shadowable is, api::timestamp_type max_seen_timestamp) const noexcept;
|
|
|
|
// # 8904 - this method is akin to std::set::erase(key_type), not
|
|
// erase(iterator). Should be tolerant against non-existing.
|
|
void erase(const shared_memtable& element) noexcept {
|
|
auto i = std::ranges::find(_memtables, element);
|
|
if (i != _memtables.end()) {
|
|
_memtables.erase(i);
|
|
}
|
|
_flushed_memtables_with_active_reads.push_back(*element);
|
|
}
|
|
|
|
// Synchronously swaps the active memtable with a new, empty one,
|
|
// returning the old memtables list.
|
|
// Exception safe.
|
|
std::vector<replica::shared_memtable> clear_and_add();
|
|
|
|
size_t size() const noexcept {
|
|
return _memtables.size();
|
|
}
|
|
|
|
future<> seal_active_memtable(flush_permit&& permit) noexcept {
|
|
return _seal_immediate_fn(std::move(permit));
|
|
}
|
|
|
|
auto begin() noexcept {
|
|
return _memtables.begin();
|
|
}
|
|
|
|
auto begin() const noexcept {
|
|
return _memtables.begin();
|
|
}
|
|
|
|
auto end() noexcept {
|
|
return _memtables.end();
|
|
}
|
|
|
|
auto end() const noexcept {
|
|
return _memtables.end();
|
|
}
|
|
|
|
memtable& active_memtable() noexcept {
|
|
return *_memtables.back();
|
|
}
|
|
|
|
void add_memtable() {
|
|
_memtables.emplace_back(new_memtable());
|
|
}
|
|
|
|
dirty_memory_manager_logalloc::region_group& region_group() noexcept {
|
|
return _dirty_memory_manager->region_group();
|
|
}
|
|
// This is used for explicit flushes. Will queue the memtable for flushing and proceed when the
|
|
// dirty_memory_manager allows us to. We will not seal at this time since the flush itself
|
|
// wouldn't happen anyway. Keeping the memtable in memory will potentially increase the time it
|
|
// spends in memory allowing for more coalescing opportunities.
|
|
// The returned future<> resolves when any pending flushes are complete and the memtable is sealed.
|
|
future<> flush();
|
|
private:
|
|
lw_shared_ptr<memtable> new_memtable();
|
|
};
|
|
|
|
class distributed_loader;
|
|
class table_populator;
|
|
|
|
// The CF has a "stats" structure. But we don't want all fields here,
|
|
// since some of them are fairly complex for exporting to collectd. Also,
|
|
// that structure matches what we export via the API, so better leave it
|
|
// untouched. And we need more fields. We will summarize it in here what
|
|
// we need.
|
|
struct cf_stats {
    // Memtable flushes currently in progress: how many, and their total size
    // in bytes.
    int64_t pending_memtables_flushes_count = 0;
    int64_t pending_memtables_flushes_bytes = 0;
    // Memtable flushes that ended in error.
    int64_t failed_memtables_flushes_count = 0;

    // number of times the clustering filter was executed
    int64_t clustering_filter_count = 0;
    // sstables considered by the filter (so dividing this by the previous one we get average sstables per read)
    int64_t sstables_checked_by_clustering_filter = 0;
    // number of times the filter passed the fast-path checks
    int64_t clustering_filter_fast_path_count = 0;
    // how many sstables survived the clustering key checks
    int64_t surviving_sstables_after_clustering_filter = 0;

    // How many view updates were dropped due to overload.
    int64_t dropped_view_updates = 0;

    // How many times view building was paused (e.g. due to node unavailability)
    int64_t view_building_paused = 0;

    // How many view updates were processed for all tables
    uint64_t total_view_updates_pushed_local = 0;
    uint64_t total_view_updates_pushed_remote = 0;
    uint64_t total_view_updates_failed_local = 0;
    uint64_t total_view_updates_failed_remote = 0;

    // How many times we built view updates only to realize it's the wrong node and drop the update
    uint64_t total_view_updates_on_wrong_node = 0;

    // How many times we failed to resolve base/view pairing
    uint64_t total_view_updates_failed_pairing = 0;
    // How many times we had to send additional view updates when there were more view replicas than base replicas during pairing.
    uint64_t total_view_updates_due_to_replica_count_mismatch = 0;
};
|
|
|
|
class table;
|
|
using column_family = table;
|
|
struct table_stats;
|
|
using column_family_stats = table_stats;
|
|
|
|
class database_sstable_write_monitor;
|
|
|
|
extern const ssize_t new_reader_base_cost;
|
|
|
|
struct table_stats {
    /** Number of times flush has resulted in the memtable being switched out. */
    int64_t memtable_switch_count = 0;
    /** Estimated number of tasks pending for this column family */
    int64_t pending_flushes = 0;
    // Disk usage broken down per sstable file kind. NOTE(review): "live" vs
    // "total" presumably distinguishes currently-referenced sstables from all
    // on-disk ones (e.g. including snapshots/pending deletion) — confirm at
    // the update sites.
    sstables::file_size_stats live_disk_space_used;
    sstables::file_size_stats total_disk_space_used;
    int64_t live_sstable_count = 0;
    /** Estimated number of compactions pending for this column family */
    int64_t pending_compactions = 0;
    /** Number of pending tasks that will potentially perform deletions */
    int64_t pending_sstable_deletions = 0;
    // Memtable write-path counters: partition insertions vs. hits on an
    // already-present partition, and tombstone reads during memtable scans.
    int64_t memtable_partition_insertions = 0;
    int64_t memtable_partition_hits = 0;
    int64_t memtable_range_tombstone_reads = 0;
    int64_t memtable_row_tombstone_reads = 0;
    // Number of tablets of this table present on this shard.
    int64_t tablet_count = 0;
    mutation_application_stats memtable_app_stats;
    // Rate/latency trackers for the main request types (the constructor
    // argument configures the tracker; see
    // utils::timed_rate_moving_average_summary_and_histogram).
    utils::timed_rate_moving_average_summary_and_histogram reads{256};
    utils::timed_rate_moving_average_summary_and_histogram writes{256};
    utils::timed_rate_moving_average_summary_and_histogram cas_prepare{256};
    utils::timed_rate_moving_average_summary_and_histogram cas_accept{256};
    utils::timed_rate_moving_average_summary_and_histogram cas_learn{256};
    utils::estimated_histogram estimated_sstable_per_read{35};
    utils::timed_rate_moving_average_and_histogram tombstone_scanned;
    utils::timed_rate_moving_average_and_histogram live_scanned;
    utils::estimated_histogram estimated_coordinator_read;
    // Extra per-table stats kept for the Alternator (DynamoDB API) layer.
    shared_ptr<alternator::table_stats> alternator_stats;
};
|
|
|
|
using storage_options = data_dictionary::storage_options;
|
|
|
|
// Smart table pointer that guards the table object
// while it's being accessed asynchronously.
// Combines a gate holder (NOTE(review): presumably keeping the table's async
// gate open so stop() waits for this accessor — the constructor is defined
// elsewhere; confirm) with an lw_shared_ptr that keeps the table object
// itself alive.
class table_holder {
    gate::holder _holder;
    lw_shared_ptr<table> _table_ptr;
public:
    explicit table_holder(table&);

    // Dereference to the guarded table (const and non-const overloads).
    const table& operator*() const noexcept {
        return *_table_ptr;
    }

    table& operator*() noexcept {
        return *_table_ptr;
    }

    // Member access through the guarded table.
    const table* operator->() const noexcept {
        return _table_ptr.operator->();
    }

    table* operator->() noexcept {
        return _table_ptr.operator->();
    }
};
|
|
|
|
class table : public enable_lw_shared_from_this<table>
|
|
, public weakly_referencable<table> {
|
|
public:
|
|
struct config {
|
|
bool enable_disk_writes = true;
|
|
bool enable_disk_reads = true;
|
|
bool enable_cache = true;
|
|
bool enable_commitlog = true;
|
|
bool enable_incremental_backups = false;
|
|
utils::updateable_value<bool> compaction_enforce_min_threshold{false};
|
|
bool enable_dangerous_direct_import_of_cassandra_counters = false;
|
|
replica::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
|
reader_concurrency_semaphore* streaming_read_concurrency_semaphore;
|
|
reader_concurrency_semaphore* compaction_concurrency_semaphore;
|
|
replica::cf_stats* cf_stats = nullptr;
|
|
seastar::scheduling_group memtable_scheduling_group;
|
|
seastar::scheduling_group memtable_to_cache_scheduling_group;
|
|
seastar::scheduling_group memory_compaction_scheduling_group;
|
|
seastar::scheduling_group streaming_scheduling_group;
|
|
bool enable_metrics_reporting = false;
|
|
bool enable_node_aggregated_table_metrics = true;
|
|
size_t view_update_memory_semaphore_limit;
|
|
db::data_listeners* data_listeners = nullptr;
|
|
uint32_t tombstone_warn_threshold{0};
|
|
unsigned x_log2_compaction_groups{0};
|
|
utils::updateable_value<bool> enable_compacting_data_for_streaming_and_repair;
|
|
utils::updateable_value<bool> enable_tombstone_gc_for_streaming_and_repair;
|
|
};
|
|
|
|
using snapshot_details = db::snapshot_ctl::table_snapshot_details;
|
|
struct cache_hit_rate {
|
|
cache_temperature rate;
|
|
lowres_clock::time_point last_updated;
|
|
};
|
|
private:
|
|
schema_ptr _schema;
|
|
config _config;
|
|
locator::effective_replication_map_ptr _erm;
|
|
lw_shared_ptr<const storage_options> _storage_opts;
|
|
memtable_table_shared_data _memtable_shared_data;
|
|
mutable table_stats _stats;
|
|
mutable db::view::stats _view_stats;
|
|
mutable row_locker::stats _row_locker_stats;
|
|
|
|
uint64_t _failed_counter_applies_to_memtable = 0;
|
|
|
|
template<typename... Args>
|
|
void do_apply(compaction_group& cg, db::rp_handle&&, Args&&... args);
|
|
|
|
lw_shared_ptr<memtable_list> make_memory_only_memtable_list();
|
|
lw_shared_ptr<memtable_list> make_memtable_list(compaction_group& cg);
|
|
|
|
compaction::compaction_manager& _compaction_manager;
|
|
compaction::compaction_strategy _compaction_strategy;
|
|
// The storage_group_manager manages either a single storage_group for vnodes or per-tablet storage_group for tablets.
|
|
// It contains and manages both the compaction_groups list and the storage_groups vector.
|
|
std::unique_ptr<storage_group_manager> _sg_manager;
|
|
// Compound SSTable set for all the compaction groups, which is useful for operations spanning all of them.
|
|
lw_shared_ptr<const sstables::sstable_set> _sstables;
|
|
// Control background fibers waiting for sstables to be deleted
|
|
seastar::named_gate _sstable_deletion_gate;
|
|
// This semaphore ensures that any operation updating the SSTable list and deleting unused
|
|
// SSTables will be atomic to other concurrent operations.
|
|
// That means snapshot, for example, won't have its selected sstables deleted by compaction
|
|
// in parallel, a race condition which could easily result in failure.
|
|
// Another example is snapshot not being able to see SSTables deleted by tablet cleanup,
|
|
// while cleanup is in the middle of the list update.
|
|
// TODO: find a better name for this semaphore.
|
|
seastar::named_semaphore _sstable_set_mutation_sem = {1, named_semaphore_exception_factory{"sstable set mutation"}};
|
|
mutable row_cache _cache; // Cache covers only sstables.
|
|
sstables::sstable_generation_generator _sstable_generation_generator;
|
|
|
|
db::replay_position _highest_rp;
|
|
// Tracks the highest replay position flushed to a sstable
|
|
db::replay_position _highest_flushed_rp;
|
|
// Tracks the highest position before flush actually starts
|
|
db::replay_position _flush_rp;
|
|
db::replay_position _lowest_allowed_rp;
|
|
|
|
// Provided by the database that owns this commitlog
|
|
db::commitlog* _commitlog;
|
|
// The table is constructed in readonly mode - this flag is true after the constructor finishes.
|
|
// This allows to read the table on the early stages of the node boot process,
|
|
// when the commitlog is not yet initialized.
|
|
// The flag is set to false in mark_ready_for_writes function, which
|
|
// is called when the commitlog is created and the table is ready to accept writes.
|
|
bool _readonly;
|
|
bool _durable_writes;
|
|
sstables::sstables_manager& _sstables_manager;
|
|
secondary_index::secondary_index_manager _index_manager;
|
|
bool _compaction_disabled_by_user = false;
|
|
bool _tombstone_gc_enabled = true;
|
|
utils::phased_barrier _flush_barrier;
|
|
std::vector<view_ptr> _views;
|
|
|
|
logstor::logstor* _logstor = nullptr;
|
|
std::unique_ptr<logstor::primary_index> _logstor_index;
|
|
|
|
std::unique_ptr<cell_locker> _counter_cell_locks; // Memory-intensive; allocate only when needed.
|
|
|
|
// Labels used to identify writes and reads for this table in the rate_limiter structure.
|
|
db::rate_limiter::label _rate_limiter_label_for_writes;
|
|
db::rate_limiter::label _rate_limiter_label_for_reads;
|
|
|
|
void set_metrics();
|
|
seastar::metrics::metric_groups _metrics;
|
|
|
|
// holds average cache hit rate of all shards
|
|
// recalculated periodically
|
|
cache_temperature _global_cache_hit_rate = cache_temperature(0.0f);
|
|
|
|
// holds cache hit rates per each node in a cluster
|
|
// may not have information for some node, since it fills
|
|
// in dynamically
|
|
std::unordered_map<locator::host_id, cache_hit_rate> _cluster_cache_hit_rates;
|
|
|
|
// Operations like truncate, flush, query, etc, may depend on a column family being alive to
|
|
// complete. Some of them have their own gate already (like flush), used in specialized wait
|
|
// logic. That is particularly useful if there is a particular
|
|
// order in which we need to close those gates. For all the others operations that don't have
|
|
// such needs, we have this generic _async_gate, which all potentially asynchronous operations
|
|
// have to get. It will be closed by stop().
|
|
seastar::named_gate _async_gate;
|
|
|
|
double _cached_percentile = -1;
|
|
lowres_clock::time_point _percentile_cache_timestamp;
|
|
std::chrono::milliseconds _percentile_cache_value;
|
|
|
|
// Phaser used to synchronize with in-progress writes. This is useful for code that,
|
|
    // after some modification, needs to ensure that new writes will see it before
|
|
// it can proceed, such as the view building code.
|
|
utils::phased_barrier _pending_writes_phaser;
|
|
// Corresponding phaser for in-progress reads.
|
|
utils::phased_barrier _pending_reads_phaser;
|
|
// Corresponding phaser for in-progress streams
|
|
utils::phased_barrier _pending_streams_phaser;
|
|
// Corresponding phaser for in-progress flushes
|
|
utils::phased_barrier _pending_flushes_phaser;
|
|
|
|
    // This field caches the last truncation time for the table.
|
|
// The master resides in system.truncated table
|
|
std::optional<db_clock::time_point> _truncated_at;
|
|
|
|
// This field is used to determine whether the table is eligible to write rejection on critical
|
|
// disk utilization.
|
|
bool _eligible_to_write_rejection_on_critical_disk_utilization { false };
|
|
|
|
bool _is_bootstrap_or_replace = false;
|
|
sstables::shared_sstable make_sstable(sstables::sstable_state state);
|
|
sstables::shared_sstable make_sstable(sstables::sstable_state state, sstables::sstable_version_types version);
|
|
|
|
public:
|
|
void on_flush_timer();
|
|
void deregister_metrics();
|
|
|
|
data_dictionary::table as_data_dictionary() const;
|
|
|
|
// The usage of these functions are restricted to preexisting sstables that aren't being
|
|
// moved anywhere, so should never be used in the context of file streaming and intra
|
|
// node migration. The only user today is distributed loader, which populates the
|
|
// sstables for each column family on boot.
|
|
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
|
|
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
|
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
|
|
|
|
bool add_logstor_segment(logstor::segment_descriptor&, dht::token first_token, dht::token last_token);
|
|
|
|
logstor::separator_buffer& get_logstor_separator_buffer(dht::token token, size_t write_size);
|
|
|
|
// Restricted to new sstables produced by external processes such as repair.
|
|
// The sstable might undergo split if table is in split mode.
|
|
// If no need for split, the input sstable will only be attached to the sstable set.
|
|
// If split happens, the output sstables will be attached and the input sstable unlinked.
|
|
// On failure, the input sstable is unlinked and exception propagated to the caller.
|
|
// The on_add callback will be called on all sstables to be added into the set.
|
|
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
|
|
add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
|
std::function<future<>(sstables::shared_sstable)> on_add,
|
|
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
|
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
|
|
add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
|
std::function<future<>(sstables::shared_sstable)> on_add);
|
|
|
|
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
|
|
sstables::shared_sstable make_sstable();
|
|
void set_truncation_time(db_clock::time_point truncated_at) noexcept {
|
|
_truncated_at = truncated_at;
|
|
}
|
|
db_clock::time_point get_truncation_time() const;
|
|
|
|
void notify_bootstrap_or_replace_start();
|
|
|
|
void notify_bootstrap_or_replace_end();
|
|
|
|
// Ensures that concurrent preemptible mutations to sstable lists will produce correct results.
|
|
// User will hold this permit until done with all updates. As soon as it's released, another concurrent
|
|
// attempt to update the lists will be able to proceed.
|
|
struct sstable_list_permit {
|
|
using permit_t = semaphore_units<seastar::named_semaphore_exception_factory>;
|
|
permit_t permit;
|
|
|
|
sstable_list_permit(permit_t p) : permit(std::move(p)) {}
|
|
};
|
|
// This permit ensures that the observer won't be able to find the intermediate state left by any
|
|
// process updating the sstable list. It guarantees atomicity of list updates. That being said,
|
|
// an iteration over the list should always acquire this permit in order to guarantee stability
|
|
// during the traversal. For example, that none of the SSTables will be found unlinked.
|
|
future<sstable_list_permit> get_sstable_list_permit();
|
|
|
|
class sstable_list_builder {
|
|
lw_shared_ptr<table> _t;
|
|
sstable_list_permit _permit;
|
|
public:
|
|
explicit sstable_list_builder(table& t, sstable_list_permit p) : _t(t.shared_from_this()), _permit(std::move(p)) {}
|
|
sstable_list_builder& operator=(const sstable_list_builder&) = delete;
|
|
sstable_list_builder(const sstable_list_builder&) = delete;
|
|
|
|
// Struct to return the newly built sstable set and the removed sstables
|
|
struct result {
|
|
lw_shared_ptr<sstables::sstable_set> new_sstable_set;
|
|
std::vector<sstables::shared_sstable> removed_sstables;
|
|
};
|
|
|
|
// Builds new sstable set from existing one, with new sstables added to it and old sstables removed from it.
|
|
// Returns the updated sstable set and a list of removed sstables.
|
|
future<result>
|
|
build_new_list(const sstables::sstable_set& current_sstables,
|
|
sstables::sstable_set new_sstable_list,
|
|
const std::vector<sstables::shared_sstable>& new_sstables,
|
|
const std::vector<sstables::shared_sstable>& old_sstables);
|
|
|
|
future<> delete_sstables_atomically(std::vector<sstables::shared_sstable> sstables_to_remove);
|
|
};
|
|
// NOTE: Always use this interface for deleting SSTables in the table, since it guarantees
|
|
// synchronization with concurrent iterations.
|
|
future<> delete_sstables_atomically(const sstable_list_permit&, std::vector<sstables::shared_sstable> sstables_to_remove);
|
|
|
|
// Precondition: table needs tablet splitting.
|
|
// Returns true if all storage of table is ready for splitting.
|
|
bool all_storage_groups_split();
|
|
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info);
|
|
|
|
// Splits compaction group of a single tablet, if and only if the underlying table has
|
|
// split request emitted by coordinator (found in tablet metadata).
|
|
// If split is required, then the compaction group of the given tablet is guaranteed to
|
|
// be split once it returns.
|
|
future<> maybe_split_compaction_group_of(locator::tablet_id);
|
|
|
|
dht::token_range get_token_range_after_split(const dht::token&) const noexcept;
|
|
private:
|
|
// If SSTable doesn't need split, the same input SSTable is returned as output.
|
|
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
|
|
// NOTE: it must only be used on new SSTables that weren't added to the set yet.
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable&);
|
|
|
|
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
|
|
// each tablet split into two, so this replica will remap all of its compaction groups
|
|
// that were previously split.
|
|
future<> handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap);
|
|
|
|
// Select a storage group from a given token.
|
|
storage_group& storage_group_for_token(dht::token token) const;
|
|
// Return storage groups, present in this shard, that own a particular token range.
|
|
// This is a much safer interface to view all data belonging to a tablet replica since data can be
|
|
// moved across compaction groups belonging to the same replica. So data could escape an iteration
|
|
// on compaction groups. Iterating on storage groups instead, allows the caller to see all the
|
|
// data at any point in time. In short, writes can operate on compaction group level, but reads
|
|
// must operate on storage group level.
|
|
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const;
|
|
storage_group& storage_group_for_id(size_t i) const;
|
|
|
|
std::unique_ptr<storage_group_manager> make_storage_group_manager();
|
|
compaction_group* get_compaction_group(size_t id) const;
|
|
// NOTE: all readers must only operate on storage groups, which can provide all data belonging to
|
|
// a given tablet replica. Interfaces below should only be used in the context of writes, for
|
|
// example, to append data to memtable. Iterating on compaction groups is susceptible to races
|
|
// since sstables might move across compaction groups in background.
|
|
// Select a compaction group from a given token.
|
|
compaction_group& compaction_group_for_token(dht::token token) const;
|
|
// Select a compaction group from a given key.
|
|
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const;
|
|
// Select a compaction group from a given sstable based on its token range.
|
|
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const;
|
|
// Safely iterate through compaction groups, while performing async operations on them.
|
|
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
|
|
void for_each_compaction_group(std::function<void(compaction_group&)> action);
|
|
void for_each_compaction_group(std::function<void(const compaction_group&)> action) const;
|
|
// Unsafe reference to all storage groups. Don't use it across preemption points.
|
|
const storage_group_map& storage_groups() const;
|
|
|
|
// Returns a sstable set that can be safely used for purging any expired tombstone in a compaction group.
|
|
// Only the sstables in the compaction group is not sufficient, since there might be other compaction
|
|
// groups during tablet split with overlapping token range, and we need to include them all in a single
|
|
// sstable set to allow safe tombstone gc.
|
|
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc(const compaction_group&) const;
|
|
|
|
bool cache_enabled() const {
|
|
return _config.enable_cache && _schema->caching_options().enabled();
|
|
}
|
|
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
|
|
// This function can throw even if the sstable was added into the set. When the sstable was successfully
|
|
// added, the sstable ptr @sst will be set to nullptr. Allowing caller to optionally discard the sstable.
|
|
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy, bool trigger_compaction);
|
|
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
|
|
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
|
|
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);
|
|
void add_maintenance_sstable(compaction_group& cg, sstables::shared_sstable sst);
|
|
static void add_sstable_to_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
|
|
static void remove_sstable_from_backlog_tracker(compaction::compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
|
|
lw_shared_ptr<memtable> new_memtable();
|
|
future<> try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
|
|
// Caller must keep m alive.
|
|
future<> update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts);
|
|
struct merge_comparator;
|
|
|
|
sstables::generation_type calculate_generation_for_new_table();
|
|
private:
|
|
void rebuild_statistics();
|
|
void subtract_compaction_group_from_stats(const compaction_group& cg) noexcept;
|
|
private:
|
|
mutation_source_opt _virtual_reader;
|
|
std::optional<noncopyable_function<future<>(const frozen_mutation&)>> _virtual_writer;
|
|
|
|
// Creates a mutation reader which covers given sstables.
|
|
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
|
// The 'range' parameter must be live as long as the reader is used.
|
|
// Mutations returned by the reader will all have given schema.
|
|
mutation_reader make_sstable_reader(schema_ptr schema,
|
|
reader_permit permit,
|
|
lw_shared_ptr<const sstables::sstable_set> sstables,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
const sstables::sstable_predicate& = sstables::default_sstable_predicate(),
|
|
sstables::integrity_check integrity = sstables::integrity_check::no) const;
|
|
|
|
lw_shared_ptr<const sstables::sstable_set> make_compound_sstable_set() const;
|
|
// Compound sstable set must be refreshed whenever any of its managed sets are changed
|
|
void refresh_compound_sstable_set();
|
|
|
|
max_purgeable_fn get_max_purgeable_fn_for_cache_underlying_reader() const;
|
|
snapshot_source sstables_as_snapshot_source();
|
|
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<const sstables::sstable_set>);
|
|
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
|
|
|
// Region group under which this table's dirty (memtable) memory is accounted,
// taken from the configured dirty memory manager.
dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
    return _config.dirty_memory_manager->region_group();
}
|
|
|
|
// reserve_fn will be called before any element is added to readers
|
|
void add_memtables_to_reader_list(std::vector<mutation_reader>& readers,
|
|
const schema_ptr& s,
|
|
const reader_permit& permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
const tracing::trace_state_ptr& trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
std::function<void(size_t)> reserve_fn) const;
|
|
public:
|
|
const storage_options& get_storage_options() const noexcept { return *_storage_opts; }
|
|
lw_shared_ptr<const storage_options> get_storage_options_ptr() const noexcept { return _storage_opts; }
|
|
future<> init_storage();
|
|
future<> destroy_storage();
|
|
|
|
seastar::named_gate& async_gate() { return _async_gate; }
|
|
|
|
uint64_t failed_counter_applies_to_memtable() const {
|
|
return _failed_counter_applies_to_memtable;
|
|
}
|
|
|
|
// This function should be called when this column family is ready for writes, IOW,
|
|
// to produce SSTables. Extensive details about why this is important can be found
|
|
// in Scylla's Github Issue #1014
|
|
//
|
|
// Nothing should be writing to SSTables before we have the chance to populate the
|
|
// existing SSTables and calculate what should the next generation number be.
|
|
//
|
|
// However, if that happens, we want to protect against it in a way that does not
|
|
// involve overwriting existing tables. This is one of the ways to do it: every
|
|
// column family starts in an unwriteable state, and when it can finally be written
|
|
// to, we mark it as writeable.
|
|
//
|
|
// Note that this *cannot* be a part of add_column_family. That adds a column family
|
|
// to a db in memory only, and if anybody is about to write to a CF, that was most
|
|
// likely already called. We need to call this explicitly when we are sure we're ready
|
|
// to issue disk operations safely.
|
|
void mark_ready_for_writes(db::commitlog* cl);
|
|
|
|
void init_logstor(logstor::logstor* ls);
|
|
|
|
// Returns whether a logstor instance has been attached to this table
// (see init_logstor()).
bool uses_logstor() const {
    return _logstor != nullptr;
}
|
|
|
|
// Access to the logstor primary index. Dereferences _logstor_index
// unchecked, so callers must ensure logstor was initialized first
// (init_logstor()).
logstor::primary_index& logstor_index() noexcept {
    return *_logstor_index;
}

const logstor::primary_index& logstor_index() const noexcept {
    return *_logstor_index;
}
|
|
|
|
size_t get_logstor_memory_usage() const;
|
|
|
|
// Creates a mutation reader which covers all data sources for this column family.
|
|
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
|
// Note: for data queries use query() instead.
|
|
// The 'range' parameter must be live as long as the reader is used.
|
|
// Mutations returned by the reader will all have given schema.
|
|
mutation_reader make_mutation_reader(schema_ptr schema,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state = nullptr,
|
|
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
|
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
|
mutation_reader make_mutation_reader_excluding_staging(schema_ptr schema,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state = nullptr,
|
|
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
|
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
|
|
|
// Convenience overload: reads the schema's full slice over the given
// partition range (defaults to the whole ring).
mutation_reader make_mutation_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range = query::full_partition_range) const {
    // Capture the slice before the schema pointer is moved away.
    const auto& slice = schema->full_slice();
    return make_mutation_reader(std::move(schema), std::move(permit), range, slice);
}
|
|
|
|
mutation_reader make_logstor_mutation_reader(schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& pr,
|
|
const query::partition_slice& slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr) const;
|
|
|
|
// The streaming mutation reader differs from the regular mutation reader in that:
|
|
// - Reflects all writes accepted by replica prior to creation of the
|
|
// reader and a _bounded_ amount of writes which arrive later.
|
|
// - Does not populate the cache
|
|
// Requires ranges to be sorted and disjoint.
|
|
// When compaction_time is engaged, the reader's output will be compacted, with the provided query time.
|
|
// This compaction doesn't do tombstone garbage collection.
|
|
mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit,
|
|
const dht::partition_range_vector& ranges, gc_clock::time_point compaction_time) const;
|
|
|
|
// Single range overload.
|
|
mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice,
|
|
mutation_reader::forwarding fwd_mr,
|
|
gc_clock::time_point compaction_time) const;
|
|
|
|
// Convenience overload: streams the schema's full slice over a single
// range with mutation-reader forwarding disabled.
mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time) {
    const auto& slice = schema->full_slice();
    return make_streaming_reader(schema, std::move(permit), range, slice, mutation_reader::forwarding::no, compaction_time);
}
|
|
|
|
// Stream reader from the given sstables
|
|
mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const;
|
|
|
|
// Make a reader which reads only from the row-cache.
|
|
// The reader doesn't populate the cache, it reads only what is in the cache
|
|
// Supports reading only a single partition.
|
|
// Does not support reading in reverse.
|
|
mutation_reader make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
|
|
const query::partition_slice& slice, tracing::trace_state_ptr ts);
|
|
|
|
sstables::shared_sstable make_streaming_sstable_for_write();
|
|
sstables::shared_sstable make_streaming_staging_sstable();
|
|
|
|
mutation_source as_mutation_source() const;
|
|
mutation_source as_mutation_source_excluding_staging() const;
|
|
|
|
// Select all memtables which contain this token and return them as mutation sources.
|
|
// We could return memtables here, but table has no public memtable accessors so far.
|
|
// Memtables are mutable objects, so it is best to keep it this way.
|
|
std::vector<mutation_source> select_memtables_as_mutation_sources(dht::token) const;
|
|
|
|
// Installs the mutation source that serves reads for this (virtual) table.
void set_virtual_reader(mutation_source virtual_reader) {
    _virtual_reader = std::move(virtual_reader);
}

// Installs the handler invoked to apply writes for this (virtual) table.
void set_virtual_writer(noncopyable_function<future<>(const frozen_mutation&)> writer) {
    _virtual_writer.emplace(std::move(writer));
}
|
|
|
|
bool is_virtual() const {
|
|
return _virtual_reader || _virtual_writer;
|
|
}
|
|
|
|
// Queries can be satisfied from multiple data sources, so they are returned
|
|
// as temporaries.
|
|
//
|
|
// FIXME: in case a query is satisfied from a single memtable, avoid a copy
|
|
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
|
|
using const_row_ptr = std::unique_ptr<const row>;
|
|
// Allow an action to be performed on each active memtable, each of which belongs to a different compaction group.
|
|
void for_each_active_memtable(noncopyable_function<void(memtable&)> action);
|
|
api::timestamp_type min_memtable_timestamp() const;
|
|
api::timestamp_type min_memtable_live_timestamp() const;
|
|
api::timestamp_type min_memtable_live_row_marker_timestamp() const;
|
|
api::timestamp_type get_max_timestamp_for_tablet(locator::tablet_id) const;
|
|
|
|
const row_cache& get_row_cache() const {
|
|
return _cache;
|
|
}
|
|
|
|
row_cache& get_row_cache() {
|
|
return _cache;
|
|
}
|
|
|
|
// Maps an operation type to the rate-limiter label used to account it.
db::rate_limiter::label& get_rate_limiter_label_for_op_type(db::operation_type op_type) {
    switch (op_type) {
    case db::operation_type::write:
        return _rate_limiter_label_for_writes;
    case db::operation_type::read:
        return _rate_limiter_label_for_reads;
    }
    // Unreachable for valid enum values; abort() guards against a corrupt
    // value and silences the missing-return diagnostic for this path.
    std::abort();
}
|
|
|
|
// Rate-limiter label under which writes to this table are accounted.
db::rate_limiter::label& get_rate_limiter_label_for_writes() {
    return _rate_limiter_label_for_writes;
}

// Rate-limiter label under which reads from this table are accounted.
db::rate_limiter::label& get_rate_limiter_label_for_reads() {
    return _rate_limiter_label_for_reads;
}
|
|
|
|
future<std::vector<locked_cell>> lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout);
|
|
|
|
logalloc::occupancy_stats occupancy() const;
|
|
public:
|
|
table(schema_ptr schema, config cfg, lw_shared_ptr<const storage_options> sopts, compaction::compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm);
|
|
|
|
table(column_family&&) = delete; // 'this' is being captured during construction
|
|
~table();
|
|
|
|
// Returns a holder object constructed on this table; presumably it keeps
// the table alive while held — see table_holder for the exact contract.
table_holder hold() {
    return table_holder(*this);
}
|
|
|
|
const schema_ptr& schema() const { return _schema; }
|
|
void set_schema(schema_ptr);
|
|
db::commitlog* commitlog() const;
|
|
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
|
|
void update_effective_replication_map(locator::effective_replication_map_ptr);
|
|
[[gnu::always_inline]] bool uses_tablets() const;
|
|
int64_t calculate_tablet_count() const;
|
|
private:
|
|
void update_tombstone_gc_rf_one();
|
|
|
|
future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg);
|
|
future<> stop_compaction_groups(storage_group& sg);
|
|
future<> flush_compaction_groups(storage_group& sg);
|
|
future<> cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group& sg);
|
|
public:
|
|
future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
|
|
// For tests only.
|
|
future<> cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid);
|
|
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
|
|
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
|
|
shard_id shard_for_reads(dht::token t) const;
|
|
dht::shard_replica_set shard_for_writes(dht::token t) const;
|
|
// Applies given mutation to this column family.
// The mutation is always upgraded to current schema.
// The frozen-mutation overload routes by partition key; the mutation
// overload routes by token. Both hand off to do_apply() together with the
// commitlog replay-position handle.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) {
    do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema);
}
void apply(const mutation& m, db::rp_handle&& h = {}) {
    do_apply(compaction_group_for_token(m.token()), std::move(h), m);
}
|
|
|
|
future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point tmo);
|
|
future<> apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point tmo);
|
|
|
|
// Returns at most "cmd.limit" rows
|
|
// The saved_querier parameter is an input-output parameter which contains
|
|
// the saved querier from the previous page (if there was one) and after
|
|
// completion it contains the to-be saved querier for the next page (if
|
|
// there is one). Pass nullptr when queriers are not saved.
|
|
future<lw_shared_ptr<query::result>>
|
|
query(schema_ptr,
|
|
reader_permit permit,
|
|
const query::read_command& cmd,
|
|
query::result_options opts,
|
|
const dht::partition_range_vector& ranges,
|
|
tracing::trace_state_ptr trace_state,
|
|
query::result_memory_limiter& memory_limiter,
|
|
db::timeout_clock::time_point timeout,
|
|
std::optional<querier>* saved_querier = { });
|
|
|
|
// Performs a query on given data source returning data in reconcilable form.
|
|
//
|
|
// Reads at most row_limit rows. If less rows are returned, the data source
|
|
// didn't have more live data satisfying the query.
|
|
//
|
|
// Any cells which have expired according to query_time are returned as
|
|
// deleted cells and do not count towards live data. The mutations are
|
|
// compact, meaning that any cell which is covered by higher-level tombstone
|
|
// is absent in the results.
|
|
//
|
|
// 'source' doesn't have to survive deferring.
|
|
//
|
|
// The saved_querier parameter is an input-output parameter which contains
|
|
// the saved querier from the previous page (if there was one) and after
|
|
// completion it contains the to-be saved querier for the next page (if
|
|
// there is one). Pass nullptr when queriers are not saved.
|
|
future<reconcilable_result>
|
|
mutation_query(schema_ptr query_schema,
|
|
reader_permit permit,
|
|
const query::read_command& cmd,
|
|
const dht::partition_range& range,
|
|
tracing::trace_state_ptr trace_state,
|
|
query::result_memory_accounter accounter,
|
|
db::timeout_clock::time_point timeout,
|
|
bool tombstone_gc_enabled = true,
|
|
std::optional<querier>* saved_querier = { });
|
|
|
|
void start();
|
|
future<> stop();
|
|
future<> flush(std::optional<db::replay_position> = {});
|
|
bool needs_flush() const;
|
|
future<> clear(); // discards memtable(s) without flushing them to disk.
|
|
future<db::replay_position> discard_sstables(db_clock::time_point);
|
|
future<> discard_logstor_segments();
|
|
|
|
bool can_flush() const;
|
|
|
|
using do_flush = bool_class<struct do_flush_tag>;
|
|
|
|
// Start a compaction of all sstables in a process known as major compaction
|
|
// Active memtable is flushed first to guarantee that data like tombstone,
|
|
// sitting in the memtable, will be compacted with shadowed data.
|
|
future<> compact_all_sstables(tasks::task_info info, do_flush = do_flush::yes, bool consider_only_existing_data = false);
|
|
|
|
future<bool> snapshot_exists(sstring name);
|
|
|
|
db::replay_position set_low_replay_position_mark();
|
|
db::replay_position highest_flushed_replay_position() const;
|
|
|
|
future<std::pair<std::vector<sstables::shared_sstable>, sstable_list_permit>> snapshot_sstables();
|
|
|
|
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
|
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
|
|
|
|
// Whether incremental backups are enabled for this table (per-table
// config flag).
bool incremental_backups_enabled() const {
    return _config.enable_incremental_backups;
}

void set_incremental_backups(bool val) {
    _config.enable_incremental_backups = val;
}
|
|
|
|
bool uses_static_sharding() const {
|
|
return !_erm || _erm->get_replication_strategy().is_vnode_based();
|
|
}
|
|
|
|
/*!
|
|
* \brief get sstables by key
|
|
* Return a set of the sstables names that contain the given
|
|
* partition key in nodetool format
|
|
*/
|
|
future<std::unordered_set<sstables::shared_sstable>> get_sstables_by_partition_key(const sstring& key) const;
|
|
|
|
const sstables::sstable_set& get_sstable_set() const;
|
|
lw_shared_ptr<const sstable_list> get_sstables() const;
|
|
lw_shared_ptr<const sstable_list> get_sstables_including_compacted_undeleted() const;
|
|
std::vector<sstables::shared_sstable> select_sstables(const dht::partition_range& range) const;
|
|
future<> drop_quarantined_sstables();
|
|
size_t sstables_count() const;
|
|
std::vector<uint64_t> sstable_count_per_level() const;
|
|
int64_t get_unleveled_sstables() const;
|
|
|
|
void start_compaction();
|
|
void trigger_compaction();
|
|
void try_trigger_compaction(compaction_group& cg) noexcept;
|
|
void trigger_logstor_compaction();
|
|
// Triggers offstrategy compaction, if needed, in the background.
|
|
void trigger_offstrategy_compaction();
|
|
// Performs offstrategy compaction, if needed, returning
|
|
// a future<bool> that is resolved when offstrategy_compaction completes.
|
|
// The future value is true iff offstrategy compaction was required.
|
|
future<bool> perform_offstrategy_compaction(tasks::task_info info);
|
|
future<> perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges,
|
|
tasks::task_info info,
|
|
do_flush = do_flush::yes);
|
|
future<unsigned> estimate_pending_compactions() const;
|
|
|
|
void set_compaction_strategy(compaction::compaction_strategy_type strategy);
|
|
const compaction::compaction_strategy& get_compaction_strategy() const {
|
|
return _compaction_strategy;
|
|
}
|
|
|
|
compaction::compaction_strategy& get_compaction_strategy() {
|
|
return _compaction_strategy;
|
|
}
|
|
|
|
const compaction::compaction_manager& get_compaction_manager() const noexcept {
|
|
return _compaction_manager;
|
|
}
|
|
|
|
compaction::compaction_manager& get_compaction_manager() noexcept {
|
|
return _compaction_manager;
|
|
}
|
|
|
|
logstor::segment_manager& get_logstor_segment_manager() noexcept {
|
|
return _logstor->get_segment_manager();
|
|
}
|
|
|
|
const logstor::segment_manager& get_logstor_segment_manager() const noexcept {
|
|
return _logstor->get_segment_manager();
|
|
}
|
|
|
|
logstor::compaction_manager& get_logstor_compaction_manager() noexcept {
|
|
return _logstor->get_compaction_manager();
|
|
}
|
|
|
|
// Flushes logstor separator buffer(s), releasing the mixed segments they
// keep alive. When seq_num is engaged, presumably only buffers whose
// oldest held segment sequence number is old enough relative to seq_num
// are flushed — TODO(review): confirm against the implementation.
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
|
|
|
|
future<logstor::table_segment_stats> get_logstor_segment_stats() const;
|
|
|
|
table_stats& get_stats() const {
|
|
return _stats;
|
|
}
|
|
|
|
locator::combined_load_stats table_load_stats() const;
|
|
|
|
const db::view::stats& get_view_stats() const {
|
|
return _view_stats;
|
|
}
|
|
|
|
db::view::stats& view_stats() const noexcept {
|
|
return _view_stats;
|
|
}
|
|
|
|
replica::cf_stats* cf_stats() const {
|
|
return _config.cf_stats;
|
|
}
|
|
|
|
const config& get_config() const {
|
|
return _config;
|
|
}
|
|
|
|
cache_temperature get_global_cache_hit_rate() const {
|
|
return _global_cache_hit_rate;
|
|
}
|
|
|
|
bool durable_writes() const {
|
|
return _durable_writes;
|
|
}
|
|
|
|
void set_durable_writes(bool dw) {
|
|
_durable_writes = dw;
|
|
}
|
|
|
|
void set_global_cache_hit_rate(cache_temperature rate) {
|
|
_global_cache_hit_rate = rate;
|
|
}
|
|
|
|
void set_hit_rate(locator::host_id addr, cache_temperature rate);
|
|
cache_hit_rate get_my_hit_rate() const;
|
|
cache_hit_rate get_hit_rate(const gms::gossiper& g, locator::host_id addr);
|
|
void drop_hit_rate(locator::host_id addr);
|
|
|
|
void enable_auto_compaction();
|
|
future<> disable_auto_compaction();
|
|
|
|
void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept;
|
|
|
|
bool tombstone_gc_enabled() const noexcept {
|
|
return _tombstone_gc_enabled;
|
|
}
|
|
|
|
bool is_auto_compaction_disabled_by_user() const {
|
|
return _compaction_disabled_by_user;
|
|
}
|
|
|
|
// Registers a write in the pending-writes phased barrier; the returned
// operation keeps the current phase open until destroyed.
utils::phased_barrier::operation write_in_progress() {
    return _pending_writes_phaser.start();
}

// Advances the phaser and waits for all previously started writes to finish.
future<> await_pending_writes() noexcept {
    return _pending_writes_phaser.advance_and_await();
}

// Number of writes currently holding the phaser open.
size_t writes_in_progress() const {
    return _pending_writes_phaser.operations_in_progress();
}
|
|
|
|
// Registers a read in the pending-reads phased barrier; the returned
// operation keeps the current phase open until destroyed.
utils::phased_barrier::operation read_in_progress() {
    return _pending_reads_phaser.start();
}

// Advances the phaser and waits for all previously started reads to finish.
future<> await_pending_reads() noexcept {
    return _pending_reads_phaser.advance_and_await();
}

// Number of reads currently holding the phaser open.
size_t reads_in_progress() const {
    return _pending_reads_phaser.operations_in_progress();
}
|
|
|
|
// Registers a streaming operation in the pending-streams phased barrier;
// the returned operation keeps the current phase open until destroyed.
utils::phased_barrier::operation stream_in_progress() {
    return _pending_streams_phaser.start();
}

// Advances the phaser and waits for all previously started streams to finish.
future<> await_pending_streams() noexcept {
    return _pending_streams_phaser.advance_and_await();
}

// Number of streams currently holding the phaser open.
size_t streams_in_progress() const {
    return _pending_streams_phaser.operations_in_progress();
}
|
|
|
|
// Advances the flush phaser and waits for in-flight flushes to finish.
future<> await_pending_flushes() noexcept {
    return _pending_flushes_phaser.advance_and_await();
}
|
|
|
|
// Waits for all pending reads, writes, streams and flushes to drain.
// Uses when_all (not when_all_succeed) so every barrier is awaited even
// if one of them fails.
future<> await_pending_ops() noexcept {
    return when_all(await_pending_reads(), await_pending_writes(), await_pending_streams(), await_pending_flushes()).discard_result();
}
|
|
|
|
void add_or_update_view(view_ptr v);
|
|
void remove_view(view_ptr v);
|
|
void clear_views();
|
|
const std::vector<view_ptr>& views() const;
|
|
future<row_locker::lock_holder> push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout,
|
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const;
|
|
future<row_locker::lock_holder> push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const;
|
|
future<row_locker::lock_holder>
|
|
stream_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
|
std::vector<sstables::shared_sstable>& excluded_sstables) const;
|
|
|
|
void add_coordinator_read_latency(utils::estimated_histogram::duration latency);
|
|
std::chrono::milliseconds get_coordinator_read_latency_percentile(double percentile);
|
|
|
|
secondary_index::secondary_index_manager& get_index_manager() {
|
|
return _index_manager;
|
|
}
|
|
|
|
const secondary_index::secondary_index_manager& get_index_manager() const noexcept {
|
|
return _index_manager;
|
|
}
|
|
|
|
sstables::sstables_manager& get_sstables_manager() noexcept {
|
|
return _sstables_manager;
|
|
}
|
|
|
|
const sstables::sstables_manager& get_sstables_manager() const noexcept {
|
|
return _sstables_manager;
|
|
}
|
|
|
|
sstables::sstable_generation_generator& get_sstable_generation_generator() {
|
|
return _sstable_generation_generator;
|
|
}
|
|
|
|
reader_concurrency_semaphore& streaming_read_concurrency_semaphore() {
|
|
return *_config.streaming_read_concurrency_semaphore;
|
|
}
|
|
|
|
reader_concurrency_semaphore& compaction_concurrency_semaphore() {
|
|
return *_config.compaction_concurrency_semaphore;
|
|
}
|
|
|
|
size_t estimate_read_memory_cost() const;
|
|
|
|
void set_eligible_to_write_rejection_on_critical_disk_utilization(bool eligible) {
|
|
_eligible_to_write_rejection_on_critical_disk_utilization = eligible;
|
|
}
|
|
|
|
bool is_eligible_to_write_rejection_on_critical_disk_utilization() const {
|
|
return _eligible_to_write_rejection_on_critical_disk_utilization;
|
|
}
|
|
|
|
private:
|
|
future<row_locker::lock_holder> do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
|
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, query::partition_slice::option_set custom_opts) const;
|
|
std::vector<view_ptr> affected_views(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base, const mutation& update) const;
|
|
|
|
mutable row_locker _row_locker;
|
|
future<row_locker::lock_holder> local_base_lock(
|
|
const schema_ptr& s,
|
|
const dht::decorated_key& pk,
|
|
const query::clustering_row_ranges& rows,
|
|
db::timeout_clock::time_point timeout) const;
|
|
|
|
timer<lowres_clock> _flush_timer;
|
|
|
|
// One does not need to wait on this future if all we are interested in, is
|
|
// initiating the write. The writes initiated here will eventually
|
|
// complete, and the seastar::gate below will make sure they are all
|
|
// completed before we stop() this column_family.
|
|
//
|
|
// But it is possible to synchronously wait for the seal to complete by
|
|
// waiting on this future. This is useful in situations where we want to
|
|
// synchronously flush data to disk.
|
|
//
|
|
// The function never fails.
|
|
// It either succeeds eventually after retrying or aborts.
|
|
future<> seal_active_memtable(compaction_group& cg, flush_permit&&) noexcept;
|
|
|
|
void check_valid_rp(const db::replay_position&) const;
|
|
|
|
void recalculate_tablet_count_stats();
|
|
public:
|
|
|
|
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
|
|
// Testing purposes.
|
|
// to let test classes access calculate_generation_for_new_table
|
|
friend class ::column_family_test;
|
|
friend class ::table_for_tests;
|
|
|
|
friend class distributed_loader;
|
|
friend class table_populator;
|
|
|
|
private:
|
|
timer<> _off_strategy_trigger;
|
|
void do_update_off_strategy_trigger();
|
|
|
|
compaction_group* try_get_compaction_group_with_static_sharding() const;
|
|
public:
|
|
void update_off_strategy_trigger();
|
|
void enable_off_strategy_trigger();
|
|
|
|
compaction::compaction_group_view& try_get_compaction_group_view_with_static_sharding() const;
|
|
// Safely iterate through table states, while performing async operations on them.
|
|
future<> parallel_foreach_compaction_group_view(std::function<future<>(compaction::compaction_group_view&)> action);
|
|
compaction::compaction_group_view& compaction_group_view_for_sstable(const sstables::shared_sstable& sst) const;
|
|
|
|
// Unconditionally erase sst from `sstables_requiring_cleanup`
|
|
// Returns true iff sst was found and erased.
|
|
bool erase_sstable_cleanup_state(const sstables::shared_sstable& sst);
|
|
|
|
// Returns true if the sstable requires cleanup.
|
|
bool requires_cleanup(const sstables::shared_sstable& sst) const;
|
|
|
|
// Returns true if any of the sstables requires cleanup.
|
|
bool requires_cleanup(const sstables::sstable_set& set) const;
|
|
|
|
// Takes snapshot of current storage state (includes memtable and sstables) from
|
|
// all compaction groups that overlap with a given token range. The output is
|
|
// a list of SSTables that represent the snapshot.
|
|
future<utils::chunked_vector<sstables::sstable_files_snapshot>> take_storage_snapshot(dht::token_range tr);
|
|
|
|
// Takes snapshot of current sstable set all compaction groups.
|
|
future<utils::chunked_vector<sstables::shared_sstable>> take_sstable_set_snapshot();
|
|
|
|
// Clones storage of a given tablet. Memtable is flushed first to guarantee that the
|
|
// snapshot (list of sstables) will include all the data written up to the time it was taken.
|
|
// If leave_unsealed is set, all the destination sstables will be left unsealed.
|
|
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
|
|
|
|
tombstone_gc_state get_tombstone_gc_state() const;
|
|
|
|
friend class compaction_group;
|
|
friend class compaction::compaction_task_impl;
|
|
|
|
future<> update_repaired_at_for_merge();
|
|
|
|
future<> clear_being_repaired_for_range(dht::token_range range);
|
|
|
|
future<compaction_reenablers_and_lock_holders> get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
|
const service::frozen_topology_guard& guard, dht::token_range range);
|
|
future<uint64_t> estimated_partitions_in_range(dht::token_range tr) const;
|
|
};
|
|
|
|
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);
|
|
|
|
using user_types_metadata = data_dictionary::user_types_metadata;
|
|
|
|
using keyspace_metadata = data_dictionary::keyspace_metadata;
|
|
|
|
// Encapsulates objects needed to update keyspace schema
|
|
struct keyspace_change {
    // New keyspace metadata to install.
    lw_shared_ptr<keyspace_metadata> metadata;
    // Replication strategy accompanying the change.
    locator::replication_strategy_ptr strategy;
    // Static effective replication map accompanying the change.
    locator::static_effective_replication_map_ptr erm;

    // Name of the keyspace being changed (taken from the new metadata).
    const sstring& keyspace_name() const {
        return metadata->name();
    }
};
|
|
|
|
// In-memory representation of a keyspace: its metadata, replication
// strategy and effective replication map, plus per-keyspace config that
// is inherited by its tables.
class keyspace {
public:
    // Per-keyspace knobs, mostly populated from database-level
    // configuration when the keyspace object is created.
    struct config {
        bool enable_commitlog = true;
        bool enable_disk_reads = true;
        bool enable_disk_writes = true;
        bool enable_cache = true;
        bool enable_incremental_backups = false;
        utils::updateable_value<bool> compaction_enforce_min_threshold{false};
        bool enable_dangerous_direct_import_of_cassandra_counters = false;
        replica::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
        // NOTE(review): not default-initialized — callers are expected to set
        // these semaphore pointers before use.
        reader_concurrency_semaphore* streaming_read_concurrency_semaphore;
        reader_concurrency_semaphore* compaction_concurrency_semaphore;
        replica::cf_stats* cf_stats = nullptr;
        seastar::scheduling_group memtable_scheduling_group;
        seastar::scheduling_group memtable_to_cache_scheduling_group;
        seastar::scheduling_group memory_compaction_scheduling_group;
        seastar::scheduling_group streaming_scheduling_group;
        bool enable_metrics_reporting = false;
        size_t view_update_memory_semaphore_limit;
    };
private:
    locator::replication_strategy_ptr _replication_strategy;
    locator::static_effective_replication_map_ptr _effective_replication_map;
    lw_shared_ptr<keyspace_metadata> _metadata;
    config _config;
    locator::effective_replication_map_factory& _erm_factory;

public:
    explicit keyspace(config cfg, locator::effective_replication_map_factory& erm_factory);
    // Non-copyable, movable.
    keyspace(const keyspace&) = delete;
    void operator=(const keyspace&) = delete;
    keyspace(keyspace&&) = default;

    future<> shutdown() noexcept;

    // Install the metadata/strategy/erm triple prepared for a keyspace update.
    void apply(keyspace_change kc);

    /** Note: return by shared pointer value, since the meta data is
     * semi-volatile. I.e. we could do alter keyspace at any time, and
     * boom, it is replaced.
     */
    lw_shared_ptr<keyspace_metadata> metadata() const;

    static locator::replication_strategy_ptr create_replication_strategy(
        lw_shared_ptr<keyspace_metadata> metadata,
        const locator::topology& topology);
    future<locator::static_effective_replication_map_ptr> create_static_effective_replication_map(
        locator::replication_strategy_ptr strategy,
        const locator::token_metadata_ptr& tm) const;
    void update_static_effective_replication_map(locator::static_effective_replication_map_ptr erm);

    data_dictionary::keyspace as_data_dictionary() const;

    /**
     * This should not really be returned by reference, since the replication
     * strategy is also volatile in that it could be replaced at "any" time.
     * However, all current uses at least are "instantaneous", i.e. do not
     * carry it across a continuation. So it is sort of the same for now, but
     * should eventually be refactored.
     */
    const locator::abstract_replication_strategy& get_replication_strategy() const;
    locator::replication_strategy_ptr get_replication_strategy_ptr() const {
        return _replication_strategy;
    }

    // Get the keyspace static effective replication map, for non-tablets keyspaces
    locator::static_effective_replication_map_ptr get_static_effective_replication_map() const;

    // True when the keyspace's replication strategy uses tablets.
    bool uses_tablets() const {
        return _replication_strategy->uses_tablets();
    }

    column_family::config make_column_family_config(const schema& s, const database& db) const;
    void add_or_update_column_family(const schema_ptr& s);
    void add_user_type(const user_type ut);
    void remove_user_type(const user_type ut);

    bool incremental_backups_enabled() const {
        return _config.enable_incremental_backups;
    }

    void set_incremental_backups(bool val) {
        _config.enable_incremental_backups = val;
    }
};
|
|
|
|
using no_such_keyspace = data_dictionary::no_such_keyspace;
|
|
using no_such_column_family = data_dictionary::no_such_column_family;
|
|
|
|
// Database-wide configuration passed in at construction; mostly the
// scheduling groups used to isolate the different kinds of background work.
struct database_config {
    seastar::scheduling_group memtable_scheduling_group;
    seastar::scheduling_group memtable_to_cache_scheduling_group; // FIXME: merge with memtable_scheduling_group
    seastar::scheduling_group compaction_scheduling_group;
    seastar::scheduling_group memory_compaction_scheduling_group;
    seastar::scheduling_group statement_scheduling_group;
    seastar::scheduling_group streaming_scheduling_group;
    seastar::scheduling_group gossip_scheduling_group;
    seastar::scheduling_group commitlog_scheduling_group;
    seastar::scheduling_group schema_commitlog_scheduling_group;
    // Memory available to this shard; used to size the various read/write
    // concurrency and memory limits (see max_memory_* members of database).
    size_t available_memory;
};
|
|
|
|
// Transparent equality comparator for (keyspace, table) name pairs,
// enabling heterogeneous lookup with string_view pairs (no temporary
// sstring construction needed for map lookups).
struct string_pair_eq {
    using is_transparent = void;
    using spair = std::pair<std::string_view, std::string_view>;
    bool operator()(spair lhs, spair rhs) const;
};
|
|
|
|
// State held for the duration of a counter update: a phased-barrier
// operation plus the per-cell locks acquired for the update
// (see database::acquire_counter_locks).
struct counter_update_guard {
    utils::phased_barrier::operation op;
    std::vector<locked_cell> locks;
};
|
|
|
|
class db_user_types_storage;
|
|
|
|
// Policy for sharded<database>:
|
|
// broadcast metadata writes
|
|
// local metadata reads
|
|
// use table::shard_for_reads()/table::shard_for_writes() for data
|
|
|
|
class database : public peering_sharded_service<database>, qos::qos_configuration_change_subscriber {
|
|
friend class ::database_test_wrapper;
|
|
public:
|
|
enum class table_kind {
|
|
system,
|
|
user,
|
|
};
|
|
|
|
    // Progress counters for drain(): total number of column families
    // involved and how many still remain.
    struct drain_progress {
        int32_t total_cfs;
        int32_t remaining_cfs;

        // Element-wise accumulation; used to aggregate progress
        // (e.g. across shards).
        drain_progress& operator+=(const drain_progress& other) {
            total_cfs += other.total_cfs;
            remaining_cfs += other.remaining_cfs;
            return *this;
        }
    };
|
|
|
|
using ks_cf_t = std::pair<sstring, sstring>;
|
|
using ks_cf_to_uuid_t =
|
|
std::unordered_map<ks_cf_t, table_id, utils::tuple_hash, string_pair_eq>;
|
|
    // Shard-local registry of tables, keyed both by table_id and by
    // (keyspace name, table name). Structural changes (add/remove) are
    // guarded by _cf_lock's write side.
    class tables_metadata {
        rwlock _cf_lock;
        std::unordered_map<table_id, lw_shared_ptr<column_family>> _column_families;
        // Secondary index: (keyspace name, table name) -> table id.
        ks_cf_to_uuid_t _ks_cf_to_uuid;
    private:
        // Update both maps and wire the table into db/ks bookkeeping.
        void add_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s);
        void remove_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s);
    public:
        size_t size() const noexcept;

        // write lock is needed during adding or removing table
        future<rwlock::holder> hold_write_lock();
        void add_table(database& db, keyspace& ks, table& cf, schema_ptr s);
        void remove_table(database& db, table& cf) noexcept;

        // Lookup variants: the plain forms presumably throw on a missing
        // table (cf. the no_such_column_family convention used elsewhere
        // in this file — TODO confirm), while the *_if_exists forms return
        // an empty pointer/id instead.
        table& get_table(table_id id) const;
        table_id get_table_id(const std::pair<std::string_view, std::string_view>& kscf) const;
        lw_shared_ptr<table> get_table_if_exists(table_id id) const;
        table_id get_table_id_if_exists(const std::pair<std::string_view, std::string_view>& kscf) const;
        bool contains(table_id id) const;
        bool contains(std::pair<std::string_view, std::string_view> kscf) const;
        void for_each_table(std::function<void(table_id, lw_shared_ptr<table>)> f) const;
        void for_each_table_id(std::function<void(const ks_cf_t&, table_id)> f) const;
        future<> for_each_table_gently(std::function<future<>(table_id, lw_shared_ptr<table>)> f);
        future<> parallel_for_each_table(std::function<future<>(table_id, lw_shared_ptr<table>)> f);
        const std::unordered_map<table_id, lw_shared_ptr<table>> get_column_families_copy() const;

        // Lazily filtered view over the (table_id, table) map.
        auto filter(std::function<bool(std::pair<table_id, lw_shared_ptr<table>>)> f) const {
            return _column_families | std::views::filter(std::move(f));
        }
    };
|
|
private:
|
|
    replica::cf_stats _cf_stats;

    // Concurrency and memory limits for the different read classes.
    // Counts are fixed; memory limits scale with the shard's memory.
    static constexpr size_t max_count_concurrent_reads{100};
    static constexpr size_t max_count_concurrent_view_update_reads{50};
    size_t max_memory_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
    size_t max_memory_concurrent_view_update_reads() { return _dbcfg.available_memory * 0.01; }
    // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
    size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 1000; }
    // They're rather heavyweight, so limit more
    static constexpr size_t max_count_streaming_concurrent_reads{10};
    size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
    static constexpr size_t max_count_system_concurrent_reads{10};
    size_t max_memory_system_concurrent_reads() { return _dbcfg.available_memory * 0.02; };
    // Memory budget for pending view updates: 10% of shard memory, but
    // overridable to a small fixed value via the "view_update_limit"
    // error-injection point (used by tests).
    size_t max_memory_pending_view_updates() const {
        auto ret = _dbcfg.available_memory * 0.1;
        utils::get_local_injector().inject("view_update_limit", [&ret] {
            ret = 250000;
        });
        return ret;
    }
    // Limit of concurrent local view updates for each shard and service level
    static constexpr size_t max_concurrent_local_view_updates{128};
|
|
|
|
    // Shard-local database-level counters (presumably exported as metrics
    // via setup_metrics() — TODO confirm).
    struct db_stats {
        // Write outcomes.
        uint64_t total_writes = 0;
        uint64_t total_writes_failed = 0;
        uint64_t total_writes_timedout = 0;
        uint64_t total_writes_rate_limited = 0;
        uint64_t total_writes_rejected_due_to_out_of_space_prevention = 0;
        // Read outcomes.
        uint64_t total_reads = 0;
        uint64_t total_reads_failed = 0;
        uint64_t total_reads_rate_limited = 0;

        // "Short" queries (cf. short_data_queries/short_mutation_queries
        // counters bumped by the query path; exact trigger not visible here).
        uint64_t short_data_queries = 0;
        uint64_t short_mutation_queries = 0;

        // Cross-shard (multishard) query reader bookkeeping.
        uint64_t multishard_query_unpopped_fragments = 0;
        uint64_t multishard_query_unpopped_bytes = 0;
        uint64_t multishard_query_failed_reader_stops = 0;
        uint64_t multishard_query_failed_reader_saves = 0;
    };
|
|
|
|
lw_shared_ptr<db_stats> _stats;
|
|
std::shared_ptr<db_user_types_storage> _user_types;
|
|
std::unique_ptr<cell_locker_stats> _cl_stats;
|
|
|
|
const db::config& _cfg;
|
|
|
|
dirty_memory_manager _system_dirty_memory_manager;
|
|
dirty_memory_manager _dirty_memory_manager;
|
|
|
|
timer<lowres_clock> _dirty_memory_threshold_controller;
|
|
|
|
database_config _dbcfg;
|
|
flush_controller _memtable_controller;
|
|
drain_progress _drain_progress {};
|
|
|
|
|
|
reader_concurrency_semaphore _streaming_concurrency_sem;
|
|
reader_concurrency_semaphore _compaction_concurrency_sem;
|
|
reader_concurrency_semaphore _system_read_concurrency_sem;
|
|
|
|
// The view update read concurrency semaphores used for view updates coming from user writes.
|
|
reader_concurrency_semaphore_group _view_update_read_concurrency_semaphores_group;
|
|
std::unordered_map<scheduling_group, db::timeout_semaphore> _view_update_concurrency_semaphores;
|
|
db::timeout_semaphore _view_update_memory_sem{max_memory_pending_view_updates()};
|
|
|
|
cache_tracker _row_cache_tracker;
|
|
seastar::shared_ptr<db::view::view_update_generator> _view_update_generator;
|
|
|
|
inheriting_concrete_execution_stage<
|
|
future<>,
|
|
database*,
|
|
schema_ptr,
|
|
const frozen_mutation&,
|
|
tracing::trace_state_ptr,
|
|
db::timeout_clock::time_point,
|
|
db::commitlog_force_sync,
|
|
db::per_partition_rate_limit::info> _apply_stage;
|
|
|
|
flat_hash_map<sstring, keyspace> _keyspaces;
|
|
tables_metadata _tables_metadata;
|
|
std::unique_ptr<db::commitlog> _commitlog;
|
|
std::unique_ptr<db::commitlog> _schema_commitlog;
|
|
utils::updateable_value_source<table_schema_version> _version;
|
|
uint32_t _schema_change_count = 0;
|
|
// compaction_manager object is referenced by all column families of a database.
|
|
compaction::compaction_manager& _compaction_manager;
|
|
seastar::metrics::metric_groups _metrics;
|
|
bool _enable_incremental_backups = false;
|
|
uint32_t _critical_disk_utilization_mode_count = 0;
|
|
bool _shutdown = false;
|
|
bool _enable_autocompaction_toggle = false;
|
|
querier_cache _querier_cache;
|
|
|
|
std::unique_ptr<logstor::logstor> _logstor;
|
|
|
|
std::unique_ptr<db::large_data_handler> _large_data_handler;
|
|
std::unique_ptr<db::large_data_handler> _nop_large_data_handler;
|
|
|
|
std::unique_ptr<db::system_table_corrupt_data_handler> _corrupt_data_handler;
|
|
std::unique_ptr<db::nop_corrupt_data_handler> _nop_corrupt_data_handler;
|
|
|
|
std::unique_ptr<sstables::sstables_manager> _user_sstables_manager;
|
|
std::unique_ptr<sstables::sstables_manager> _system_sstables_manager;
|
|
|
|
query::result_memory_limiter _result_memory_limiter;
|
|
|
|
friend db::data_listeners;
|
|
std::unique_ptr<db::data_listeners> _data_listeners;
|
|
|
|
service::migration_notifier& _mnotifier;
|
|
gms::feature_service& _feat;
|
|
std::vector<std::any> _listeners;
|
|
locator::shared_token_metadata& _shared_token_metadata;
|
|
lang::manager& _lang_manager;
|
|
|
|
reader_concurrency_semaphore_group _reader_concurrency_semaphores_group;
|
|
scheduling_group _default_read_concurrency_group;
|
|
noncopyable_function<future<>()> _unsubscribe_qos_configuration_change;
|
|
|
|
utils::cross_shard_barrier _stop_barrier;
|
|
|
|
db::rate_limiter _rate_limiter;
|
|
|
|
serialized_action _update_memtable_flush_static_shares_action;
|
|
utils::observer<float> _memtable_flush_static_shares_observer;
|
|
|
|
db_clock::time_point _all_tables_flushed_at;
|
|
|
|
utils::disk_space_monitor::subscription _out_of_space_subscription;
|
|
|
|
public:
|
|
data_dictionary::database as_data_dictionary() const;
|
|
db::commitlog* commitlog_for(const schema_ptr& schema);
|
|
std::shared_ptr<data_dictionary::user_types_storage> as_user_types_storage() const noexcept;
|
|
const data_dictionary::user_types_storage& user_types() const noexcept;
|
|
future<> init_commitlog();
|
|
future<> init_logstor();
|
|
future<> recover_logstor();
|
|
const gms::feature_service& features() const { return _feat; }
|
|
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
|
|
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);
|
|
|
|
drain_progress get_drain_progress() const noexcept {
|
|
return _drain_progress;
|
|
}
|
|
|
|
future<> drain();
|
|
|
|
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
|
future<> unplug_system_keyspace() noexcept;
|
|
|
|
void plug_view_update_generator(db::view::view_update_generator& generator) noexcept;
|
|
void unplug_view_update_generator() noexcept;
|
|
|
|
private:
|
|
future<> flush_non_system_column_families();
|
|
future<> flush_system_column_families();
|
|
|
|
using system_keyspace = bool_class<struct system_keyspace_tag>;
|
|
future<std::unique_ptr<keyspace>> create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, const locator::token_metadata_ptr& token_metadata, system_keyspace system);
|
|
void setup_metrics();
|
|
void setup_scylla_memory_diagnostics_producer();
|
|
reader_concurrency_semaphore& read_concurrency_sem();
|
|
reader_concurrency_semaphore& view_update_read_concurrency_sem();
|
|
auto sum_read_concurrency_sem_var(std::invocable<reader_concurrency_semaphore&> auto member);
|
|
auto sum_read_concurrency_sem_stat(std::invocable<reader_concurrency_semaphore::stats&> auto stats_member);
|
|
|
|
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info);
|
|
future<> do_apply_many(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
|
|
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
|
|
|
|
future<mutation> read_and_transform_counter_mutation_to_shards(mutation m, column_family& cf, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
|
|
|
|
template<typename Future>
|
|
Future update_write_metrics(Future&& f);
|
|
template<typename Future>
|
|
Future update_write_metrics_if_failed(Future&& f);
|
|
void update_write_metrics_for_timed_out_write();
|
|
void update_write_metrics_for_rejected_writes();
|
|
future<std::unique_ptr<keyspace>> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, const locator::token_metadata_ptr& token_metadata, system_keyspace system);
|
|
void remove(table&) noexcept;
|
|
future<keyspace_change> prepare_update_keyspace(const keyspace& ks, lw_shared_ptr<keyspace_metadata> metadata, const locator::token_metadata_ptr& token_metadata) const;
|
|
static future<> modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func);
|
|
|
|
future<> foreach_reader_concurrency_semaphore(std::function<future<>(reader_concurrency_semaphore&)> func);
|
|
friend class ::sigquit_handler; // wants access to all semaphores to dump diagnostics
|
|
|
|
static future<> set_in_critical_disk_utilization_mode(sharded<database>& sharded_db, bool enabled);
|
|
public:
|
|
bool is_in_critical_disk_utilization_mode() const;
|
|
|
|
void insert_keyspace(std::unique_ptr<keyspace> ks);
|
|
void update_keyspace(std::unique_ptr<keyspace_change> change);
|
|
void drop_keyspace(const sstring& name);
|
|
|
|
static table_schema_version empty_version;
|
|
|
|
query::result_memory_limiter& get_result_memory_limiter() {
|
|
return _result_memory_limiter;
|
|
}
|
|
|
|
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
|
|
|
void enable_autocompaction_toggle() noexcept { _enable_autocompaction_toggle = true; }
|
|
friend class api::autocompaction_toggle_guard;
|
|
|
|
// Load the schema definitions kept in schema tables from disk and initialize in-memory schema data structures
|
|
// (keyspace/table definitions, column mappings etc.)
|
|
future<> parse_system_tables(sharded<service::storage_proxy>&, sharded<db::system_keyspace>&);
|
|
|
|
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
|
|
compaction::compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&,
|
|
const abort_source& abort, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
|
|
database(database&&) = delete;
|
|
~database();
|
|
|
|
cache_tracker& row_cache_tracker() { return _row_cache_tracker; }
|
|
future<> drop_caches() const;
|
|
|
|
void update_version(const table_schema_version& version);
|
|
|
|
const table_schema_version& get_version() const;
|
|
utils::observable<table_schema_version>& observable_schema_version() const { return _version.as_observable(); }
|
|
|
|
db::commitlog* commitlog() const {
|
|
return _commitlog.get();
|
|
}
|
|
db::commitlog* schema_commitlog() const {
|
|
return _schema_commitlog.get();
|
|
}
|
|
replica::cf_stats* cf_stats() {
|
|
return &_cf_stats;
|
|
}
|
|
|
|
seastar::scheduling_group get_gossip_scheduling_group() const { return _dbcfg.gossip_scheduling_group; }
|
|
|
|
compaction::compaction_manager& get_compaction_manager() {
|
|
return _compaction_manager;
|
|
}
|
|
const compaction::compaction_manager& get_compaction_manager() const {
|
|
return _compaction_manager;
|
|
}
|
|
|
|
locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
|
locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); }
|
|
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
|
|
|
lang::manager& lang() noexcept { return _lang_manager; }
|
|
const lang::manager& lang() const noexcept { return _lang_manager; }
|
|
|
|
std::optional<table_id> get_base_table_for_tablet_colocation(const schema& s, const std::unordered_map<table_id, schema_ptr>& new_cfms = {});
|
|
|
|
service::migration_notifier& get_notifier() { return _mnotifier; }
|
|
const service::migration_notifier& get_notifier() const { return _mnotifier; }
|
|
|
|
// Setup in-memory data structures for this table (`table` and, if it doesn't exist yet, `keyspace` object).
|
|
// Create the keyspace data directories if the keyspace wasn't created yet.
|
|
//
|
|
// Note: 'system table' does not necessarily mean it sits in `system` keyspace, it could also be `system_schema`;
|
|
// in general we mean local tables created by the system (not the user).
|
|
future<> create_local_system_table(
|
|
schema_ptr table, bool write_in_user_memory, locator::effective_replication_map_factory&);
|
|
|
|
void init_schema_commitlog();
|
|
|
|
using is_new_cf = bool_class<struct is_new_cf_tag>;
|
|
void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata = nullptr);
|
|
future<> make_column_family_directory(schema_ptr schema);
|
|
future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new);
|
|
|
|
|
|
/* throws no_such_column_family if missing */
|
|
table_id find_uuid(std::string_view ks, std::string_view cf) const;
|
|
table_id find_uuid(const schema_ptr&) const;
|
|
|
|
using created_keyspace_per_shard = std::vector<seastar::foreign_ptr<std::unique_ptr<keyspace>>>;
|
|
using keyspace_change_per_shard = std::vector<seastar::foreign_ptr<std::unique_ptr<keyspace_change>>>;
|
|
|
|
/**
|
|
* Creates a keyspace for a given metadata if it still doesn't exist.
|
|
*
|
|
* @return ready future when the operation is complete
|
|
*/
|
|
static future<created_keyspace_per_shard> prepare_create_keyspace_on_all_shards(sharded<database>& sharded_db, sharded<service::storage_proxy>& proxy, const keyspace_metadata& ksm, const locator::pending_token_metadata& pending_token_metadata);
|
|
/* below, find_keyspace throws no_such_<type> on fail */
|
|
keyspace& find_keyspace(std::string_view name);
|
|
const keyspace& find_keyspace(std::string_view name) const;
|
|
bool has_keyspace(std::string_view name) const;
|
|
void validate_keyspace_update(keyspace_metadata& ksm);
|
|
void validate_new_keyspace(keyspace_metadata& ksm);
|
|
static future<keyspace_change_per_shard> prepare_update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm, const locator::pending_token_metadata& pending_token_metadata);
|
|
std::vector<sstring> get_non_system_keyspaces() const;
|
|
std::vector<sstring> get_user_keyspaces() const;
|
|
std::vector<sstring> get_all_keyspaces() const;
|
|
std::vector<sstring> get_non_local_strategy_keyspaces() const;
|
|
std::vector<sstring> get_non_local_vnode_based_strategy_keyspaces() const;
|
|
// All static_effective_replication_map_ptr must hold a vnode_effective_replication_map
|
|
std::unordered_map<sstring, locator::static_effective_replication_map_ptr> get_non_local_strategy_keyspaces_erms() const;
|
|
std::vector<sstring> get_tablets_keyspaces() const;
|
|
column_family& find_column_family(std::string_view ks, std::string_view name);
|
|
const column_family& find_column_family(std::string_view ks, std::string_view name) const;
|
|
column_family& find_column_family(const table_id&);
|
|
const column_family& find_column_family(const table_id&) const;
|
|
column_family& find_column_family(const schema_ptr&);
|
|
const column_family& find_column_family(const schema_ptr&) const;
|
|
bool column_family_exists(const table_id& uuid) const;
|
|
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const;
|
|
schema_ptr find_schema(const table_id&) const;
|
|
bool has_schema(std::string_view ks_name, std::string_view cf_name) const;
|
|
std::set<sstring> existing_index_names(const sstring& ks_name, const sstring& cf_to_exclude = sstring()) const;
|
|
sstring get_available_index_name(const sstring& ks_name, const sstring& cf_name,
|
|
std::optional<sstring> index_name_root) const;
|
|
schema_ptr find_indexed_table(const sstring& ks_name, const sstring& index_name) const;
|
|
/// Revert the system read concurrency to the normal value.
|
|
///
|
|
/// When started the database uses a higher initial concurrency for system
|
|
/// reads, to speed up startup. After startup this should be reverted to
|
|
/// the normal concurrency.
|
|
void revert_initial_system_read_concurrency_boost();
|
|
future<> start(sharded<qos::service_level_controller>&, utils::disk_space_monitor* dsm);
|
|
future<> shutdown();
|
|
future<> stop();
|
|
future<> close_tables(table_kind kind_to_close);
|
|
|
|
/// Checks whether per-partition rate limit can be applied to the operation or not.
|
|
bool can_apply_per_partition_rate_limit(const schema& s, db::operation_type op_type) const;
|
|
|
|
/// Tries to account given operation to the rate limit when the coordinator is a replica.
|
|
/// This function can be called ONLY when rate limiting can be applied to the operation (see `can_apply_per_partition_rate_limit`)
|
|
/// AND the current node/shard is a replica for the given operation.
|
|
///
|
|
/// nullopt -> the decision should be delegated to replicas
|
|
/// can_proceed::no -> operation should be rejected
|
|
/// can_proceed::yes -> operation should be accepted
|
|
std::optional<db::rate_limiter::can_proceed> account_coordinator_operation_to_rate_limit(table& tbl, const dht::token& token,
|
|
db::per_partition_rate_limit::account_and_enforce account_and_enforce_info,
|
|
db::operation_type op_type);
|
|
|
|
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> query(schema_ptr query_schema, const query::read_command& cmd, query::result_options opts,
|
|
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
|
|
db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
|
|
future<std::tuple<reconcilable_result, cache_temperature>> query_mutations(schema_ptr query_schema, const query::read_command& cmd, const dht::partition_range& range,
|
|
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, bool tombstone_gc_enabled = true);
|
|
// Apply the mutation atomically.
|
|
// Throws timed_out_error when timeout is reached.
|
|
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
|
|
// Apply mutations atomically.
|
|
// On restart, either all mutations will be replayed or none of them.
|
|
// All mutations must belong to the same commitlog domain.
|
|
// All mutations must be owned by the current shard.
|
|
// Mutations may be partially visible to reads during the call.
|
|
// Mutations may be partially visible to reads until restart on exception (FIXME).
|
|
future<> apply(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
|
|
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
|
|
|
|
future<counter_update_guard> acquire_counter_locks(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
|
future<mutation> prepare_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
|
future<> apply_counter_update(schema_ptr, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
|
|
|
const sstring& get_snitch_name() const;
|
|
/*!
|
|
* \brief clear snapshot based on a tag
|
|
* The clear_snapshot method deletes specific or multiple snapshots
|
|
* You can specify:
|
|
* tag - The snapshot tag (the one that was used when creating the snapshot) if not specified
|
|
* All snapshot will be deleted
|
|
* keyspace_names - a vector of keyspace names that will be deleted, if empty all keyspaces
|
|
* will be deleted.
|
|
* table_name - A name of a specific table inside the keyspace, if empty all tables will be deleted.
|
|
*/
|
|
future<> clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, const sstring& table_name);
|
|
|
|
using snapshot_details = db::snapshot_ctl::db_snapshot_details;
|
|
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
|
|
|
friend std::ostream& operator<<(std::ostream& out, const database& db);
|
|
const flat_hash_map<sstring, keyspace>& get_keyspaces() const {
|
|
return _keyspaces;
|
|
}
|
|
|
|
flat_hash_map<sstring, keyspace>& get_keyspaces() {
|
|
return _keyspaces;
|
|
}
|
|
|
|
const tables_metadata& get_tables_metadata() const {
|
|
return _tables_metadata;
|
|
}
|
|
|
|
tables_metadata& get_tables_metadata() {
|
|
return _tables_metadata;
|
|
}
|
|
|
|
std::vector<lw_shared_ptr<column_family>> get_non_system_column_families() const;
|
|
|
|
std::vector<view_ptr> get_views() const;
|
|
|
|
const db::config& get_config() const {
|
|
return _cfg;
|
|
}
|
|
const db::extensions& extensions() const;
|
|
|
|
sstables::sstables_manager& get_user_sstables_manager() const noexcept {
|
|
SCYLLA_ASSERT(_user_sstables_manager);
|
|
return *_user_sstables_manager;
|
|
}
|
|
|
|
sstables::sstables_manager& get_system_sstables_manager() const noexcept {
|
|
SCYLLA_ASSERT(_system_sstables_manager);
|
|
return *_system_sstables_manager;
|
|
}
|
|
|
|
sstables::sstables_manager& get_sstables_manager(system_keyspace is_sys_ks) const noexcept {
|
|
return is_sys_ks ? get_system_sstables_manager() : get_user_sstables_manager();
|
|
}
|
|
|
|
sstables::sstables_manager& get_sstables_manager(const schema& s) const;
|
|
|
|
// Returns the list of ranges held by this endpoint
|
|
// The returned list is sorted, and its elements are non overlapping and non wrap-around.
|
|
future<dht::token_range_vector> get_keyspace_local_ranges(locator::static_effective_replication_map_ptr erm);
|
|
|
|
future<> flush_all_memtables();
|
|
future<> flush(const sstring& ks, const sstring& cf);
|
|
// flush a table identified by the given id on all shards.
|
|
static future<> flush_table_on_all_shards(sharded<database>& sharded_db, table_id id);
|
|
// flush a single table in a keyspace on all shards.
|
|
static future<> flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name);
|
|
// flush a list of tables in a keyspace on all shards.
|
|
static future<> flush_tables_on_all_shards(sharded<database>& sharded_db, std::vector<table_info> tables);
|
|
// flush all tables in a keyspace on all shards.
|
|
static future<> flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
|
// flush all tables on this shard.
|
|
// Note: force_new_active_segment in the commitlog, so that
|
|
// flushing all tables will allow reclaiming of all commitlog segments
|
|
future<> flush_all_tables();
|
|
// a wrapper around flush_all_tables, allowing the caller to express intent more clearly
|
|
future<> flush_commitlog() { return flush_all_tables(); }
|
|
|
|
    // Trigger logstor compaction on every shard; 'major' selects a major
    // (full) compaction vs. a regular one.
    static future<> trigger_logstor_compaction_on_all_shards(sharded<database>& sharded_db, bool major);
    void trigger_logstor_compaction(bool major);
    // Flush logstor separator buffers on all shards. A separator buffer keeps
    // the mixed segments it references alive until it is flushed; besides
    // flushing when a buffer fills up, flushes are also triggered periodically
    // for buffers that still hold old segments (tracked via the oldest segment
    // sequence number each buffer holds vs. the active segment sequence number).
    static future<> flush_logstor_separator_on_all_shards(sharded<database>& sharded_db);
    // Flush separator buffers; when seq_num is given, presumably only buffers
    // whose oldest held segment sequence number is at most seq_num are
    // flushed — TODO confirm against the logstor implementation.
    future<> flush_logstor_separator(std::optional<size_t> seq_num = std::nullopt);
    future<logstor::table_segment_stats> get_logstor_table_segment_stats(table_id table) const;
    // Current memory consumed by the logstor subsystem on this shard.
    size_t get_logstor_memory_usage() const;
|
|
|
|
static future<db_clock::time_point> get_all_tables_flushed_at(sharded<database>& sharded_db);
|
|
|
|
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
|
|
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
|
|
|
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
|
|
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
|
|
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
|
|
|
|
public:
|
|
bool update_column_family(schema_ptr s);
|
|
private:
|
|
keyspace::config make_keyspace_config(const keyspace_metadata& ksm, system_keyspace is_system);
|
|
struct table_truncate_state;
|
|
|
|
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
|
|
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, const global_table_ptr&, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt);
|
|
future<> truncate(db::system_keyspace& sys_ks, column_family& cf, std::vector<lw_shared_ptr<replica::table>>& views, const table_truncate_state&);
|
|
public:
|
|
/** Truncates the given column family */
|
|
// If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear.
|
|
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt = {}, bool with_snapshot = true, std::optional<sstring> snapshot_name_opt = {});
|
|
|
|
static future<tables_metadata_lock_on_all_shards> lock_tables_metadata(sharded<database>& sharded_db);
|
|
|
|
// Drops the table and removes the table directory if there are no snapshots,
|
|
// it's executed in 3 steps: prepare, drop and cleanup so that the middle step
|
|
// (actual drop) could be atomic.
|
|
static future<global_table_ptr> prepare_drop_table_on_all_shards(sharded<database>& sharded_db, table_id uuid);
|
|
// drop_table should be called on all shards
|
|
static void drop_table(sharded<database>& sharded_db,
|
|
sstring ks_name, sstring cf_name,
|
|
bool with_snapshot, global_table_ptr& table_shards);
|
|
static future<> cleanup_drop_table_on_all_shards(sharded<database>& sharded_db,
|
|
sharded<db::system_keyspace>& sys_ks,
|
|
bool with_snapshot, global_table_ptr& table_shards);
|
|
static future<> legacy_drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
|
|
sstring ks_name, sstring cf_name,
|
|
bool with_snapshot = true);
|
|
|
|
// Read-only accessor for the region group of the dirty memory manager.
const dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
    return _dirty_memory_manager.region_group();
}
|
|
|
|
// Accessor for this database's statistics object.
db_stats& get_stats() {
    return *_stats;
}
|
|
|
|
// Set the time-to-live for entries in the querier cache.
void set_querier_cache_entry_ttl(std::chrono::seconds entry_ttl) {
    _querier_cache.set_entry_ttl(entry_ttl);
}
|
|
|
|
// Read-only accessor for the querier cache statistics.
const querier_cache::stats& get_querier_cache_stats() const {
    return _querier_cache.get_stats();
}
|
|
|
|
// Accessor for the querier cache itself.
querier_cache& get_querier_cache() {
    return _querier_cache;
}
|
|
|
|
// Current view-update backlog: the memory consumed by pending view updates
// (the limit minus the semaphore units still available) out of the total
// memory limit for pending view updates.
db::view::update_backlog get_view_update_backlog() const {
    const auto limit = max_memory_pending_view_updates();
    const auto used = limit - _view_update_memory_sem.current();
    return {used, limit};
}
|
|
|
|
// Accessor for the registered data listeners.
db::data_listeners& data_listeners() const {
    return *_data_listeners;
}
|
|
|
|
// Get the maximum result size for a query, appropriate for the
|
|
// query class, which is deduced from the current scheduling group.
|
|
query::max_result_size get_query_max_result_size() const;
|
|
|
|
// Get the reader concurrency semaphore, appropriate for the query class,
|
|
// which is deduced from the current scheduling group.
|
|
reader_concurrency_semaphore& get_reader_concurrency_semaphore();
|
|
|
|
// Convenience method to obtain an admitted permit. See reader_concurrency_semaphore::obtain_permit().
|
|
future<reader_permit> obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
|
future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
|
|
|
bool is_internal_query() const;
|
|
bool is_user_semaphore(const reader_concurrency_semaphore& semaphore) const;
|
|
|
|
db::timeout_semaphore& get_view_update_concurrency_sem();
|
|
|
|
// Semaphore limiting the memory consumed by pending view updates
// (see get_view_update_backlog()).
db::timeout_semaphore& view_update_memory_sem() {
    return _view_update_memory_sem;
}
|
|
|
|
future<> clear_inactive_reads_for_tablet(table_id table, dht::token_range tablet_range);
|
|
|
|
/** This callback is going to be called just before the service level is available **/
|
|
virtual future<> on_before_service_level_add(qos::service_level_options slo, qos::service_level_info sl_info) override;
|
|
/** This callback is going to be called just after the service level is removed **/
|
|
virtual future<> on_after_service_level_remove(qos::service_level_info sl_info) override;
|
|
/** This callback is going to be called just before the service level is changed **/
|
|
virtual future<> on_before_service_level_change(qos::service_level_options slo_before, qos::service_level_options slo_after, qos::service_level_info sl_info) override;
|
|
virtual future<> on_effective_service_levels_cache_reloaded() override;
|
|
|
|
// Returns true if RF-rack-validity must be enforced for the given keyspace and config, and false otherwise.
|
|
static bool enforce_rf_rack_validity_for_keyspace(const db::config& cfg, const keyspace_metadata& ksm);
|
|
|
|
// Returns true if RF-rack-validity must be enforced for the given keyspace, and false otherwise.
// Convenience overload: delegates to the static overload, using this
// database's config and the keyspace's metadata.
bool enforce_rf_rack_validity_for_keyspace(const keyspace& ks) const {
    return enforce_rf_rack_validity_for_keyspace(_cfg, *ks.metadata());
}
|
|
|
|
// Verify that the existing keyspaces are all RF-rack-valid.
|
|
// Throws an exception or prints a warning depending on whether RF-rack-validity must be enforced for the keyspace
|
|
// as determined by `enforce_rf_rack_validity_for_keyspace`.
|
|
//
|
|
// Result:
|
|
// * throws an std::invalid_argument exception with a relevant message if there is a keyspace that violates RF-rack-validity
|
|
// and RF-rack-validity must be enforced for that keyspace.
|
|
// * Otherwise, a warning will be printed for all keyspaces that are not RF-rack-valid but not
|
|
// enforced, and no exception should be produced.
|
|
//
|
|
// Preconditions:
|
|
// * the `locator::topology` instance corresponding to the passed `locator::token_metadata_ptr`
|
|
// must contain a complete list of racks and data centers in the cluster.
|
|
void check_rf_rack_validity(const locator::token_metadata_ptr) const;
|
|
|
|
// Verifies whether all keyspaces that require RF-rack-validity (as determined by enforce_rf_rack_validity_for_keyspace)
|
|
// would remain RF-rack-valid after applying a topology change.
|
|
//
|
|
// Returns true if all such keyspaces would remain RF-rack-valid after the change, and false otherwise.
|
|
// For keyspaces that would become RF-rack-invalid but do not require enforcement,
|
|
// a warning is printed, but this does not affect the return value.
|
|
//
|
|
// Preconditions:
|
|
// * The provided locator::topology instance (from the passed locator::token_metadata_ptr)
|
|
// must contain a complete list of racks and data centers in the cluster.
|
|
bool check_rf_rack_validity_with_topology_change(locator::token_metadata_ptr, locator::rf_rack_topology_operation) const;
|
|
|
|
// Verify that all tablet keyspaces have a rack list configured.
|
|
//
|
|
// Result:
|
|
// * If `enforce_rack_list`, throws an exception with a relevant message
|
|
// if there is a tablet keyspace that uses numeric replication factors.
|
|
// * If not `enforce_rack_list`, a warning will be printed for all keyspaces
|
|
// that use numeric replication factors, but no exception should be thrown.
|
|
void check_rack_list_everywhere(const bool enforce_rack_list) const;
|
|
|
|
private:
|
|
// SSTable sampling might require considerable amounts of memory,
|
|
// so we want to limit the number of concurrent sampling operations.
|
|
//
|
|
// The `_sample_data_files_memory_limiter` semaphore limits the memory used
// by the sampling operations for which this shard is the coordinator.
// The `_sample_data_files_local_concurrency_limiter` semaphore limits the
// number of concurrent sampling operations in which this shard is a
// participant.
|
|
size_t _memory_for_data_file_samples = 16*1024*1024;
|
|
semaphore _sample_data_files_memory_limiter{_memory_for_data_file_samples};
|
|
semaphore _sample_data_files_local_concurrency_limiter{1};
|
|
public:
|
|
// Returns a vector of file chunks randomly sampled from all Data.db files of this table.
|
|
future<utils::chunked_vector<temporary_buffer<char>>> sample_data_files(
|
|
table_id id,
|
|
uint64_t chunk_size,
|
|
uint64_t n_chunks
|
|
);
|
|
};
|
|
|
|
// A helper function to parse the directory name back
|
|
// into name and uuid of the table (see init_table_storage())
|
|
std::pair<sstring, table_id> parse_table_directory_name(const sstring&);
|
|
|
|
} // namespace replica
|
|
|
|
future<> start_large_data_handler(sharded<replica::database>& db);
|
|
|
|
// Creates a streaming reader that reads from all shards.
|
|
//
|
|
// Shard readers are created via `table::make_streaming_reader()`.
|
|
// Range generator must generate disjoint, monotonically increasing ranges.
|
|
// Opt-in for compacting the output by passing `compaction_time`, see
|
|
// make_streaming_reader() for more details.
|
|
// Setting multishard_reader_buffer_size enables the multishard reader's buffer
|
|
// size optimization (see make_multishard_combining_reader()), using the
|
|
// given size.
|
|
mutation_reader make_multishard_streaming_reader(
|
|
sharded<replica::database>& db,
|
|
schema_ptr schema,
|
|
reader_permit permit,
|
|
std::function<std::optional<dht::partition_range>()> range_generator,
|
|
gc_clock::time_point compaction_time,
|
|
std::optional<size_t> multishard_reader_buffer_size,
|
|
read_ahead read_ahead);
|
|
|
|
mutation_reader make_multishard_streaming_reader(
|
|
sharded<replica::database>& db,
|
|
schema_ptr schema,
|
|
reader_permit permit,
|
|
const dht::partition_range& range,
|
|
gc_clock::time_point compaction_time,
|
|
std::optional<size_t> multishard_reader_buffer_size,
|
|
read_ahead read_ahead);
|
|
|
|
bool is_internal_keyspace(std::string_view name);
|
|
|
|
// Reader lifecycle policy for multishard streaming reads.
//
// Keeps one reader_context slot per shard (indexed by shard id). Each slot
// holds the shard reader's current read range, a phased-barrier operation
// taken on the table's read barrier, and a pointer to that shard's
// streaming read concurrency semaphore.
class streaming_reader_lifecycle_policy
    : public reader_lifecycle_policy
    , public enable_shared_from_this<streaming_reader_lifecycle_policy> {

    template <typename T>
    using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;

    // Per-shard state. The range and read_operation are wrapped in
    // foreign_ptr so they are destroyed on the shard that created them.
    struct reader_context {
        foreign_ptr<lw_shared_ptr<const dht::partition_range>> range;
        foreign_unique_ptr<utils::phased_barrier::operation> read_operation;
        reader_concurrency_semaphore* semaphore;
    };
    sharded<replica::database>& _db;
    table_id _table_id;
    // Passed through to make_streaming_reader(); opts in to compacting the output.
    gc_clock::time_point _compaction_time;
    std::vector<reader_context> _contexts; // one slot per shard, sized to smp::count
public:
    streaming_reader_lifecycle_policy(sharded<replica::database>& db, table_id table_id, gc_clock::time_point compaction_time)
        : _db(db)
        , _table_id(table_id)
        , _compaction_time(compaction_time)
        , _contexts(smp::count) {
    }
    // Called on the shard that runs the reader: records this shard's range,
    // read-barrier operation and semaphore in its context slot, then creates
    // the shard's streaming reader over the stored range.
    virtual mutation_reader create_reader(
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr,
            mutation_reader::forwarding fwd_mr) override {
        const auto shard = this_shard_id();
        auto& cf = _db.local().find_column_family(schema);

        _contexts[shard].range = make_foreign(make_lw_shared<const dht::partition_range>(range));
        _contexts[shard].read_operation = make_foreign(std::make_unique<utils::phased_barrier::operation>(cf.read_in_progress()));
        _contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();

        // The reader reads from the stored copy of the range (not the caller's
        // reference), so it stays valid across update_read_range() calls.
        return cf.make_streaming_reader(std::move(schema), std::move(permit), *_contexts[shard].range, slice, fwd_mr, _compaction_time);
    }
    // Current read range of this shard's reader (may be null if create_reader()
    // was not called on this shard).
    virtual const dht::partition_range* get_read_range() const override {
        const auto shard = this_shard_id();
        return _contexts[shard].range.get();
    }
    // Replace this shard's stored read range with a new one.
    virtual void update_read_range(lw_shared_ptr<const dht::partition_range> range) override {
        const auto shard = this_shard_id();
        _contexts[shard].range = make_foreign(std::move(range));
    }
    // Unregisters the inactive reader from the semaphore and closes it.
    // The context is moved into the final continuation so the range and the
    // read-barrier operation stay alive until the close completes. If no
    // reader is returned for the handle, there is nothing to close.
    virtual future<> destroy_reader(stopped_reader reader) noexcept override {
        auto ctx = std::move(_contexts[this_shard_id()]);
        auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(reader.handle));
        if (!reader_opt) {
            return make_ready_future<>();
        }
        return reader_opt->close().finally([ctx = std::move(ctx)] {});
    }
    // Returns this shard's streaming read semaphore, looking it up lazily when
    // create_reader() was not called on this shard (semaphore pointer unset).
    virtual reader_concurrency_semaphore& semaphore() override {
        const auto shard = this_shard_id();
        if (!_contexts[shard].semaphore) {
            auto& cf = _db.local().find_column_family(_table_id);
            _contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
        }
        return *_contexts[shard].semaphore;
    }
    // Obtains a read permit from this shard's semaphore, sized by the table's
    // estimated read memory cost.
    virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
        auto& cf = _db.local().find_column_family(_table_id);
        return semaphore().obtain_permit(schema, description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
    }
};
|