repair: Get rid of the gc_grace_seconds

The gc_grace_seconds is a very fragile and broken design inherited from
Cassandra. Deleted data can be resurrected if cluster wide repair is not
performed within gc_grace_seconds. This design pushes the job of making
the database consistency to the user. In practice, it is very hard to
guarantee repair is performed within gc_grace_seconds all the time. For
example, repair workload has the lowest priority in the system which can
be slowed down by the higher priority workload, so that there is no
guarantee when a repair can finish. A gc_grace_seconds value that is
used to work might not work after data volume grows in a cluster. Users
might want to avoid running repair during a specific period where
latency is the top priority for their business.

To solve this problem, an automatic mechanism to protect data
resurrection is proposed and implemented. The main idea is to remove the
tombstone only after the range that covers the tombstone is repaired.

In this patch, a new table option tombstone_gc is added. The option is
used to configure tombstone gc mode. For example:

1) GC a tombstone after gc_grace_seconds

cqlsh> ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'timeout'} ;

This is the default mode. If no tombstone_gc option is specified by the
user. The old gc_grace_seconds based gc will be used.

2) Never GC a tombstone

cqlsh> ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'disabled'};

3) GC a tombstone immediately

cqlsh> ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'immediate'};

4) GC a tombstone after repair

cqlsh> ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair'};

In addition to the 'mode' option, another option 'propagation_delay_in_seconds'
is added. It defines the max time a write could possibly delay before it
eventually arrives at a node.

A new gossip feature TOMBSTONE_GC_OPTIONS is added. The new tombstone_gc
option can only be used after the whole cluster supports the new
feature. A mixed cluster works with no problem.

Tests: compaction_test.py, ninja test

Fixes #3560

[avi: resolve conflicts vs data_dictionary]
This commit is contained in:
Asias He
2021-09-13 13:47:42 +08:00
committed by Avi Kivity
parent 5eccb42846
commit a8ad385ecd
59 changed files with 1055 additions and 104 deletions

View File

@@ -537,6 +537,8 @@ set(scylla_sources
raft/tracker.cc
range_tombstone.cc
range_tombstone_list.cc
tombstone_gc_options.cc
tombstone_gc.cc
reader_concurrency_semaphore.cc
redis/abstract_command.cc
redis/command_factory.cc

View File

@@ -76,6 +76,7 @@
#include "utils/UUID_gen.hh"
#include "utils/utf8.hh"
#include "utils/fmt-compat.hh"
#include "tombstone_gc.hh"
namespace sstables {
@@ -643,7 +644,7 @@ private:
void setup() {
auto ssts = make_lw_shared<sstables::sstable_set>(make_sstable_set_for_input());
formatted_sstables_list formatted_msg;
auto fully_expired = _table_s.fully_expired_sstables(_sstables);
auto fully_expired = _table_s.fully_expired_sstables(_sstables, gc_clock::now());
min_max_tracker<api::timestamp_type> timestamp_tracker;
for (auto& sst : _sstables) {
@@ -733,7 +734,6 @@ private:
max_purgeable_func(),
get_compacted_fragments_writer(),
noop_compacted_fragments_consumer());
reader.consume_in_thread(std::move(cfc));
});
});
@@ -1746,7 +1746,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, compaction_data& cd
}
std::unordered_set<sstables::shared_sstable>
get_fully_expired_sstables(const table_state& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point gc_before) {
get_fully_expired_sstables(const table_state& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point compaction_time) {
clogger.debug("Checking droppable sstables in {}.{}", table_s.schema()->ks_name(), table_s.schema()->cf_name());
if (compacting.empty()) {
@@ -1760,6 +1760,7 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
int64_t min_timestamp = std::numeric_limits<int64_t>::max();
for (auto& sstable : overlapping) {
auto gc_before = sstable->get_gc_before_for_fully_expire(compaction_time);
if (sstable->get_max_local_deletion_time() >= gc_before) {
min_timestamp = std::min(min_timestamp, sstable->get_stats_metadata().min_timestamp);
}
@@ -1778,6 +1779,7 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
// SStables that do not contain live data is added to list of possibly expired sstables.
for (auto& candidate : compacting) {
auto gc_before = candidate->get_gc_before_for_fully_expire(compaction_time);
clogger.debug("Checking if candidate of generation {} and max_deletion_time {} is expired, gc_before is {}",
candidate->generation(), candidate->get_stats_metadata().max_local_deletion_time, gc_before);
// A fully expired sstable which has an ancestor undeleted shouldn't be compacted because
@@ -1798,11 +1800,12 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
if (candidate->get_stats_metadata().max_timestamp >= min_timestamp) {
it = candidates.erase(it);
} else {
clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={}, gcBefore={})",
candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time, gc_before);
clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={})",
candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time);
it++;
}
}
clogger.debug("Checking droppable sstables in {}.{}, candidates={}", table_s.schema()->ks_name(), table_s.schema()->cf_name(), candidates.size());
return candidates;
}

View File

@@ -68,7 +68,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s
return compaction_descriptor(std::move(candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before) {
bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time) {
if (_disable_tombstone_compaction) {
return false;
}
@@ -79,6 +79,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
if (db_clock::now()-_tombstone_compaction_interval < sst->data_file_write_time()) {
return false;
}
auto gc_before = sst->get_gc_before_for_drop_estimation(compaction_time);
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
@@ -421,20 +422,20 @@ time_window_compaction_strategy::time_window_compaction_strategy(const std::map<
} // namespace sstables
std::vector<sstables::shared_sstable>
date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point gc_before) {
date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point compaction_time) {
if (table_s.get_sstable_set().all()->empty()) {
return {};
}
// Find fully expired SSTables. Those will be included no matter what.
auto expired = table_s.fully_expired_sstables(uncompacting);
auto expired = table_s.fully_expired_sstables(uncompacting, compaction_time);
if (!expired.empty()) {
auto is_expired = [&] (const sstables::shared_sstable& s) { return expired.contains(s); };
uncompacting.erase(boost::remove_if(uncompacting, is_expired), uncompacting.end());
}
auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, gc_before);
auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, compaction_time);
if (!expired.empty()) {
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
}
@@ -464,7 +465,7 @@ int64_t date_tiered_manifest::get_estimated_tasks(table_state& table_s) const {
}
std::vector<sstables::shared_sstable>
date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point gc_before) {
date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point compaction_time) {
int base = table_s.schema()->min_compaction_threshold();
int64_t now = get_now(table_s.get_sstable_set().all());
auto most_interesting = get_compaction_candidates(table_s, non_expiring_sstables, now, base);
@@ -582,8 +583,8 @@ date_tiered_compaction_strategy::date_tiered_compaction_strategy(const std::map<
}
compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) {
auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
auto sstables = _manifest.get_next_sstables(table_s, candidates, gc_before);
auto compaction_time = gc_clock::now();
auto sstables = _manifest.get_next_sstables(table_s, candidates, compaction_time);
if (!sstables.empty()) {
date_tiered_manifest::logger.debug("datetiered: Compacting {} out of {} sstables", sstables.size(), candidates.size());
@@ -591,8 +592,8 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti
}
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
auto e = boost::range::remove_if(candidates, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
auto e = boost::range::remove_if(candidates, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, compaction_time);
});
candidates.erase(e, candidates.end());
if (candidates.empty()) {

View File

@@ -74,7 +74,7 @@ public:
// Check if a given sstable is entitled for tombstone compaction based on its
// droppable tombstone histogram and gc_before.
bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before);
bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time);
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;

View File

@@ -112,12 +112,12 @@ public:
: _options(options) {}
std::vector<sstables::shared_sstable>
get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point gc_before);
get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point compaction_time);
int64_t get_estimated_tasks(table_state& table_s) const;
private:
std::vector<sstables::shared_sstable>
get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point gc_before);
get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point compaction_time);
std::vector<sstables::shared_sstable>
get_compaction_candidates(table_state& table_s, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base);

View File

@@ -48,19 +48,21 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(t
// unlike stcs, lcs can look for sstable with highest droppable tombstone ratio, so as not to choose
// a sstable which droppable data shadow data in older sstable, by starting from highest levels which
// theoretically contain oldest non-overlapping data.
auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
auto compaction_time = gc_clock::now();
for (auto level = int(manifest.get_level_count()); level >= 0; level--) {
auto& sstables = manifest.get_level(level);
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, compaction_time);
});
sstables.erase(e, sstables.end());
if (sstables.empty()) {
continue;
}
auto& sst = *std::max_element(sstables.begin(), sstables.end(), [&] (auto& i, auto& j) {
return i->estimate_droppable_tombstone_ratio(gc_before) < j->estimate_droppable_tombstone_ratio(gc_before);
auto gc_before1 = i->get_gc_before_for_drop_estimation(compaction_time);
auto gc_before2 = j->get_gc_before_for_drop_estimation(compaction_time);
return i->estimate_droppable_tombstone_ratio(gc_before1) < j->estimate_droppable_tombstone_ratio(gc_before2);
});
return sstables::compaction_descriptor({ sst }, table_s.get_sstable_set(), service::get_local_compaction_priority(), sst->get_sstable_level());
}

