mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge "compaction: Update backlog tracker correctly when schema is updated" from Raphael
" Backlog tracker isn't updated correctly when facing a schema change, and may leak a SSTable if compaction strategy is changed, which causes backlog to be computed incorrectly. Most of these problems happen because sstable set and tracker are updated independently, so it could happen that tracker lose track (pun intended) of changes applied to set. The first patch will fix the leak when strategy is changed, and the third patch will make sure that tracker is updated atomically with sstable set, so these kind of problems will not happen anymore. Fixes #9157 " * 'fixes_to_backlog_tracker_v4' of github.com:raphaelsc/scylla: compaction: Update backlog tracker correctly when schema is updated compaction: Don't leak backlog of input sstable when compaction strategy is changed compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables() compaction: simplify removal of monitors
This commit is contained in:
@@ -344,10 +344,8 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
|
||||
return _last_position_seen;
|
||||
}
|
||||
|
||||
void remove_sstable(bool is_tracking) {
|
||||
if (is_tracking && _sst) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
|
||||
} else if (_sst) {
|
||||
void remove_sstable() {
|
||||
if (_sst) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
|
||||
}
|
||||
_sst = {};
|
||||
@@ -368,30 +366,24 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
|
||||
};
|
||||
|
||||
virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
|
||||
_generated_monitors.emplace_back(std::move(sst), _cf);
|
||||
return _generated_monitors.back();
|
||||
auto p = _generated_monitors.emplace(sst->generation(), compaction_read_monitor(sst, _cf));
|
||||
return p.first->second;
|
||||
}
|
||||
|
||||
explicit compaction_read_monitor_generator(column_family& cf)
|
||||
: _cf(cf) {}
|
||||
|
||||
void remove_sstables(bool is_tracking) {
|
||||
for (auto& rm : _generated_monitors) {
|
||||
rm.remove_sstable(is_tracking);
|
||||
}
|
||||
}
|
||||
|
||||
void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
|
||||
for (auto& rm : _generated_monitors) {
|
||||
if (rm._sst == sst) {
|
||||
rm.remove_sstable(is_tracking);
|
||||
break;
|
||||
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
for (auto& sst : exhausted_sstables) {
|
||||
auto it = _generated_monitors.find(sst->generation());
|
||||
if (it != _generated_monitors.end()) {
|
||||
it->second.remove_sstable();
|
||||
}
|
||||
}
|
||||
}
|
||||
private:
|
||||
column_family& _cf;
|
||||
std::deque<compaction_read_monitor> _generated_monitors;
|
||||
std::unordered_map<int64_t, compaction_read_monitor> _generated_monitors;
|
||||
};
|
||||
|
||||
// Writes a temporary sstable run containing only garbage collected data.
|
||||
@@ -751,8 +743,6 @@ private:
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
|
||||
_info->total_partitions, _info->total_keys_written);
|
||||
|
||||
backlog_tracker_adjust_charges();
|
||||
|
||||
auto info = std::move(_info);
|
||||
_cf.get_compaction_manager().deregister_compaction(info);
|
||||
return std::move(*info);
|
||||
@@ -760,7 +750,6 @@ private:
|
||||
|
||||
virtual std::string_view report_start_desc() const = 0;
|
||||
virtual std::string_view report_finish_desc() const = 0;
|
||||
virtual void backlog_tracker_adjust_charges() { };
|
||||
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
|
||||
if (!tombstone_expiration_enabled()) {
|
||||
@@ -962,7 +951,6 @@ public:
|
||||
class regular_compaction : public compaction {
|
||||
// sstable being currently written.
|
||||
mutable compaction_read_monitor_generator _monitor_generator;
|
||||
std::vector<shared_sstable> _unused_sstables = {};
|
||||
public:
|
||||
regular_compaction(column_family& cf, compaction_descriptor descriptor)
|
||||
: compaction(cf, std::move(descriptor))
|
||||
@@ -990,19 +978,9 @@ public:
|
||||
return "Compacted";
|
||||
}
|
||||
|
||||
void backlog_tracker_adjust_charges() override {
|
||||
_monitor_generator.remove_sstables(_info->tracking);
|
||||
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
|
||||
for (auto& sst : _unused_sstables) {
|
||||
tracker.add_sstable(sst);
|
||||
}
|
||||
_unused_sstables.clear();
|
||||
}
|
||||
|
||||
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
|
||||
auto sst = _sstable_creator(this_shard_id());
|
||||
setup_new_sstable(sst);
|
||||
_unused_sstables.push_back(sst);
|
||||
|
||||
auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
|
||||
sstable_writer_config cfg = make_sstable_writer_config(_info->type);
|
||||
@@ -1031,26 +1009,6 @@ public:
|
||||
_monitor_generator(std::move(sstable));
|
||||
}
|
||||
private:
|
||||
void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
|
||||
//
|
||||
// Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
|
||||
// Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the
|
||||
// new sstables, which replaced the exhausted ones, are not partially written sstables and they can
|
||||
// be added to tracker like any other regular sstable in the table's set.
|
||||
// This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's
|
||||
// an early sstable replacement.
|
||||
//
|
||||
|
||||
for (auto& sst : exhausted_sstables) {
|
||||
_monitor_generator.remove_sstable(_info->tracking, sst);
|
||||
}
|
||||
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
|
||||
for (auto& sst : _unused_sstables) {
|
||||
tracker.add_sstable(sst);
|
||||
}
|
||||
_unused_sstables.clear();
|
||||
}
|
||||
|
||||
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
|
||||
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
|
||||
// meaning incremental compaction is disabled for this compaction.
|
||||
@@ -1083,7 +1041,7 @@ private:
|
||||
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
|
||||
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
|
||||
_sstables.erase(exhausted, _sstables.end());
|
||||
backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
|
||||
_monitor_generator.remove_exhausted_sstables(exhausted_ssts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1564,8 +1522,6 @@ public:
|
||||
return "Resharded";
|
||||
}
|
||||
|
||||
void backlog_tracker_adjust_charges() override { }
|
||||
|
||||
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
|
||||
auto shard = dht::shard_of(*_schema, dk.token());
|
||||
auto sst = _sstable_creator(shard);
|
||||
|
||||
@@ -66,7 +66,6 @@ namespace sstables {
|
||||
int64_t ended_at;
|
||||
std::vector<shared_sstable> new_sstables;
|
||||
sstring stop_requested;
|
||||
bool tracking = true;
|
||||
utils::UUID run_identifier;
|
||||
utils::UUID compaction_uuid;
|
||||
struct replacement {
|
||||
@@ -82,10 +81,6 @@ namespace sstables {
|
||||
void stop(sstring reason) {
|
||||
stop_requested = std::move(reason);
|
||||
}
|
||||
|
||||
void stop_tracking() {
|
||||
tracking = false;
|
||||
}
|
||||
};
|
||||
|
||||
// Compact a list of N sstables into M sstables.
|
||||
|
||||
@@ -940,14 +940,6 @@ future<> compaction_manager::remove(column_family* cf) {
|
||||
});
|
||||
}
|
||||
|
||||
void compaction_manager::stop_tracking_ongoing_compactions(column_family* cf) {
|
||||
for (auto& info : _compactions) {
|
||||
if (info->cf == cf) {
|
||||
info->stop_tracking();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_manager::stop_compaction(sstring type) {
|
||||
sstables::compaction_type target_type;
|
||||
try {
|
||||
|
||||
@@ -241,10 +241,6 @@ public:
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
// No longer interested in tracking backlog for compactions in this column
|
||||
// family. For instance, we could be ALTERing TABLE to a different strategy.
|
||||
void stop_tracking_ongoing_compactions(column_family* cf);
|
||||
|
||||
const stats& get_stats() const {
|
||||
return _stats;
|
||||
}
|
||||
|
||||
@@ -546,6 +546,8 @@ private:
|
||||
void add_maintenance_sstable(sstables::shared_sstable sst);
|
||||
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
|
||||
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
|
||||
// Update compaction backlog tracker with the same changes applied to the underlying sstable set.
|
||||
void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
|
||||
// Caller must keep m alive.
|
||||
|
||||
17
table.cc
17
table.cc
@@ -337,6 +337,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
|
||||
tracker.remove_sstable(std::move(sstable));
|
||||
}
|
||||
|
||||
void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
||||
auto& tracker = _compaction_strategy.get_backlog_tracker();
|
||||
for (auto& sst : new_sstables) {
|
||||
tracker.add_sstable(sst);
|
||||
}
|
||||
for (auto& sst : old_sstables) {
|
||||
tracker.remove_sstable(sst);
|
||||
}
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set>
|
||||
table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
|
||||
enable_backlog_tracker backlog_tracker) {
|
||||
@@ -784,6 +794,8 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
|
||||
_t._main_sstables = std::move(_new_main_list);
|
||||
_t._maintenance_sstables = std::move(_new_maintenance_list);
|
||||
_t.refresh_compound_sstable_set();
|
||||
// Input sstables aren't not removed from backlog tracker because they come from the maintenance set.
|
||||
_t.backlog_tracker_adjust_charges({}, _new_main);
|
||||
}
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
|
||||
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
|
||||
@@ -855,6 +867,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
virtual void execute() override {
|
||||
_t._main_sstables = std::move(_new_sstables);
|
||||
_t.refresh_compound_sstable_set();
|
||||
_t.backlog_tracker_adjust_charges(_desc.old_sstables, _desc.new_sstables);
|
||||
}
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) {
|
||||
return std::make_unique<sstable_list_updater>(t, std::move(permit), d);
|
||||
@@ -1037,10 +1050,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
new_sstables.insert(s);
|
||||
});
|
||||
|
||||
if (!move_read_charges) {
|
||||
_compaction_manager.stop_tracking_ongoing_compactions(this);
|
||||
}
|
||||
|
||||
// now exception safe:
|
||||
_compaction_strategy = std::move(new_cs);
|
||||
_main_sstables = std::move(new_sstables);
|
||||
|
||||
@@ -2945,11 +2945,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
|
||||
SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
cell_locker_stats cl_stats;
|
||||
|
||||
auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
|
||||
auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
auto s = builder.build();
|
||||
@@ -2991,15 +2991,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
|
||||
|
||||
auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen);
|
||||
|
||||
bool stopped_tracking = false;
|
||||
for (auto& info : cf._data->cm.get_compactions()) {
|
||||
if (info->cf == &*cf) {
|
||||
info->stop_tracking();
|
||||
stopped_tracking = true;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE(stopped_tracking);
|
||||
|
||||
// set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
@@ -3007,7 +2999,6 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
|
||||
|
||||
auto ret = fut.get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
BOOST_REQUIRE(ret.tracking == false);
|
||||
}
|
||||
// triggers code that iterates through registered compactions.
|
||||
cf._data->cm.backlog();
|
||||
|
||||
Reference in New Issue
Block a user