compaction: Merge GC writer into regular compaction writer

Turns out most of regular writer can be reused by GC writer, so let's
merge the latter into the former. We gain a lot of simplification,
lots of duplication is removed, and additionally, GC writer can now
be enabled with interposer as it can be created on demand by
each interposer consumer (will be done in a later patch).

Refs #6472.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20211119120841.164317-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-11-19 09:08:41 -03:00
committed by Botond Dénes
parent f8c91bdd1e
commit c94e6f8567

View File

@@ -273,10 +273,37 @@ struct compaction_writer {
class compacting_sstable_writer {
compaction& _c;
std::optional<compaction_writer> _compaction_writer = {};
using creator_func_t = std::function<compaction_writer(const dht::decorated_key&)>;
using stop_func_t = std::function<void(compaction_writer*)>;
creator_func_t _create_compaction_writer;
stop_func_t _stop_compaction_writer;
std::optional<utils::observer<>> _stop_request_observer;
bool _unclosed_partition = false;
private:
inline void maybe_abort_compaction();
utils::observer<> make_stop_request_observer(utils::observable<>& sro) {
return sro.observe([this] () mutable {
assert(!_unclosed_partition);
consume_end_of_stream();
});
}
public:
explicit compacting_sstable_writer(compaction& c) : _c(c) { }
explicit compacting_sstable_writer(compaction& c, creator_func_t cpw, stop_func_t scw)
: _c(c)
, _create_compaction_writer(std::move(cpw))
, _stop_compaction_writer(std::move(scw)) {
}
explicit compacting_sstable_writer(compaction& c, creator_func_t cpw, stop_func_t scw, utils::observable<>& sro)
: _c(c)
, _create_compaction_writer(std::move(cpw))
, _stop_compaction_writer(std::move(scw))
, _stop_request_observer(make_stop_request_observer(sro)) {
}
compacting_sstable_writer(compacting_sstable_writer&& other);
compacting_sstable_writer& operator=(const compacting_sstable_writer&) = delete;
compacting_sstable_writer(const compacting_sstable_writer&) = delete;
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t) { _compaction_writer->writer.consume(t); }
@@ -365,89 +392,6 @@ private:
std::unordered_map<int64_t, compaction_read_monitor> _generated_monitors;
};
// Writes a temporary sstable run containing only garbage collected data.
// Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well,
// right before there's an attempt to release exhausted sstables earlier.
// Generated sstables will be temporarily added to table to make sure that a compaction crash will not
// result in data resurrection.
// When compaction finishes, all the temporary sstables generated here will be deleted and removed
// from table's sstable set.
class garbage_collected_sstable_writer {
public:
// Data for GC writer is stored separately to allow compaction class to communicate directly
// with garbage_collected_sstable_writer which is moved into mutation_compaction, making it
// unreachable after the compaction process has started.
class data {
compaction* _c = nullptr;
// Garbage collected sstables that are sealed but were not added to SSTable set yet.
std::vector<shared_sstable> _unused_garbage_collected_sstables;
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
std::vector<shared_sstable> _used_garbage_collected_sstables;
// Writer for the sstable currently being produced; disengaged until the first
// partition is consumed and again after finish_sstable_writer() seals it.
std::optional<compaction_writer> _compaction_writer;
// All GC sstables produced through this data object share one run identifier.
utils::UUID _run_identifier = utils::make_random_uuid();
// Shorthand accessor; only valid while _compaction_writer is engaged.
sstable_writer& writer() {
return _compaction_writer->writer;
}
public:
explicit data(compaction& c) : _c(&c) {
}
data& operator=(const data&) = delete;
data(const data&) = delete;
// Opens a new sstable writer if none is active (defined out of line).
void maybe_create_new_sstable_writer();
// Seals the current sstable, if any, and queues it as unused (defined out of line).
void finish_sstable_writer();
// Retrieves all unused garbage collected sstables that will be subsequently added
// to the SSTable set, and mark them as used.
std::vector<shared_sstable> consume_unused_garbage_collected_sstables() {
auto unused = std::exchange(_unused_garbage_collected_sstables, {});
_used_garbage_collected_sstables.insert(_used_garbage_collected_sstables.end(), unused.begin(), unused.end());
return unused;
}
const std::vector<shared_sstable>& used_garbage_collected_sstables() const {
return _used_garbage_collected_sstables;
}
friend class garbage_collected_sstable_writer;
};
private:
// Non-owning pointer to the shared state held by the compaction object.
garbage_collected_sstable_writer::data* _data = nullptr;
sstable_writer& writer() {
return _data->writer();
}
public:
explicit garbage_collected_sstable_writer() = default;
explicit garbage_collected_sstable_writer(garbage_collected_sstable_writer::data& data) : _data(&data) {}
garbage_collected_sstable_writer& operator=(const garbage_collected_sstable_writer&) = delete;
garbage_collected_sstable_writer(const garbage_collected_sstable_writer&) = delete;
garbage_collected_sstable_writer(garbage_collected_sstable_writer&& other) = default;
garbage_collected_sstable_writer& operator=(garbage_collected_sstable_writer&& other) = default;
// Creates the sstable writer lazily, so no sstable appears unless GC data exists.
void consume_new_partition(const dht::decorated_key& dk) {
_data->maybe_create_new_sstable_writer();
writer().consume_new_partition(dk);
}
void consume(tombstone t) { writer().consume(t); }
stop_iteration consume(static_row&& sr, tombstone, bool) { return writer().consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return writer().consume(std::move(cr)); }
stop_iteration consume(range_tombstone&& rt) { return writer().consume(std::move(rt)); }
// Always returns stop_iteration::no: the GC writer never asks the stream to stop.
stop_iteration consume_end_of_partition() {
writer().consume_end_of_partition();
return stop_iteration::no;
}
void consume_end_of_stream() {
_data->finish_sstable_writer();
}
};
class formatted_sstables_list {
bool _include_origin = true;
std::vector<sstring> _ssts;
@@ -503,7 +447,6 @@ protected:
encoding_stats_collector _stats_collector;
bool _contains_multi_fragment_runs = false;
mutation_source_metadata _ms_metadata = {};
garbage_collected_sstable_writer::data _gc_sstable_writer_data;
compaction_sstable_replacer_fn _replacer;
utils::UUID _run_identifier;
::io_priority_class _io_priority;
@@ -512,6 +455,11 @@ protected:
// used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys.
std::optional<sstable_set::incremental_selector> _selector;
std::unordered_set<shared_sstable> _compacting_for_max_purgeable_func;
// Garbage collected sstables that are sealed but were not added to SSTable set yet.
std::vector<shared_sstable> _unused_garbage_collected_sstables;
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
std::vector<shared_sstable> _used_garbage_collected_sstables;
utils::observable<> _stop_request_observable;
private:
compaction_data& init_compaction_data(compaction_data& cdata, const compaction_descriptor& descriptor) const {
cdata.compaction_fan_in = descriptor.fan_in();
@@ -529,7 +477,6 @@ protected:
, _type(descriptor.options.type())
, _max_sstable_size(descriptor.max_sstable_bytes)
, _sstable_level(descriptor.level)
, _gc_sstable_writer_data(*this)
, _replacer(std::move(descriptor.replacer))
, _run_identifier(descriptor.run_identifier)
, _io_priority(descriptor.io_priority)
@@ -605,6 +552,51 @@ protected:
bool tombstone_expiration_enabled() const {
return bool(_sstable_set);
}
// Creates a writer for a brand-new sstable that will hold only garbage
// collected data. The sstable shares this compaction's run identifier and
// uses the dedicated "garbage_collection" writer configuration.
compaction_writer create_gc_compaction_writer() const {
auto sst = _sstable_creator(this_shard_id());
auto&& priority = _io_priority;
auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer("garbage_collection");
cfg.run_identifier = _run_identifier;
// cfg keeps a raw pointer to the monitor; ownership is transferred to the
// returned compaction_writer below, which keeps the monitor alive.
cfg.monitor = monitor.get();
auto writer = sst->get_writer(*schema(), partitions_per_sstable(), cfg, get_encoding_stats(), priority);
return compaction_writer(std::move(monitor), std::move(writer), std::move(sst));
}
// Seals the GC writer's sstable: flushes remaining data, waits until the
// sstable's on-disk data can be opened, then queues it as "unused" so it can
// later be published to the SSTable set.
void stop_gc_compaction_writer(compaction_writer* c_writer) {
c_writer->writer.consume_end_of_stream();
auto sst = c_writer->sst;
// get0() blocks the current seastar thread context until open completes.
sst->open_data().get0();
_unused_garbage_collected_sstables.push_back(std::move(sst));
}
// Writes a temporary sstable run containing only garbage collected data.
// Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well,
// right before there's an attempt to release exhausted sstables earlier.
// Generated sstables will be temporarily added to table to make sure that a compaction crash will not
// result in data resurrection.
// When compaction finishes, all the temporary sstables generated here will be deleted and removed
// from table's sstable set.
compacting_sstable_writer get_gc_compacting_sstable_writer() {
// Passing _stop_request_observable lets the compaction signal this writer
// to seal its current sstable on demand (the observer runs
// consume_end_of_stream), before exhausted input sstables are released.
return compacting_sstable_writer(*this,
[this] (const dht::decorated_key&) { return create_gc_compaction_writer(); },
[this] (compaction_writer* cw) { stop_gc_compaction_writer(cw); },
_stop_request_observable);
}
// Retrieves all unused garbage collected sstables that will be subsequently added
// to the SSTable set, and mark them as used.
std::vector<shared_sstable> consume_unused_garbage_collected_sstables() {
auto unused = std::exchange(_unused_garbage_collected_sstables, {});
_used_garbage_collected_sstables.insert(_used_garbage_collected_sstables.end(), unused.begin(), unused.end());
return unused;
}
// Read-only view of the GC sstables already published to the SSTable set;
// these are eventually removed from the set when compaction completes.
const std::vector<shared_sstable>& used_garbage_collected_sstables() const {
return _used_garbage_collected_sstables;
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -622,9 +614,7 @@ private:
return _cf.get_compaction_strategy().make_sstable_set(_schema);
}
template <typename GCConsumer>
requires CompactedFragmentsConsumer<GCConsumer>
future<> setup(GCConsumer gc_consumer) {
void setup() {
auto ssts = make_lw_shared<sstables::sstable_set>(make_sstable_set_for_input());
formatted_sstables_list formatted_msg;
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
@@ -673,17 +663,30 @@ private:
_ms_metadata.min_timestamp = timestamp_tracker.min();
_ms_metadata.max_timestamp = timestamp_tracker.max();
}
future<> consume() {
auto now = gc_clock::now();
auto consumer = make_interposer_consumer([this, gc_consumer = std::move(gc_consumer), now] (flat_mutation_reader reader) mutable
auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader reader) mutable
{
return seastar::async([this, reader = std::move(reader), gc_consumer = std::move(gc_consumer), now] () mutable {
return seastar::async([this, reader = std::move(reader), now] () mutable {
auto close_reader = deferred_close(reader);
using compact_mutations = compact_for_compaction<compacting_sstable_writer, GCConsumer>;
if (enable_garbage_collected_sstable_writer()) {
using compact_mutations = compact_for_compaction<compacting_sstable_writer, compacting_sstable_writer>;
auto cfc = make_stable_flattened_mutations_consumer<compact_mutations>(*schema(), now,
max_purgeable_func(),
get_compacting_sstable_writer(),
get_gc_compacting_sstable_writer());
reader.consume_in_thread(std::move(cfc));
return;
}
using compact_mutations = compact_for_compaction<compacting_sstable_writer, noop_compacted_fragments_consumer>;
auto cfc = make_stable_flattened_mutations_consumer<compact_mutations>(*schema(), now,
max_purgeable_func(),
get_compacting_sstable_writer(),
std::move(gc_consumer));
max_purgeable_func(),
get_compacting_sstable_writer(),
noop_compacted_fragments_consumer());
reader.consume_in_thread(std::move(cfc));
});
@@ -755,7 +758,9 @@ private:
virtual void stop_sstable_writer(compaction_writer* writer) = 0;
compacting_sstable_writer get_compacting_sstable_writer() {
return compacting_sstable_writer(*this);
return compacting_sstable_writer(*this,
[this] (const dht::decorated_key& dk) { return create_compaction_writer(dk); },
[this] (compaction_writer* cw) { stop_sstable_writer(cw); });
}
const schema_ptr& schema() const {
@@ -805,25 +810,27 @@ protected:
log(log_level::trace, std::move(fmt), std::forward<Args>(args)...);
}
public:
garbage_collected_sstable_writer make_garbage_collected_sstable_writer() {
return garbage_collected_sstable_writer(_gc_sstable_writer_data);
}
bool enable_garbage_collected_sstable_writer() const {
// FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously.
// More details can be found at https://github.com/scylladb/scylla/issues/6472
return _contains_multi_fragment_runs && !use_interposer_consumer();
}
template <typename GCConsumer = noop_compacted_fragments_consumer>
requires CompactedFragmentsConsumer<GCConsumer>
static future<compaction_result> run(std::unique_ptr<compaction> c, GCConsumer gc_consumer = GCConsumer());
static future<compaction_result> run(std::unique_ptr<compaction> c);
friend class compacting_sstable_writer;
friend class garbage_collected_sstable_writer;
friend class garbage_collected_sstable_writer::data;
};
// Move constructor. The stop-request observer's callback captures `this`, so
// it cannot simply be moved along: if the source had one registered, detach
// it there and register a fresh observer bound to the destination object.
compacting_sstable_writer::compacting_sstable_writer(compacting_sstable_writer&& other)
: _c(other._c)
, _compaction_writer(std::move(other._compaction_writer))
, _create_compaction_writer(std::move(other._create_compaction_writer))
, _stop_compaction_writer(std::move(other._stop_compaction_writer)) {
if (std::exchange(other._stop_request_observer, std::nullopt)) {
_stop_request_observer = make_stop_request_observer(_c._stop_request_observable);
}
}
void compacting_sstable_writer::maybe_abort_compaction() {
if (_c._cdata.is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
@@ -834,19 +841,21 @@ void compacting_sstable_writer::maybe_abort_compaction() {
void compacting_sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
maybe_abort_compaction();
if (!_compaction_writer) {
_compaction_writer = _c.create_compaction_writer(dk);
_compaction_writer = _create_compaction_writer(dk);
}
_c.on_new_partition();
_compaction_writer->writer.consume_new_partition(dk);
_c._cdata.total_keys_written++;
_unclosed_partition = true;
}
stop_iteration compacting_sstable_writer::consume_end_of_partition() {
auto ret = _compaction_writer->writer.consume_end_of_partition();
_unclosed_partition = false;
if (ret == stop_iteration::yes) {
// stop sstable writer being currently used.
_c.stop_sstable_writer(&*_compaction_writer);
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
}
return ret;
@@ -854,35 +863,11 @@ stop_iteration compacting_sstable_writer::consume_end_of_partition() {
void compacting_sstable_writer::consume_end_of_stream() {
if (_compaction_writer) {
_c.stop_sstable_writer(&*_compaction_writer);
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
}
}
// Lazily opens a new sstable writer for garbage collected data, so that no
// sstable is created unless the GC writer actually consumes something.
void garbage_collected_sstable_writer::data::maybe_create_new_sstable_writer() {
if (!_compaction_writer) {
auto sst = _c->_sstable_creator(this_shard_id());
auto&& priority = _c->_io_priority;
auto monitor = std::make_unique<compaction_write_monitor>(sst, _c->_cf, _c->maximum_timestamp(), _c->_sstable_level);
// All GC sstables share this data object's run identifier and use the
// dedicated "garbage_collection" writer configuration.
sstable_writer_config cfg = _c->_cf.get_sstables_manager().configure_writer("garbage_collection");
cfg.run_identifier = _run_identifier;
// cfg holds a raw pointer; the monitor itself is owned by _compaction_writer.
cfg.monitor = monitor.get();
auto writer = sst->get_writer(*_c->schema(), _c->partitions_per_sstable(), cfg, _c->get_encoding_stats(), priority);
_compaction_writer.emplace(std::move(monitor), std::move(writer), std::move(sst));
}
}
// Seals the in-progress GC sstable, if any: flushes the writer, waits until
// the sstable's on-disk data can be opened, then queues it as unused so it
// can later be added to the SSTable set. No-op when no writer is active.
void garbage_collected_sstable_writer::data::finish_sstable_writer() {
if (_compaction_writer) {
writer().consume_end_of_stream();
auto sst = std::move(_compaction_writer->sst);
// get0() blocks the current seastar thread context until open completes.
sst->open_data().get0();
_compaction_writer = std::nullopt;
_unused_garbage_collected_sstables.push_back(std::move(sst));
}
}
class reshape_compaction : public compaction {
public:
reshape_compaction(column_family& cf, compaction_descriptor descriptor, compaction_data& cdata)
@@ -1010,15 +995,17 @@ private:
_compacting->erase(sst);
});
// Make sure SSTable created by garbage collected writer is made available
// before exhausted SSTable is released, so as to prevent data resurrection.
_gc_sstable_writer_data.finish_sstable_writer();
// before exhausted SSTable is released, so as to prevent data resurrection.
_stop_request_observable();
// Add garbage collected SSTables to the list of unused SSTables that will be added
// to SSTable set. GC SSTables should be added before compaction completes because
// a failure could result in data resurrection if data is not made available.
auto unused_gc_sstables = _gc_sstable_writer_data.consume_unused_garbage_collected_sstables();
auto unused_gc_sstables = consume_unused_garbage_collected_sstables();
_new_unused_sstables.insert(_new_unused_sstables.end(), unused_gc_sstables.begin(), unused_gc_sstables.end());
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
log_debug("Replacing earlier exhausted sstable(s) {} by new sstable {}", formatted_sstables_list(exhausted_ssts, false), sst->get_filename());
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
_monitor_generator.remove_exhausted_sstables(exhausted_ssts);
@@ -1026,13 +1013,13 @@ private:
}
void replace_remaining_exhausted_sstables() {
if (!_sstables.empty() || !_gc_sstable_writer_data.used_garbage_collected_sstables().empty()) {
if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) {
std::vector<shared_sstable> old_sstables;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables));
// Remove Garbage Collected SSTables from the SSTable set if any was previously added.
auto& used_garbage_collected_sstables = _gc_sstable_writer_data.used_garbage_collected_sstables();
old_sstables.insert(old_sstables.end(), used_garbage_collected_sstables.begin(), used_garbage_collected_sstables.end());
auto& used_gc_sstables = used_garbage_collected_sstables();
old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end());
_replacer(get_compaction_completion_desc(std::move(old_sstables), std::move(_new_unused_sstables)));
}
@@ -1565,11 +1552,11 @@ public:
}
};
template <typename GCConsumer>
requires CompactedFragmentsConsumer<GCConsumer>
future<compaction_result> compaction::run(std::unique_ptr<compaction> c, GCConsumer gc_consumer) {
return seastar::async([c = std::move(c), gc_consumer = std::move(gc_consumer)] () mutable {
auto consumer = c->setup(std::move(gc_consumer));
future<compaction_result> compaction::run(std::unique_ptr<compaction> c) {
return seastar::async([c = std::move(c)] () mutable {
c->setup();
auto consumer = c->consume();
auto start_time = db_clock::now();
try {
consumer.get();
@@ -1716,12 +1703,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, compaction_data& cd
// Bypass the usual compaction machinery for dry-mode scrub
return scrub_sstables_validate_mode(std::move(descriptor), cdata, cf);
}
auto c = make_compaction(cf, std::move(descriptor), cdata);
if (c->enable_garbage_collected_sstable_writer()) {
auto gc_writer = c->make_garbage_collected_sstable_writer();
return compaction::run(std::move(c), std::move(gc_writer));
}
return compaction::run(std::move(c));
return compaction::run(make_compaction(cf, std::move(descriptor), cdata));
}
std::unordered_set<sstables::shared_sstable>