View File

@@ -161,7 +161,7 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
// make local copies so they can't be changed out from under us mid-method
int min_threshold = table_s.min_compaction_threshold();
int max_threshold = table_s.schema()->max_compaction_threshold();
auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
auto compaction_time = gc_clock::now();
// TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables).
@@ -184,8 +184,8 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
// tombstone purge, i.e. less likely to shadow even older data.
for (auto&& sstables : buckets | boost::adaptors::reversed) {
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, compaction_time);
});
sstables.erase(e, sstables.end());
if (sstables.empty()) {

View File

@@ -42,7 +42,7 @@ public:
virtual unsigned min_compaction_threshold() const noexcept = 0;
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
virtual const sstables::sstable_set& get_sstable_set() const = 0;
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const = 0;
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;

View File

@@ -227,7 +227,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
compaction_descriptor
time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidates) {
auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
auto compaction_time = gc_clock::now();
if (candidates.empty()) {
return compaction_descriptor();
@@ -238,7 +238,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = table_s.fully_expired_sstables(candidates);
expired = table_s.fully_expired_sstables(candidates, compaction_time);
_last_expired_check = db_clock::now();
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
@@ -249,7 +249,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), gc_before);
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
return compaction_descriptor(std::move(compaction_candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
@@ -270,7 +270,7 @@ time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timesta
std::vector<shared_sstable>
time_window_compaction_strategy::get_next_non_expired_sstables(table_state& table_s, strategy_control& control,
std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time) {
auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables);
if (!most_interesting.empty()) {
@@ -279,8 +279,8 @@ time_window_compaction_strategy::get_next_non_expired_sstables(table_state& tabl
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
auto e = boost::range::remove_if(non_expiring_sstables, [this, compaction_time] (const shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, compaction_time);
});
non_expiring_sstables.erase(e, non_expiring_sstables.end());
if (non_expiring_sstables.empty()) {

View File

@@ -139,7 +139,7 @@ private:
compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const;
std::vector<shared_sstable>
get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);
get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time);
std::vector<shared_sstable> get_compaction_candidates(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidate_sstables);
public:

View File

@@ -989,6 +989,8 @@ scylla_core = (['database.cc',
'table_helper.cc',
'range_tombstone.cc',
'range_tombstone_list.cc',
'tombstone_gc_options.cc',
'tombstone_gc.cc',
'utils/disk-error-handler.cc',
'duration.cc',
'vint-serialization.cc',

View File

@@ -349,7 +349,7 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
{
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
_properties->validate(db, schema_extensions);
_properties->validate(db, keyspace(), schema_extensions);
if (!cf.views().empty() && _properties->get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(

View File

@@ -90,7 +90,7 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
}
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
_properties->validate(db, schema_extensions);
_properties->validate(db, keyspace(), schema_extensions);
auto builder = schema_builder(schema);
_properties->apply_to_builder(builder, std::move(schema_extensions));

View File

@@ -46,6 +46,8 @@
#include "cdc/cdc_extension.hh"
#include "gms/feature.hh"
#include "gms/feature_service.hh"
#include "tombstone_gc_extension.hh"
#include "tombstone_gc.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -94,7 +96,7 @@ schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions
return er;
}
void cf_prop_defs::validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const {
void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const {
// Skip validation if the comapction strategy class is already set as it means we've alreayd
// prepared (and redoing it would set strategyClass back to null, which we don't want)
if (_compaction_strategy_class) {
@@ -156,6 +158,9 @@ void cf_prop_defs::validate(const data_dictionary::database db, const schema::ex
throw exceptions::configuration_exception("CDC not supported by the cluster");
}
auto tombstone_gc_options = get_tombstone_gc_options(schema_extensions);
validate_tombstone_gc_options(tombstone_gc_options, db.real_database(), ks_name);
validate_minimum_int(KW_DEFAULT_TIME_TO_LIVE, 0, DEFAULT_DEFAULT_TIME_TO_LIVE);
validate_minimum_int(KW_PAXOSGRACESECONDS, 0, DEFAULT_GC_GRACE_SECONDS);
@@ -235,6 +240,16 @@ const cdc::options* cf_prop_defs::get_cdc_options(const schema::extensions_map&
return &cdc_ext->get_options();
}
const tombstone_gc_options* cf_prop_defs::get_tombstone_gc_options(const schema::extensions_map& schema_exts) const {
auto it = schema_exts.find(tombstone_gc_extension::NAME);
if (it == schema_exts.end()) {
return nullptr;
}
auto ext = dynamic_pointer_cast<tombstone_gc_extension>(it->second);
return &ext->get_options();
}
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));

View File

@@ -51,6 +51,8 @@ namespace data_dictionary {
class database;
}
class tombstone_gc_options;
namespace db {
class extensions;
}
@@ -101,11 +103,12 @@ public:
std::optional<sstables::compaction_strategy_type> get_compaction_strategy_class() const;
schema::extensions_map make_schema_extensions(const db::extensions& exts) const;
void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const;
void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const;
std::map<sstring, sstring> get_compaction_type_options() const;
std::optional<std::map<sstring, sstring>> get_compression_options() const;
const cdc::options* get_cdc_options(const schema::extensions_map&) const;
std::optional<caching_options> get_caching_options() const;
const tombstone_gc_options* get_tombstone_gc_options(const schema::extensions_map&) const;
#if 0
public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException
{

View File

@@ -94,8 +94,8 @@ public:
_defined_ordering.emplace_back(alias, reversed);
}
void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const {
_properties->validate(db, schema_extensions);
void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const {
_properties->validate(db, std::move(ks_name), schema_extensions);
}
};

View File

@@ -211,7 +211,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
throw exceptions::invalid_request_exception(format("Multiple definition of identifier {}", (*i)->text()));
}
_properties.validate(db, _properties.properties()->make_schema_extensions(db.extensions()));
_properties.validate(db, keyspace(), _properties.properties()->make_schema_extensions(db.extensions()));
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());

View File

@@ -154,7 +154,7 @@ view_ptr create_view_statement::prepare_view(data_dictionary::database db) const
// - make sure base_table gc_grace_seconds > 0
auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions());
_properties.validate(db, schema_extensions);
_properties.validate(db, keyspace(), schema_extensions);
if (_properties.use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));

View File

@@ -71,6 +71,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "timeout_config.hh"
#include "tombstone_gc.hh"
#include "data_dictionary/impl.hh"
@@ -940,6 +941,7 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
lw_shared_ptr<table> cf;
try {
cf = _column_families.at(uuid);
drop_repair_history_map_for_table(uuid);
} catch (std::out_of_range&) {
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
}
@@ -2067,6 +2069,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// call.
auto low_mark = cf.set_low_replay_position_mark();
const auto uuid = cf.schema()->id();
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
future<> f = make_ready_future<>();
@@ -2112,6 +2115,8 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
});
});
});
}).then([this, uuid] {
drop_repair_history_map_for_table(uuid);
});
});
}

View File

@@ -35,6 +35,7 @@
#include <seastar/net/tls.hh>
#include "cdc/cdc_extension.hh"
#include "tombstone_gc_extension.hh"
#include "config.hh"
#include "extensions.hh"
#include "log.hh"

View File

@@ -958,7 +958,7 @@ mutation compact_for_schema_digest(const mutation& m) {
// See https://issues.apache.org/jira/browse/CASSANDRA-6862.
// We achieve similar effect with compact_for_compaction().
mutation m_compacted(m);
m_compacted.partition().compact_for_compaction(*m.schema(), always_gc, gc_clock::time_point::max());
m_compacted.partition().compact_for_compaction_drop_tombstones_unconditionally(*m.schema(), m.decorated_key());
return m_compacted;
}

View File

@@ -288,6 +288,26 @@ schema_ptr system_keyspace::raft_config() {
return schema;
}
schema_ptr system_keyspace::repair_history() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_HISTORY);
return schema_builder(NAME, REPAIR_HISTORY, std::optional(id))
.with_column("table_uuid", uuid_type, column_kind::partition_key)
// The time is repair start time
.with_column("repair_time", timestamp_type, column_kind::clustering_key)
.with_column("repair_uuid", uuid_type, column_kind::clustering_key)
// The token range is (range_start, range_end]
.with_column("range_start", long_type, column_kind::clustering_key)
.with_column("range_end", long_type, column_kind::clustering_key)
.with_column("keyspace_name", utf8_type, column_kind::static_column)
.with_column("table_name", utf8_type, column_kind::static_column)
.set_comment("Record repair history")
.with_version(generate_schema_version(id))
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2518,6 +2538,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
compactions_in_progress(), compaction_history(),
sstable_activity(), clients(), size_estimates(), large_partitions(), large_rows(), large_cells(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),

View File

@@ -149,6 +149,7 @@ public:
static constexpr auto RAFT = "raft";
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_CONFIG = "raft_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static const char *const CLIENTS;
struct v3 {
@@ -230,6 +231,7 @@ public:
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);

View File

@@ -995,7 +995,8 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
throw std::logic_error("Empty materialized view updated");
}
auto gc_before = _now - _schema->gc_grace_seconds();
auto dk = dht::decorate_key(*_schema, _key);
auto gc_before = ::get_gc_before_for_key(_schema, dk, _now);
// We allow existing to be disengaged, which we treat the same as an empty row.
if (existing) {

View File

@@ -151,6 +151,7 @@ extern const std::string_view UDA;
extern const std::string_view SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT;
extern const std::string_view SUPPORTS_RAFT_CLUSTER_MANAGEMENT;
extern const std::string_view USES_RAFT_CLUSTER_MANAGEMENT;
extern const std::string_view TOMBSTONE_GC_OPTIONS;
}

View File

@@ -76,6 +76,7 @@ constexpr std::string_view features::UDA = "UDA";
constexpr std::string_view features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT = "SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT";
constexpr std::string_view features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT = "SUPPORTS_RAFT_CLUSTER_MANAGEMENT";
constexpr std::string_view features::USES_RAFT_CLUSTER_MANAGEMENT = "USES_RAFT_CLUSTER_MANAGEMENT";
constexpr std::string_view features::TOMBSTONE_GC_OPTIONS = "TOMBSTONE_GC_OPTIONS";
static logging::logger logger("features");
@@ -104,6 +105,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _separate_page_size_and_safety_limit(*this, features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT)
, _supports_raft_cluster_mgmt(*this, features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT)
, _uses_raft_cluster_mgmt(*this, features::USES_RAFT_CLUSTER_MANAGEMENT)
, _tombstone_gc_options(*this, features::TOMBSTONE_GC_OPTIONS)
, _raft_support_listener(_supports_raft_cluster_mgmt.when_enabled([this] {
// When the cluster fully supports raft-based cluster management,
// we can re-enable support for the second gossip feature to trigger
@@ -227,6 +229,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT,
gms::features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT,
gms::features::USES_RAFT_CLUSTER_MANAGEMENT,
gms::features::TOMBSTONE_GC_OPTIONS,
};
for (const sstring& s : _config._disabled_features) {
@@ -335,6 +338,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_separate_page_size_and_safety_limit),
std::ref(_supports_raft_cluster_mgmt),
std::ref(_uses_raft_cluster_mgmt),
std::ref(_tombstone_gc_options),
})
{
if (list.contains(f.name())) {

View File

@@ -103,6 +103,7 @@ private:
gms::feature _separate_page_size_and_safety_limit;
gms::feature _supports_raft_cluster_mgmt;
gms::feature _uses_raft_cluster_mgmt;
gms::feature _tombstone_gc_options;
gms::feature::listener_registration _raft_support_listener;
@@ -199,6 +200,10 @@ public:
return bool(_separate_page_size_and_safety_limit);
}
bool cluster_supports_tombstone_gc_options() const {
return bool(_tombstone_gc_options);
}
static std::set<sstring> to_feature_set(sstring features_string);
// Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
// The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`

View File

@@ -142,3 +142,28 @@ struct node_ops_cmd_response {
// Optional field, set by query_pending_ops cmd
std::list<utils::UUID> pending_ops;
};
struct repair_update_system_table_request {
utils::UUID repair_uuid;
utils::UUID table_uuid;
sstring keyspace_name;
sstring table_name;
dht::token_range range;
gc_clock::time_point repair_time;
};
struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
utils::UUID repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;
};
struct repair_flush_hints_batchlog_response {
};
verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request) -> repair_update_system_table_response;
verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request) -> repair_flush_hints_batchlog_response;

View File

@@ -90,6 +90,7 @@
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
#include "tombstone_gc_extension.hh"
#include "alternator/tags_extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
@@ -438,6 +439,7 @@ For more information about individual apps, run: scylla {app_name} --help
ext->add_schema_extension<alternator::tags_extension>(alternator::tags_extension::NAME);
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
ext->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
ext->add_schema_extension<tombstone_gc_extension>(tombstone_gc_extension::NAME);
auto cfg = make_lw_shared<db::config>(ext);
auto init = app.get_options_description().add_options();
@@ -1095,7 +1097,7 @@ For more information about individual apps, run: scylla {app_name} --help
// both)
supervisor::notify("starting messaging service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});

View File

@@ -81,7 +81,6 @@
#include "idl/gossip_digest.dist.impl.hh"
#include "idl/read_command.dist.impl.hh"
#include "idl/range.dist.impl.hh"
#include "idl/partition_checksum.dist.impl.hh"
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
@@ -103,6 +102,7 @@
#include "locator/snitch_base.hh"
#include "message/rpc_protocol_impl.hh"
#include "idl/partition_checksum.dist.impl.hh"
namespace netw {
@@ -469,6 +469,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
case messaging_verb::REPAIR_UPDATE_SYSTEM_TABLE:
case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
return 1;

View File

@@ -161,7 +161,9 @@ enum class messaging_verb : int32_t {
RAFT_MODIFY_CONFIG = 56,
GROUP0_PEER_EXCHANGE = 57,
GROUP0_MODIFY_CONFIG = 58,
LAST = 59,
REPAIR_UPDATE_SYSTEM_TABLE = 59,
REPAIR_FLUSH_HINTS_BATCHLOG = 60,
LAST = 61,
};
} // namespace netw

View File

@@ -23,6 +23,7 @@
#include "compaction/compaction_garbage_collector.hh"
#include "mutation_fragment.hh"
#include "tombstone_gc.hh"
static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) {
// Like PK range, an empty row range, should be considered an "exclude all" restriction
@@ -150,10 +151,10 @@ template<emit_only_live_rows OnlyLive, compact_for_sstables SSTableCompaction>
class compact_mutation_state {
const schema& _schema;
gc_clock::time_point _query_time;
gc_clock::time_point _gc_before;
std::function<api::timestamp_type(const dht::decorated_key&)> _get_max_purgeable;
can_gc_fn _can_gc;
api::timestamp_type _max_purgeable = api::missing_timestamp;
std::optional<gc_clock::time_point> _gc_before;
const query::partition_slice& _slice;
uint64_t _row_limit{};
uint32_t _partition_limit{};
@@ -209,13 +210,26 @@ private:
}
bool can_purge_tombstone(const tombstone& t) {
return t.deletion_time < _gc_before && can_gc(t);
return can_gc(t) && t.deletion_time < get_gc_before();
};
bool can_purge_tombstone(const row_tombstone& t) {
return t.max_deletion_time() < _gc_before && can_gc(t.tomb());
return can_gc(t.tomb()) && t.max_deletion_time() < get_gc_before();
};
gc_clock::time_point get_gc_before() {
if (_gc_before) {
return _gc_before.value();
} else {
if (_dk) {
_gc_before = ::get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time);
return _gc_before.value();
} else {
return gc_clock::time_point::min();
}
}
}
bool can_gc(tombstone t) {
if (!sstable_compaction()) {
return true;
@@ -241,7 +255,6 @@ public:
uint32_t partition_limit)
: _schema(s)
, _query_time(query_time)
, _gc_before(saturating_subtract(query_time, s.gc_grace_seconds()))
, _can_gc(always_gc)
, _slice(slice)
, _row_limit(limit)
@@ -257,7 +270,6 @@ public:
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable)
: _schema(s)
, _query_time(compaction_time)
, _gc_before(saturating_subtract(_query_time, s.gc_grace_seconds()))
, _get_max_purgeable(std::move(get_max_purgeable))
, _can_gc([this] (tombstone t) { return can_gc(t); })
, _slice(s.full_slice())
@@ -282,6 +294,7 @@ public:
_range_tombstones.clear();
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_max_purgeable = api::missing_timestamp;
_gc_before = std::nullopt;
_last_static_row.reset();
}
@@ -306,8 +319,9 @@ public:
if constexpr (sstable_compaction()) {
_collector->start_collecting_static_row();
}
auto gc_before = get_gc_before();
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column, row_tombstone(current_tombstone),
_query_time, _can_gc, _gc_before, _collector.get());
_query_time, _can_gc, gc_before, _collector.get());
_stats.static_rows += is_live;
if constexpr (sstable_compaction()) {
_collector->consume_static_row([this, &gc_consumer, current_tombstone] (static_row&& sr_garbage) {
@@ -350,9 +364,9 @@ public:
cr.remove_tombstone();
}
}
bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before, _collector.get());
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker(),
auto gc_before = get_gc_before();
bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, gc_before, _collector.get());
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, gc_before, cr.marker(),
_collector.get());
_stats.clustering_rows += is_live;
@@ -470,7 +484,6 @@ public:
_rows_in_current_partition = 0;
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_query_time = query_time;
_gc_before = saturating_subtract(query_time, _schema.gc_grace_seconds());
_stats = {};
if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone)
@@ -517,7 +530,8 @@ public:
}
compact_mutation(const schema& s, gc_clock::time_point compaction_time,
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer())
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
Consumer consumer, GCConsumer gc_consumer = GCConsumer())
: _state(make_lw_shared<compact_mutation_state<OnlyLive, SSTableCompaction>>(s, compaction_time, get_max_purgeable))
, _consumer(std::move(consumer))
, _gc_consumer(std::move(gc_consumer)) {

View File

@@ -41,6 +41,7 @@
#include "utils/exceptions.hh"
#include "clustering_key_filter.hh"
#include "mutation_partition_view.hh"
#include "tombstone_gc.hh"
logging::logger mplog("mutation_partition");
@@ -1294,17 +1295,20 @@ void mutation_partition::trim_rows(const schema& s,
}
uint32_t mutation_partition::do_compact(const schema& s,
const dht::decorated_key& dk,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
bool reverse,
uint64_t row_limit,
can_gc_fn& can_gc)
can_gc_fn& can_gc,
bool drop_tombstones_unconditionally)
{
check_schema(s);
assert(row_limit > 0);
auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
auto gc_before = drop_tombstones_unconditionally ? gc_clock::time_point::max() :
::get_gc_before_for_key(s.shared_from_this(), dk, query_time);
auto should_purge_tombstone = [&] (const tombstone& t) {
return t.deletion_time < gc_before && can_gc(t);
@@ -1363,6 +1367,7 @@ uint32_t mutation_partition::do_compact(const schema& s,
uint64_t
mutation_partition::compact_for_query(
const schema& s,
const dht::decorated_key& dk,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
@@ -1370,18 +1375,31 @@ mutation_partition::compact_for_query(
uint64_t row_limit)
{
check_schema(s);
return do_compact(s, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc);
bool drop_tombstones_unconditionally = false;
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally);
}
void mutation_partition::compact_for_compaction(const schema& s,
can_gc_fn& can_gc, gc_clock::time_point compaction_time)
can_gc_fn& can_gc, const dht::decorated_key& dk, gc_clock::time_point compaction_time)
{
check_schema(s);
static const std::vector<query::clustering_range> all_rows = {
query::clustering_range::make_open_ended_both_sides()
};
do_compact(s, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc);
bool drop_tombstones_unconditionally = false;
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally);
}
void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(const schema& s, const dht::decorated_key& dk)
{
check_schema(s);
static const std::vector<query::clustering_range> all_rows = {
query::clustering_range::make_open_ended_both_sides()
};
bool drop_tombstones_unconditionally = true;
auto compaction_time = gc_clock::time_point::max();
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally);
}
// Returns true if the mutation_partition represents no writes.

View File

@@ -1241,12 +1241,14 @@ private:
void insert_row(const schema& s, const clustering_key& key, const deletable_row& row);
uint32_t do_compact(const schema& s,
const dht::decorated_key& dk,
gc_clock::time_point now,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
bool reverse,
uint64_t row_limit,
can_gc_fn&);
can_gc_fn&,
bool drop_tombstones_unconditionally);
// Calls func for each row entry inside row_ranges until func returns stop_iteration::yes.
// Removes all entries for which func didn't return stop_iteration::no or wasn't called at all.
@@ -1274,7 +1276,7 @@ public:
//
// The row_limit parameter must be > 0.
//
uint64_t compact_for_query(const schema& s, gc_clock::time_point query_time,
uint64_t compact_for_query(const schema& s, const dht::decorated_key& dk, gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges, bool always_return_static_content,
bool reversed, uint64_t row_limit);
@@ -1283,8 +1285,13 @@ public:
// - drops cells covered by higher-level tombstones
// - drops expired tombstones which timestamp is before max_purgeable
void compact_for_compaction(const schema& s, can_gc_fn&,
const dht::decorated_key& dk,
gc_clock::time_point compaction_time);
// Like compact_for_compaction but drop tombstones unconditionally
void compact_for_compaction_drop_tombstones_unconditionally(const schema& s,
const dht::decorated_key& dk);
// Returns the minimal mutation_partition that when applied to "other" will
// create a mutation_partition equal to the sum of other and this one.
// This and other must both be governed by the same schema s.

View File

@@ -55,6 +55,8 @@
#include <cfloat>
#include "idl/partition_checksum.dist.hh"
logging::logger rlogger("repair");
void node_ops_info::check_abort() {
@@ -313,7 +315,7 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
#endif
}
static future<std::vector<gms::inet_address>> get_hosts_participating_in_repair(database& db,
static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(database& db,
const sstring& ksname,
const dht::token_range_vector& ranges,
const std::vector<sstring>& data_centers,
@@ -333,7 +335,7 @@ static future<std::vector<gms::inet_address>> get_hosts_participating_in_repair(
}
});
co_return std::vector<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
co_return std::list<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
}
static tracker* _the_tracker = nullptr;
@@ -591,8 +593,10 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid)
: db(repair.get_db())
std::optional<utils::UUID> ops_uuid,
bool hints_batchlog_flushed)
: rs(repair)
, db(repair.get_db())
, messaging(repair.get_messaging().container())
, sys_dist_ks(repair.get_sys_dist_ks())
, view_update_generator(repair.get_view_update_generator())
@@ -609,8 +613,10 @@ repair_info::repair_info(repair_service& repair,
, hosts(hosts_)
, ignore_nodes(ignore_nodes_)
, reason(reason_)
, total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor())
, nr_ranges_total(ranges.size())
, _ops_uuid(std::move(ops_uuid)) {
, _ops_uuid(std::move(ops_uuid))
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) {
}
void repair_info::check_failed_ranges() {
@@ -1120,12 +1126,41 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
// Do it in the background.
(void)repair_tracker().run(id, [this, &db, id, keyspace = std::move(keyspace),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
auto uuid = id.uuid;
auto waiting_nodes = db.local().get_token_metadata().get_all_endpoints();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
auto participants = get_hosts_participating_in_repair(db.local(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
bool hints_batchlog_flushed = false;
try {
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
throw;
}
}).get();
hints_batchlog_flushed = true;
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
}
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
abort_source as;
auto uuid = id.uuid;
auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] {
auto tables = std::list<utils::UUID>(table_ids.begin(), table_ids.end());
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
@@ -1155,13 +1190,21 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
rlogger.info("repair[{}]: Finished to shutdown off-strategy compaction updater", uuid);
});
auto cleanup_repair_range_history = defer([this, uuid] () mutable {
try {
this->cleanup_history(uuid).get();
} catch (...) {
rlogger.warn("repair[{}]: Failed to cleanup history: {}", uuid, std::current_exception());
}
});
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges,
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (repair_service& local_repair) mutable {
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid, hints_batchlog_flushed);
return repair_ranges(ri);
});
repair_results.push_back(std::move(f));
@@ -1263,9 +1306,10 @@ future<> repair_service::do_sync_data_using_repair(
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});

View File

@@ -168,6 +168,7 @@ public:
class repair_info {
public:
repair_service& rs;
seastar::sharded<database>& db;
seastar::sharded<netw::messaging_service>& messaging;
sharded<db::system_distributed_keyspace>& sys_dist_ks;
@@ -186,6 +187,7 @@ public:
std::unordered_set<gms::inet_address> ignore_nodes;
streaming::stream_reason reason;
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
size_t total_rf;
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total;
size_t nr_failed_ranges = 0;
@@ -194,6 +196,7 @@ public:
repair_stats _stats;
std::unordered_set<sstring> dropped_tables;
std::optional<utils::UUID> _ops_uuid;
bool _hints_batchlog_flushed = false;
public:
repair_info(repair_service& repair,
const sstring& keyspace_,
@@ -204,7 +207,8 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ingore_nodes_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid);
std::optional<utils::UUID> ops_uuid,
bool hints_batchlog_flushed);
void check_failed_ranges();
void abort();
void check_in_abort();
@@ -219,6 +223,10 @@ public:
return _ops_uuid;
};
bool hints_batchlog_flushed() const {
return _hints_batchlog_flushed;
}
future<> repair_range(const dht::token_range& range);
};
@@ -491,6 +499,29 @@ struct node_ops_cmd_response {
}
};
struct repair_update_system_table_request {
utils::UUID repair_uuid;
utils::UUID table_uuid;
sstring keyspace_name;
sstring table_name;
dht::token_range range;
gc_clock::time_point repair_time;
};
struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
utils::UUID repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;
};
struct repair_flush_hints_batchlog_response {
};
namespace std {
template<>

View File

@@ -52,6 +52,13 @@
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include "db/query_context.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
#include "cql3/untyped_result_set.hh"
#include "idl/partition_checksum.dist.hh"
extern logging::logger rlogger;
@@ -2266,6 +2273,66 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
});
}
future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {
rlogger.debug("repair[{}]: Got repair_update_system_table_request from node={}, range={}, repair_time={}", req.repair_uuid, from, req.range, req.repair_time);
auto& db = this->get_db();
bool is_valid_range = true;
if (req.range.start()) {
if (req.range.start()->is_inclusive()) {
is_valid_range = false;
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive()) {
is_valid_range = false;
}
}
if (!is_valid_range) {
throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range));
}
co_await db.invoke_on_all([&req] (database& local_db) {
auto& table = local_db.find_column_family(req.table_uuid);
return ::update_repair_time(table.schema(), req.range, req.repair_time);
});
sstring cql = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)",
db::system_keyspace::REPAIR_HISTORY);
auto range_start = req.range.start() ? req.range.start()->value() : dht::minimum_token();
auto range_end = req.range.end() ? req.range.end()->value() : dht::maximum_token();
db_clock::time_point ts = db_clock::from_time_t(gc_clock::to_time_t(req.repair_time));
co_await db::qctx->execute_cql(cql, req.table_uuid, ts, req.repair_uuid, req.keyspace_name, req.table_name,
dht::token::to_int64(range_start), dht::token::to_int64(range_end)).discard_result();
co_return repair_update_system_table_response();
}
future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) {
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={}, target_nodes={}, hints_timeout={}s, batchlog_timeout={}s",
req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
std::vector<gms::inet_address> target_nodes(req.target_nodes.begin(), req.target_nodes.end());
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
try {
co_await coroutine::all(
[this, &from, &req, &sync_point, &deadline] () -> future<> {
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
co_return;
},
[this, &from, &req] () -> future<> {
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
co_await _bm.local().do_batch_log_replay();
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
}
);
} catch (...) {
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}, target_hosts={}, {}",
req.repair_uuid, from, req.target_nodes, std::current_exception());
throw;
}
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
co_return repair_flush_hints_batchlog_response();
}
future<> repair_service::init_ms_handlers() {
auto& ms = this->_messaging;
@@ -2430,6 +2497,14 @@ future<> repair_service::init_ms_handlers() {
ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) {
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
});
ser::partition_checksum_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_update_system_table_handler(from, std::move(req));
});
ser::partition_checksum_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_flush_hints_batchlog_handler(from, std::move(req));
});
return make_ready_future<>();
}
@@ -2450,7 +2525,10 @@ future<> repair_service::uninit_ms_handlers() {
ms.unregister_repair_row_level_stop(),
ms.unregister_repair_get_estimated_partitions(),
ms.unregister_repair_set_estimated_partitions(),
ms.unregister_repair_get_diff_algorithms()).discard_result();
ms.unregister_repair_get_diff_algorithms(),
ser::partition_checksum_rpc_verbs::unregister_repair_update_system_table(&ms),
ser::partition_checksum_rpc_verbs::unregister_repair_flush_hints_batchlog(&ms)
).discard_result();
}
class repair_meta_tracker {
@@ -2522,6 +2600,8 @@ class row_level_repair {
// the next repair.
uint64_t _seed;
gc_clock::time_point _start_time;
public:
row_level_repair(repair_info& ri,
sstring cf_name,
@@ -2534,7 +2614,8 @@ public:
, _range(std::move(range))
, _all_live_peer_nodes(std::move(all_live_peer_nodes))
, _cf(_ri.db.local().find_column_family(_table_id))
, _seed(get_random_seed()) {
, _seed(get_random_seed())
, _start_time(gc_clock::now()) {
}
private:
@@ -2771,6 +2852,45 @@ private:
master.stats().round_nr_slow_path++;
}
private:
// Update system.repair_history table
future<> update_system_repair_table() {
// Update repair_history table only if it is a reguar repair.
if (_ri.reason != streaming::stream_reason::repair) {
co_return;
}
// Update repair_history table only if all replicas have been repaired
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_ri.total_rf != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
if (!_ri.hints_batchlog_flushed()) {
co_return;
}
repair_service& rs = _ri.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
co_await parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
try {
auto& ms = _ri.messaging.local();
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception());
}
});
co_return;
}
public:
future<> run() {
return seastar::async([this] {
@@ -2884,6 +3004,8 @@ public:
} else {
throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _ri.keyspace, _cf_name, _range));
}
} else {
update_system_repair_table().get();
}
rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}",
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
@@ -2957,6 +3079,8 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
repair_service::repair_service(distributed<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<database>& db,
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
service::migration_manager& mm,
@@ -2964,6 +3088,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
: _gossiper(gossiper)
, _messaging(ms)
, _db(db)
, _sp(sp)
, _bm(bm)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(vug)
, _mm(mm)
@@ -2976,6 +3102,7 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
}
future<> repair_service::start() {
co_await load_history();
co_await init_metrics();
co_await init_ms_handlers();
}
@@ -2991,3 +3118,79 @@ future<> repair_service::stop() {
repair_service::~repair_service() {
assert(_stopped);
}
static shard_id repair_id_to_shard(utils::UUID& repair_id) {
return shard_id(repair_id.get_most_significant_bits()) % smp::count;
}
future<std::optional<gc_clock::time_point>>
repair_service::update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
if (rh.repair_time > repair_time) {
rh.repair_time = repair_time;
}
auto finished_shards = ++(rh.finished_ranges[table_id][range]);
if (finished_shards == smp::count) {
// All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return rh.repair_time;
} else {
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_historytable, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return std::nullopt;
}
});
}
future<> repair_service::cleanup_history(utils::UUID repair_id) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
rs._finished_ranges_history.erase(repair_id);
rlogger.debug("repair[{}]: Finished cleaning up repair_service history", repair_id);
});
}
future<> repair_service::load_history() {
auto tables = get_db().local().get_column_families();
for (const auto& x : tables) {
auto& table_uuid = x.first;
auto& table = x.second;
auto shard = unsigned(table_uuid.get_most_significant_bits()) % smp::count;
if (shard != this_shard_id()) {
continue;
}
rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}",
table->schema()->ks_name(), table->schema()->cf_name(), table_uuid);
auto req = format("SELECT * from system.{} WHERE table_uuid = {}", db::system_keyspace::REPAIR_HISTORY, table_uuid);
co_await db::qctx->qp().query_internal(req, [this] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
auto table_uuid = row.get_as<utils::UUID>("table_uuid");
auto range_start = row.get_as<int64_t>("range_start");
auto range_end = row.get_as<int64_t>("range_end");
auto keyspace_name = row.get_as<sstring>("keyspace_name");
auto table_name = row.get_as<sstring>("table_name");
auto start = range_start == std::numeric_limits<int64_t>::min() ? dht::minimum_token() : dht::token::from_int64(range_start);
auto end = range_end == std::numeric_limits<int64_t>::min() ? dht::maximum_token() : dht::token::from_int64(range_end);
auto repair_time = to_gc_clock(row.get_as<db_clock::time_point>("repair_time"));
auto range = dht::token_range(dht::token_range::bound(start, false), dht::token_range::bound(end, true));
rlogger.debug("Loading repair history for keyspace={}, table={}, table_uuid={}, repair_time={}, range={}",
keyspace_name, table_name, table_uuid, repair_time, range);
co_await get_db().invoke_on_all([table_uuid, range, repair_time, keyspace_name, table_name] (database& local_db) -> future<> {
try {
auto& table = local_db.find_column_family(table_uuid);
::update_repair_time(table.schema(), range, repair_time);
} catch (no_such_column_family&) {
rlogger.trace("Table {}.{} with {} does not exist", keyspace_name, table_name, table_uuid);
} catch (...) {
rlogger.warn("Failed to load repair history for keyspace={}, table={}, range={}, repair_time={}",
keyspace_name, table_name, range, repair_time);
}
co_return;
});
co_return stop_iteration::no;
});
}
co_return;
}

View File

@@ -30,11 +30,13 @@ class row_level_repair_gossip_helper;
namespace service {
class migration_manager;
class storage_proxy;
}
namespace db {
class system_distributed_keyspace;
class batchlog_manager;
}
@@ -42,14 +44,25 @@ namespace gms {
class gossiper;
}
class repair_history {
public:
// The key for the map is the table_id
std::unordered_map<utils::UUID, std::unordered_map<dht::token_range, size_t>> finished_ranges;
gc_clock::time_point repair_time = gc_clock::time_point::max();
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
distributed<gms::gossiper>& _gossiper;
netw::messaging_service& _messaging;
sharded<database>& _db;
sharded<service::storage_proxy>& _sp;
sharded<db::batchlog_manager>& _bm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
service::migration_manager& _mm;
std::unordered_map<utils::UUID, repair_history> _finished_ranges_history;
shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
std::unique_ptr<tracker> _tracker;
bool _stopped = false;
@@ -63,6 +76,8 @@ public:
repair_service(distributed<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<database>& db,
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
service::migration_manager& mm, size_t max_repair_memory);
@@ -77,6 +92,10 @@ public:
// stop them abruptly).
future<> shutdown();
future<std::optional<gc_clock::time_point>> update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time);
future<> cleanup_history(utils::UUID repair_id);
future<> load_history();
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
// The tokens are the tokens assigned to the bootstrap node.
@@ -101,6 +120,14 @@ private:
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid);
future<repair_update_system_table_response> repair_update_system_table_handler(
gms::inet_address from,
repair_update_system_table_request req);
future<repair_flush_hints_batchlog_response> repair_flush_hints_batchlog_handler(
gms::inet_address from,
repair_flush_hints_batchlog_request req);
public:
netw::messaging_service& get_messaging() noexcept { return _messaging; }
sharded<database>& get_db() noexcept { return _db; }

View File

@@ -40,8 +40,10 @@
#include "dht/i_partitioner.hh"
#include "dht/token-sharding.hh"
#include "cdc/cdc_extension.hh"
#include "tombstone_gc_extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "utils/rjson.hh"
#include "tombstone_gc_options.hh"
constexpr int32_t schema::NAME_LENGTH;
@@ -529,6 +531,7 @@ bool operator==(const schema& x, const schema& y)
&& x._raw._compaction_strategy_options == y._raw._compaction_strategy_options
&& x._raw._compaction_enabled == y._raw._compaction_enabled
&& x.cdc_options() == y.cdc_options()
&& x.tombstone_gc_options() == y.tombstone_gc_options()
&& x._raw._caching_options == y._raw._caching_options
&& x._raw._dropped_columns == y._raw._dropped_columns
&& x._raw._collections == y._raw._collections
@@ -1267,11 +1270,26 @@ const cdc::options& schema::cdc_options() const {
return default_cdc_options;
}
const ::tombstone_gc_options& schema::tombstone_gc_options() const {
static const ::tombstone_gc_options default_tombstone_gc_options;
const auto& schema_extensions = _raw._extensions;
if (auto it = schema_extensions.find(tombstone_gc_extension::NAME); it != schema_extensions.end()) {
return dynamic_pointer_cast<tombstone_gc_extension>(it->second)->get_options();
}
return default_tombstone_gc_options;
}
schema_builder& schema_builder::with_cdc_options(const cdc::options& opts) {
add_extension(cdc::cdc_extension::NAME, ::make_shared<cdc::cdc_extension>(opts));
return *this;
}
schema_builder& schema_builder::with_tombstone_gc_options(const tombstone_gc_options& opts) {
add_extension(tombstone_gc_extension::NAME, ::make_shared<tombstone_gc_extension>(opts));
return *this;
}
schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) {
add_extension(db::paxos_grace_seconds_extension::NAME, ::make_shared<db::paxos_grace_seconds_extension>(seconds));
return *this;

View File

@@ -41,6 +41,7 @@
#include "caching_options.hh"
#include "column_computation.hh"
#include "timestamp.hh"
#include "tombstone_gc_options.hh"
namespace dht {
@@ -821,6 +822,8 @@ public:
const cdc::options& cdc_options() const;
const ::tombstone_gc_options& tombstone_gc_options() const;
const ::speculative_retry& speculative_retry() const {
return _raw._speculative_retry;
}

View File

@@ -25,6 +25,7 @@
#include "database_fwd.hh"
#include "cdc/log.hh"
#include "dht/i_partitioner.hh"
#include "tombstone_gc_options.hh"
struct schema_builder {
public:
@@ -291,6 +292,7 @@ public:
schema_builder& without_indexes();
schema_builder& with_cdc_options(const cdc::options&);
schema_builder& with_tombstone_gc_options(const tombstone_gc_options& opts);
default_names get_default_names() const {
return default_names(_raw);

View File

@@ -3145,7 +3145,7 @@ private:
auto mp = mutation_partition(s, m.partition());
auto&& ranges = cmd.slice.row_ranges(s, m.key());
bool always_return_static_content = cmd.slice.options.contains<query::partition_slice::option::always_return_static_content>();
mp.compact_for_query(s, cmd.timestamp, ranges, always_return_static_content, is_reversed, limit);
mp.compact_for_query(s, m.decorated_key(), cmd.timestamp, ranges, always_return_static_content, is_reversed, limit);
return primary_key{m.decorated_key(), get_last_reconciled_row(s, mp, is_reversed)};
}
@@ -3220,7 +3220,7 @@ private:
std::vector<query::clustering_range> ranges;
ranges.emplace_back(is_reversed ? query::clustering_range::make_starting_with(std::move(*shortest_read->clustering))
: query::clustering_range::make_ending_with(std::move(*shortest_read->clustering)));
it->live_row_count = it->mut.partition().compact_for_query(s, cmd.timestamp, ranges, always_return_static_content,
it->live_row_count = it->mut.partition().compact_for_query(s, it->mut.decorated_key(), cmd.timestamp, ranges, always_return_static_content,
is_reversed, query::partition_max_rows);
}
}

View File

@@ -88,6 +88,7 @@
#include "mx/reader.hh"
#include "utils/bit_cast.hh"
#include "utils/cached_file.hh"
#include "tombstone_gc.hh"
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
@@ -3139,6 +3140,46 @@ std::optional<large_data_stats_entry> sstable::get_large_data_stat(large_data_ty
return std::make_optional<large_data_stats_entry>();
}
// The gc_before returned by the function can only be used to estimate if the
// sstable is worth dropping some tombstones. We only return the maximum
// gc_before for all the partitions that have record in repair history map. It
// is fine that some of the partitions inside the sstable does not have a
// record.
gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const {
auto s = get_schema();
auto start = get_first_decorated_key().token();
auto end = get_last_decorated_key().token();
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
sstlog.trace("sstable={}, ks={}, cf={}, range={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range);
return ::get_gc_before_for_range(s, range, compaction_time).max_gc_before;
}
// If the sstable contains any regular live cells, we can not drop the sstable.
// We do not even bother to query the gc_before. Return
// gc_clock::time_point::min() as gc_before.
//
// If the token range of the sstable contains tokens that do not have a record
// in the repair history map, we can not drop the sstable, in such case we
// return gc_clock::time_point::min() as gc_before. Otherwise, return the
// gc_before from the repair history map.
gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const {
auto deletion_time = get_max_local_deletion_time();
auto s = get_schema();
// No need to query gc_before for the sstable if the max_deletion_time is max()
if (deletion_time == gc_clock::time_point(gc_clock::duration(std::numeric_limits<int>::max()))) {
sstlog.trace("sstable={}, ks={}, cf={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, shortcut",
get_filename(), s->ks_name(), s->cf_name(), deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count());
return gc_clock::time_point::min();
}
auto start = get_first_decorated_key().token();
auto end = get_last_decorated_key().token();
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, query",
get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count());
auto res = ::get_gc_before_for_range(s, range, compaction_time);
return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min();
}
}
namespace seastar {

View File

@@ -896,6 +896,8 @@ public:
friend std::unique_ptr<DataConsumeRowsContext>
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&);
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const;
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const;
};
// When we compact sstables, we have to atomically instantiate the new

View File

@@ -59,7 +59,6 @@ static logging::logger tlogger("table");
static seastar::metrics::label column_family_label("cf");
static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
flat_mutation_reader_v2
@@ -2408,8 +2407,8 @@ public:
const sstables::sstable_set& get_sstable_set() const override {
return _t.get_sstable_set();
}
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const override {
return sstables::get_fully_expired_sstables(*this, sstables, gc_clock::now() - schema()->gc_grace_seconds());
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(*this, sstables, query_time);
}
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
return _t.compacted_undeleted_sstables();

View File

@@ -266,7 +266,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
m = m1;
m.apply(m4);
m.partition().compact_for_query(*s, gc_clock::now(), { query::clustering_range::make_singular(ck) },
m.partition().compact_for_query(*s, m.decorated_key(), gc_clock::now(), { query::clustering_range::make_singular(ck) },
false, false, query::max_rows);
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
BOOST_REQUIRE(m.partition().static_row().empty());

View File

@@ -158,7 +158,7 @@ SEASTAR_TEST_CASE(test_memtable_flush_reader) {
const auto now = gc_clock::now();
auto compacted_muts = muts;
for (auto& mut : compacted_muts) {
mut.partition().compact_for_compaction(*mut.schema(), always_gc, now);
mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now);
}
testlog.info("Simple read");

View File

@@ -1029,7 +1029,7 @@ sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, s
static mutation compacted(const mutation& m) {
auto result = m;
result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now());
result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now());
return result;
}
@@ -2710,7 +2710,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) {
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
auto source = mt->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
auto mr = make_compacting_reader(std::move(source), query_time, [] (const dht::decorated_key&) { return api::min_timestamp; });
auto mr = make_compacting_reader(std::move(source), query_time,
[] (const dht::decorated_key&) { return api::min_timestamp; });
if (single_fragment_buffer) {
mr.set_max_buffer_size(1);
}
@@ -2761,7 +2762,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
}
auto mr = make_compacting_reader(make_flat_mutation_reader_from_fragments(ss.schema(), permit, std::move(mfs)),
gc_clock::now(), [] (const dht::decorated_key&) { return api::min_timestamp; });
gc_clock::now(),
[] (const dht::decorated_key&) { return api::min_timestamp; });
mr.set_max_buffer_size(buffer_size);
return mr;

View File

@@ -457,7 +457,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_collection_allocation) {
auto res_mut_opt = read_mutation_from_flat_mutation_reader(rd).get0();
BOOST_REQUIRE(res_mut_opt);
res_mut_opt->partition().compact_for_query(*schema, gc_clock::now(), {query::full_clustering_range}, true, false,
res_mut_opt->partition().compact_for_query(*schema, res_mut_opt->decorated_key(), gc_clock::now(), {query::full_clustering_range}, true, false,
std::numeric_limits<uint32_t>::max());
const auto stats_after = memory::stats();
@@ -1245,7 +1245,7 @@ SEASTAR_TEST_CASE(test_mutation_hash) {
static mutation compacted(const mutation& m) {
auto result = m;
result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now());
result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now());
return result;
}
@@ -1638,7 +1638,7 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1));
m.partition().apply(tomb);
BOOST_REQUIRE(!m.partition().empty());
m.partition().compact_for_compaction(*s, always_gc, gc_clock::now());
m.partition().compact_for_compaction(*s, always_gc, m.decorated_key(), gc_clock::now());
// Check that row was covered by tombstone.
BOOST_REQUIRE(m.partition().empty());
// Check that tombstone was purged after compact_for_compaction().
@@ -1744,11 +1744,11 @@ SEASTAR_TEST_CASE(test_trim_rows) {
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
mutation m2 = m;
m.partition().compact_for_query(*s, now, ranges, false, false, query::max_rows);
m.partition().compact_for_query(*s, m.decorated_key(), now, ranges, false, false, query::max_rows);
BOOST_REQUIRE(m.partition().clustered_rows().empty());
std::reverse(ranges.begin(), ranges.end());
m2.partition().compact_for_query(*s, now, ranges, false, true, query::max_rows);
m2.partition().compact_for_query(*s, m2.decorated_key(), now, ranges, false, true, query::max_rows);
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
};
@@ -1830,8 +1830,8 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
if (s != m2.schema()) {
return;
}
m1.partition().compact_for_compaction(*s, never_gc, now);
m2.partition().compact_for_compaction(*s, never_gc, now);
m1.partition().compact_for_compaction(*s, never_gc, m1.decorated_key(), now);
m2.partition().compact_for_compaction(*s, never_gc, m2.decorated_key(), now);
auto m12 = m1;
m12.apply(m2);
auto m12_with_diff = m1;
@@ -2949,6 +2949,7 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p
auto get_max_purgeable = [] (const dht::decorated_key&) {
return api::max_timestamp;
};
auto gc_grace_seconds = schema.gc_grace_seconds();
auto consumer = make_stable_flattened_mutations_consumer<compact_for_compaction<survived_compacted_fragments_consumer, purged_compacted_fragments_consumer>>(
schema,
query_time,

View File

@@ -371,12 +371,12 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) {
const auto now = gc_clock::now();
for (auto& m : muts) {
m.partition().compact_for_compaction(*random_schema.schema(), always_gc, now);
m.partition().compact_for_compaction(*random_schema.schema(), always_gc, m.decorated_key(), now);
}
std::vector<mutation> combined_mutations;
while (auto m = read_mutation_from_flat_mutation_reader(reader).get0()) {
m->partition().compact_for_compaction(*random_schema.schema(), always_gc, now);
m->partition().compact_for_compaction(*random_schema.schema(), always_gc, m->decorated_key(), now);
combined_mutations.emplace_back(std::move(*m));
}

View File

@@ -695,8 +695,8 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
// Drop empty rows
can_gc_fn never_gc = [] (tombstone) { return false; };
actual.compact_for_compaction(*s, never_gc, gc_clock::now());
expected.compact_for_compaction(*s, never_gc, gc_clock::now());
actual.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now());
expected.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now());
assert_that(s, actual).is_equal_to(expected);
}

View File

@@ -150,8 +150,8 @@ public:
const sstables::sstable_set& get_sstable_set() const override {
return _t->get_sstable_set();
}
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const override {
return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, gc_clock::now() - schema()->gc_grace_seconds());
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, query_time);
}
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
return _compacted_undeleted;
@@ -1394,7 +1394,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t0, t1, std::numeric_limits<int32_t>::max()));
auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits<int32_t>::max()));
std::vector<sstables::shared_sstable> compacting = { sst1, sst2 };
auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15));
auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15) + cf->schema()->gc_grace_seconds());
BOOST_REQUIRE(expired.size() == 0);
}
@@ -1406,7 +1406,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t2, t3, std::numeric_limits<int32_t>::max()));
auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits<int32_t>::max()));
std::vector<sstables::shared_sstable> compacting = { sst1, sst2 };
auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25));
auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25) + cf->schema()->gc_grace_seconds());
BOOST_REQUIRE(expired.size() == 1);
auto expired_sst = *expired.begin();
BOOST_REQUIRE(expired_sst->generation() == 1);
@@ -3370,6 +3370,7 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
auto gc_now = gc_clock::now();
gc_before = gc_now - s->gc_grace_seconds();
auto gc_grace_seconds = s->gc_grace_seconds();
auto cfc = make_stable_flattened_mutations_consumer<compact_for_compaction<compacting_sstable_writer_test, compacting_sstable_writer_test>>(
*s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr));

View File

@@ -501,7 +501,7 @@ public:
BOOST_REQUIRE(bool(mo));
memory::scoped_critical_alloc_section dfg;
mutation got = *mo;
got.partition().compact_for_compaction(*m.schema(), always_gc, query_time);
got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time);
assert_that(got).is_equal_to(m, ck_ranges);
return *this;
}
@@ -912,7 +912,7 @@ public:
BOOST_REQUIRE(bool(mo));
memory::scoped_critical_alloc_section dfg;
mutation got = *mo;
got.partition().compact_for_compaction(*m.schema(), always_gc, query_time);
got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time);
assert_that(got).is_equal_to(m, ck_ranges);
return *this;
}

View File

@@ -945,7 +945,7 @@ void test_all_data_is_read_back(tests::reader_concurrency_semaphore_wrapper& sem
for_each_mutation([&semaphore, &populate, query_time] (const mutation& m) mutable {
auto ms = populate(m.schema(), {m}, query_time);
mutation copy(m);
copy.partition().compact_for_compaction(*copy.schema(), always_gc, query_time);
copy.partition().compact_for_compaction(*copy.schema(), always_gc, copy.decorated_key(), query_time);
assert_that(ms.make_reader(m.schema(), semaphore.make_permit())).produces_compacted(copy, query_time);
});
}
@@ -1623,7 +1623,7 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph
const auto query_time = gc_clock::now();
mutation m_compacted(m);
m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, query_time);
m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, m_compacted.decorated_key(), query_time);
{
auto rd = ms.make_reader_v2(m.schema(), semaphore.make_permit());

195
tombstone_gc.cc Normal file
View File

@@ -0,0 +1,195 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <chrono>
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include "schema.hh"
#include "dht/i_partitioner.hh"
#include "gc_clock.hh"
#include "tombstone_gc.hh"
#include "locator/token_metadata.hh"
#include "exceptions/exceptions.hh"
#include "locator/abstract_replication_strategy.hh"
#include "database.hh"
#include "gms/feature_service.hh"
extern logging::logger dblog;
class repair_history_map {
public:
boost::icl::interval_map<dht::token, gc_clock::time_point, boost::icl::partial_absorber, std::less, boost::icl::inplace_max> map;
};
thread_local std::unordered_map<utils::UUID, seastar::lw_shared_ptr<repair_history_map>> repair_history_maps;
static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const utils::UUID& id) {
auto it = repair_history_maps.find(id);
if (it != repair_history_maps.end()) {
return it->second;
} else {
repair_history_maps[id] = seastar::make_lw_shared<repair_history_map>();
return repair_history_maps[id];
}
}
seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const utils::UUID& id) {
auto it = repair_history_maps.find(id);
if (it != repair_history_maps.end()) {
return it->second;
} else {
return {};
}
}
void drop_repair_history_map_for_table(const utils::UUID& id) {
repair_history_maps.erase(id);
}
// This is useful for a sstable to query a gc_before for a range. The range is
// defined by the first and last key in the sstable.
//
// The min_gc_before and max_gc_before returned are the min and max gc_before for all the keys in the range.
//
// The knows_entire_range is set to true:
// 1) if the tombstone_gc_mode is not repair, since we have the same value for all the keys in the ranges.
// 2) if the tombstone_gc_mode is repair, and the range is a sub range of a range in the repair history map.
get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) {
bool knows_entire_range = true;
const auto& options = s->tombstone_gc_options();
switch (options.mode()) {
case tombstone_gc_mode::timeout: {
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=timeout", s->ks_name(), s->cf_name(), range);
auto gc_before = saturating_subtract(query_time, s->gc_grace_seconds());
return {gc_before, gc_before, knows_entire_range};
}
case tombstone_gc_mode::disabled: {
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=disabled", s->ks_name(), s->cf_name(), range);
return {gc_clock::time_point::min(), gc_clock::time_point::min(), knows_entire_range};
}
case tombstone_gc_mode::immediate: {
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=immediate", s->ks_name(), s->cf_name(), range);
return {gc_clock::time_point::max(), gc_clock::time_point::max(), knows_entire_range};
}
case tombstone_gc_mode::repair: {
const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
auto min_gc_before = gc_clock::time_point::min();
auto max_gc_before = gc_clock::time_point::min();
auto min_repair_timestamp = gc_clock::time_point::min();
auto max_repair_timestamp = gc_clock::time_point::min();
int hits = 0;
knows_entire_range = false;
auto m = get_repair_history_map_for_table(s->id());
if (m) {
auto interval = locator::token_metadata::range_to_interval(range);
auto min = gc_clock::time_point::max();
auto max = gc_clock::time_point::min();
bool contains_all = false;
for (auto& x : boost::make_iterator_range(m->map.equal_range(interval))) {
auto r = locator::token_metadata::interval_to_range(x.first);
min = std::min(x.second, min);
max = std::max(x.second, max);
if (++hits == 1 && r.contains(range, dht::tri_compare)) {
contains_all = true;
}
}
if (hits == 0) {
min_repair_timestamp = gc_clock::time_point::min();
max_repair_timestamp = gc_clock::time_point::min();
} else {
knows_entire_range = hits == 1 && contains_all;
min_repair_timestamp = min;
max_repair_timestamp = max;
}
min_gc_before = saturating_subtract(min_repair_timestamp, propagation_delay);
max_gc_before = saturating_subtract(max_repair_timestamp, propagation_delay);
};
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}",
s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range);
return {min_gc_before, max_gc_before, knows_entire_range};
}
}
}
gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) {
// if mode = timeout // default option, if user does not specify tombstone_gc options
// if mode = disabled // never gc tombstone
// if mode = immediate // can gc tombstone immediately
// if mode = repair // gc after repair
const auto& options = s->tombstone_gc_options();
switch (options.mode()) {
case tombstone_gc_mode::timeout:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=timeout", s->ks_name(), s->cf_name(), dk);
return saturating_subtract(query_time, s->gc_grace_seconds());
case tombstone_gc_mode::disabled:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=disabled", s->ks_name(), s->cf_name(), dk);
return gc_clock::time_point::min();
case tombstone_gc_mode::immediate:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=immediate", s->ks_name(), s->cf_name(), dk);
return gc_clock::time_point::max();
case tombstone_gc_mode::repair:
const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
auto gc_before = gc_clock::time_point::min();
auto repair_timestamp = gc_clock::time_point::min();
auto m = get_repair_history_map_for_table(s->id());
if (m) {
const auto it = m->map.find(dk.token());
if (it == m->map.end()) {
gc_before = gc_clock::time_point::min();
} else {
repair_timestamp = it->second;
gc_before = saturating_subtract(repair_timestamp, propagation_delay);
}
}
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}",
s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before);
return gc_before;
}
}
void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time) {
auto m = get_or_create_repair_history_map_for_table(s->id());
m->map += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
}
static bool needs_repair_before_gc(const database& db, sstring ks_name) {
// If a table uses local replication strategy or rf one, there is no
// need to run repair even if tombstone_gc mode = repair.
auto& ks = db.find_keyspace(ks_name);
auto& rs = ks.get_replication_strategy();
auto erm = ks.get_effective_replication_map();
bool needs_repair = rs.get_type() != locator::replication_strategy_type::local
&& erm->get_replication_factor() != 1;
return needs_repair;
}
void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name) {
if (!options) {
return;
}
if (!db.features().cluster_supports_tombstone_gc_options()) {
throw exceptions::configuration_exception("tombstone_gc option not supported by the cluster");
}
if (options->mode() == tombstone_gc_mode::repair && !needs_repair_before_gc(db, ks_name)) {
throw exceptions::configuration_exception("tombstone_gc option with mode = repair not supported for table with RF one or local replication strategy");
}
}

51
tombstone_gc.hh Normal file
View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/shared_ptr.hh>
#include "gc_clock.hh"
#include "dht/token.hh"
#include "schema_fwd.hh"
namespace dht {
class decorated_key;
using token_range = nonwrapping_range<token>;
}
struct get_gc_before_for_range_result {
gc_clock::time_point min_gc_before;
gc_clock::time_point max_gc_before;
bool knows_entire_range;
};
void drop_repair_history_map_for_table(const utils::UUID& id);
get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time);
gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time);
void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time);
void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name);

56
tombstone_gc_extension.hh Normal file
View File

@@ -0,0 +1,56 @@
/*
* Copyright 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <map>
#include <seastar/core/sstring.hh>
#include "bytes.hh"
#include "serializer.hh"
#include "db/extensions.hh"
#include "schema.hh"
#include "serializer_impl.hh"
#include "tombstone_gc_options.hh"
class tombstone_gc_extension : public schema_extension {
tombstone_gc_options _tombstone_gc_options;
public:
static constexpr auto NAME = "tombstone_gc";
tombstone_gc_extension() = default;
tombstone_gc_extension(const tombstone_gc_options& opts) : _tombstone_gc_options(opts) {}
explicit tombstone_gc_extension(std::map<seastar::sstring, seastar::sstring> tags) : _tombstone_gc_options(std::move(tags)) {}
explicit tombstone_gc_extension(const bytes& b) : _tombstone_gc_options(tombstone_gc_extension::deserialize(b)) {}
explicit tombstone_gc_extension(const seastar::sstring& s) {
throw std::logic_error("Cannot create tombstone_gc_extension info from string");
}
bytes serialize() const override {
return ser::serialize_to_buffer<bytes>(_tombstone_gc_options.to_map());
}
static std::map<seastar::sstring, seastar::sstring> deserialize(const bytes_view& buffer) {
return ser::deserialize_from_buffer(buffer, boost::type<std::map<seastar::sstring, seastar::sstring>>());
}
const tombstone_gc_options& get_options() const {
return _tombstone_gc_options;
}
};

90
tombstone_gc_options.cc Normal file
View File

@@ -0,0 +1,90 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tombstone_gc_options.hh"
#include "exceptions/exceptions.hh"
#include <boost/lexical_cast.hpp>
#include <seastar/core/sstring.hh>
#include <map>
#include "utils/rjson.hh"
tombstone_gc_options::tombstone_gc_options(const std::map<seastar::sstring, seastar::sstring>& map) {
for (const auto& x : map) {
if (x.first == "mode") {
if (x.second == "disabled") {
_mode = tombstone_gc_mode::disabled;
} else if (x.second == "repair") {
_mode = tombstone_gc_mode::repair;
} else if (x.second == "timeout") {
_mode = tombstone_gc_mode::timeout;
} else if (x.second == "immediate") {
_mode = tombstone_gc_mode::immediate;
} else {
throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option mode: {}", x.second));
}
} else if (x.first == "propagation_delay_in_seconds") {
try {
auto seconds = boost::lexical_cast<int64_t>(x.second);
if (seconds < 0) {
throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second));
}
_propagation_delay_in_seconds = std::chrono::seconds(seconds);
} catch (...) {
throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second));
}
} else {
throw exceptions::configuration_exception(format("Invalid tombstone_gc option: {}", x.first));
}
}
}
std::map<seastar::sstring, seastar::sstring> tombstone_gc_options::to_map() const {
std::map<seastar::sstring, seastar::sstring> res = {
{"mode", format("{}", _mode)},
{"propagation_delay_in_seconds", format("{}", _propagation_delay_in_seconds.count())},
};
return res;
}
seastar::sstring tombstone_gc_options::to_sstring() const {
return rjson::print(rjson::from_string_map(to_map()));
}
bool
tombstone_gc_options::operator==(const tombstone_gc_options& other) const {
return _mode == other._mode && _propagation_delay_in_seconds == other._propagation_delay_in_seconds;
}
bool
tombstone_gc_options::operator!=(const tombstone_gc_options& other) const {
return !(*this == other);
}
std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& mode) {
switch (mode) {
case tombstone_gc_mode::timeout: return os << "timeout";
case tombstone_gc_mode::disabled: return os << "disabled";
case tombstone_gc_mode::immediate: return os << "immediate";
case tombstone_gc_mode::repair: return os << "repair";
}
return os << "unknown";
}

47
tombstone_gc_options.hh Normal file
View File

@@ -0,0 +1,47 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <map>
#include <chrono>
#include <seastar/core/sstring.hh>
enum class tombstone_gc_mode : uint8_t { timeout, disabled, immediate, repair };
class tombstone_gc_options {
private:
tombstone_gc_mode _mode = tombstone_gc_mode::timeout;
std::chrono::seconds _propagation_delay_in_seconds = std::chrono::seconds(3600);
public:
tombstone_gc_options() = default;
const tombstone_gc_mode& mode() const { return _mode; }
explicit tombstone_gc_options(const std::map<seastar::sstring, seastar::sstring>& map);
const std::chrono::seconds& propagation_delay_in_seconds() const {
return _propagation_delay_in_seconds;
}
std::map<seastar::sstring, seastar::sstring> to_map() const;
seastar::sstring to_sstring() const;
bool operator==(const tombstone_gc_options& other) const;
bool operator!=(const tombstone_gc_options& other) const;
};
std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& m);