compaction: do resharding through an interposer

Our resharding code is complex, since the compaction object has to keep
track of many output SSTables, the current shard being written.

When implementing TWCS streaming writers, we ran away from such
write-side complexity by implementing an interposer: the interposer
consumes the flat_mutation_reader stream, creating many different writer
streams. We can do a similar thing for resharding SSTables and have each
writer be guaranteed to contain keys for only a specific source shard.

As we do that, we can move the SSTable and sstable_writer information
to the compacting_sstable_writer object. The compaction object will no
longer be responsible for it and can be simplified, paving the way for
TWCS-major, which will go through an interposer as well.

Note that the compaction_writer, which now holds both the SSTable
pointer and the sstable_writer still needs to be optional. This is
because LCS (and potentially others) still want to create more than
one SSTable per source stream. That is done to guarantee that each
SSTable complies with the max_sstable_size parameter, which is
information available in the sstable_writer that is not present at
the level of the flat_mutation_reader. We want to keep it in the writer
side.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-03-05 09:06:44 -05:00
parent 8fe10863f4
commit 55a8b6e3c9

View File

@@ -68,6 +68,7 @@
#include "leveled_manifest.hh"
#include "utils/observable.hh"
#include "dht/token.hh"
#include "mutation_writer/shard_based_splitting_writer.hh"
namespace sstables {
@@ -124,18 +125,22 @@ static std::vector<shared_sstable> get_uncompacting_sstables(column_family& cf,
class compaction;
struct compaction_writer {
sstable_writer writer;
shared_sstable sst;
};
class compacting_sstable_writer {
compaction& _c;
sstable_writer* _writer = nullptr;
std::optional<compaction_writer> _writer = {};
public:
explicit compacting_sstable_writer(compaction& c) : _c(c) {}
explicit compacting_sstable_writer(compaction& c) : _c(c) { }
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t) { _writer->consume(t); }
stop_iteration consume(static_row&& sr, tombstone, bool) { return _writer->consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _writer->consume(std::move(cr)); }
stop_iteration consume(range_tombstone&& rt) { return _writer->consume(std::move(rt)); }
void consume(tombstone t) { _writer->writer.consume(t); }
stop_iteration consume(static_row&& sr, tombstone, bool) { return _writer->writer.consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _writer->writer.consume(std::move(cr)); }
stop_iteration consume(range_tombstone&& rt) { return _writer->writer.consume(std::move(rt)); }
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
@@ -431,11 +436,10 @@ protected:
}
}
void finish_new_sstable(std::optional<sstable_writer>& writer, shared_sstable& sst) {
writer->consume_end_of_stream();
writer = std::nullopt;
sst->open_data().get0();
_info->end_size += sst->bytes_on_disk();
void finish_new_sstable(compaction_writer* writer) {
writer->writer.consume_end_of_stream();
writer->sst->open_data().get0();
_info->end_size += writer->sst->bytes_on_disk();
// Notify GC'ed-data sstable writer's handler that an output sstable has just been sealed.
// The handler is responsible for making sure that deleting an input sstable will not
// result in resurrection on failure.
@@ -474,7 +478,11 @@ private:
// Default range sstable reader that will only return mutation that belongs to current shard.
virtual flat_mutation_reader make_sstable_reader() const = 0;
flat_mutation_reader setup() {
template <typename GCConsumer>
GCC6_CONCEPT(
requires CompactedFragmentsConsumer<GCConsumer>
)
future<> setup(GCConsumer gc_consumer) {
auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_schema));
sstring formatted_msg = "[";
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
@@ -514,9 +522,25 @@ private:
report_start(formatted_msg);
_compacting = std::move(ssts);
return make_sstable_reader();
auto now = gc_clock::now();
auto consumer = make_interposer_consumer([this, gc_consumer = std::move(gc_consumer), now] (flat_mutation_reader reader) mutable
{
using compact_mutations = compact_for_compaction<compacting_sstable_writer, GCConsumer>;
auto cfc = make_stable_flattened_mutations_consumer<compact_mutations>(*schema(), now,
max_purgeable_func(),
get_compacting_sstable_writer(),
std::move(gc_consumer));
return seastar::async([cfc = std::move(cfc), reader = std::move(reader), this] () mutable {
reader.consume_in_thread(std::move(cfc), make_partition_filter(), db::no_timeout);
});
});
return consumer(make_sstable_reader());
}
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
auto ratio = double(_info->end_size) / double(_info->start_size);
@@ -525,6 +549,8 @@ private:
auto throughput = duration.count() > 0 ? (double(_info->end_size) / (1024*1024)) / duration.count() : double{};
sstring new_sstables_msg;
on_end_of_compaction();
for (auto& newtab : _info->new_sstables) {
new_sstables_msg += format("{}:level={:d}, ", newtab->get_filename(), newtab->get_sstable_level());
}
@@ -564,14 +590,16 @@ private:
};
}
virtual void on_new_partition() = 0;
virtual void on_end_of_compaction() = 0;
virtual shared_sstable create_new_sstable() const = 0;
// select a sstable writer based on decorated key.
virtual sstable_writer* select_sstable_writer(const dht::decorated_key& dk) = 0;
// create a writer based on decorated key.
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0;
// stop current writer
virtual void stop_sstable_writer() = 0;
// finish all writers.
virtual void finish_sstable_writer() = 0;
virtual void stop_sstable_writer(compaction_writer* writer) = 0;
compacting_sstable_writer get_compacting_sstable_writer() {
return compacting_sstable_writer(*this);
@@ -625,23 +653,30 @@ void compacting_sstable_writer::consume_new_partition(const dht::decorated_key&
// Compaction manager will catch this exception and re-schedule the compaction.
throw compaction_stop_exception(_c._info->ks_name, _c._info->cf_name, _c._info->stop_requested);
}
_writer = _c.select_sstable_writer(dk);
_writer->consume_new_partition(dk);
if (!_writer) {
_writer = _c.create_compaction_writer(dk);
}
_c.on_new_partition();
_writer->writer.consume_new_partition(dk);
_c._info->total_keys_written++;
}
stop_iteration compacting_sstable_writer::consume_end_of_partition() {
auto ret = _writer->consume_end_of_partition();
auto ret = _writer->writer.consume_end_of_partition();
if (ret == stop_iteration::yes) {
// stop sstable writer being currently used.
_c.stop_sstable_writer();
_c.stop_sstable_writer(&*_writer);
_writer = std::nullopt;
}
return ret;
}
void compacting_sstable_writer::consume_end_of_stream() {
// this will stop any writer opened by compaction.
_c.finish_sstable_writer();
if (_writer) {
_c.stop_sstable_writer(&*_writer);
_writer = std::nullopt;
}
}
void garbage_collected_sstable_writer::setup_on_new_sstable_sealed_handler() {
@@ -695,8 +730,6 @@ class regular_compaction : public compaction {
// used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys.
std::optional<sstable_set::incremental_selector> _selector;
// sstable being currently written.
shared_sstable _sst;
std::optional<sstable_writer> _writer;
std::optional<compaction_weight_registration> _weight_registration;
mutable compaction_read_monitor_generator _monitor_generator;
std::deque<compaction_write_monitor> _active_write_monitors = {};
@@ -728,6 +761,10 @@ public:
_monitor_generator);
}
reader_consumer make_interposer_consumer(reader_consumer end_consumer) override {
return std::move(end_consumer);
}
void report_start(const sstring& formatted_msg) const override {
clogger.info("Compacting {}", formatted_msg);
}
@@ -759,42 +796,37 @@ public:
return _sstable_creator(this_shard_id());
}
virtual sstable_writer* select_sstable_writer(const dht::decorated_key& dk) override {
if (!_writer) {
_sst = _sstable_creator(this_shard_id());
setup_new_sstable(_sst);
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
_active_write_monitors.emplace_back(_sst, _cf, maximum_timestamp(), _sstable_level);
auto&& priority = service::get_local_compaction_priority();
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &_active_write_monitors.back();
cfg.run_identifier = _run_identifier;
_writer.emplace(_sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), priority));
_active_write_monitors.emplace_back(sst, _cf, maximum_timestamp(), _sstable_level);
auto&& priority = service::get_local_compaction_priority();
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &_active_write_monitors.back();
cfg.run_identifier = _run_identifier;
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), priority), sst};
}
virtual void stop_sstable_writer(compaction_writer* writer) override {
if (writer) {
finish_new_sstable(writer);
maybe_replace_exhausted_sstables_by_sst(writer->sst);
}
do_pending_replacements();
return &*_writer;
}
virtual void stop_sstable_writer() override {
finish_new_sstable(_writer, _sst);
maybe_replace_exhausted_sstables();
void on_new_partition() override {
update_pending_ranges();
}
virtual void finish_sstable_writer() override {
on_end_of_stream();
if (_writer) {
stop_sstable_writer();
virtual void on_end_of_compaction() override {
if (_weight_registration) {
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
}
replace_remaining_exhausted_sstables();
}
private:
void on_end_of_stream() {
if (_weight_registration) {
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
}
}
void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
//
// Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
@@ -814,14 +846,14 @@ private:
_active_write_monitors.clear();
}
void maybe_replace_exhausted_sstables() {
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
if (!_contains_multi_fragment_runs) {
return;
}
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
auto not_exhausted = [s = _schema, &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) {
auto not_exhausted = [s = _schema, &dk = sst->get_last_decorated_key()] (shared_sstable& sst) {
return sst->get_last_decorated_key().tri_compare(*s, dk) > 0;
};
auto exhausted = std::partition(_sstables.begin(), _sstables.end(), not_exhausted);
@@ -849,7 +881,7 @@ private:
}
}
void do_pending_replacements() {
void update_pending_ranges() {
if (_set.all()->empty() || _info->pending_replacements.empty()) { // set can be empty for testing scenario.
return;
}
@@ -1208,6 +1240,13 @@ public:
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
}
reader_consumer make_interposer_consumer(reader_consumer end_consumer) override {
return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
};
}
void report_start(const sstring& formatted_msg) const override {
@@ -1226,37 +1265,26 @@ public:
abort();
}
sstable_writer* select_sstable_writer(const dht::decorated_key& dk) override {
_shard = dht::shard_of(*_schema, dk.token());
auto& sst = _output_sstables[_shard].first;
auto& writer = _output_sstables[_shard].second;
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto shard = dht::shard_of(*_schema, dk.token());
auto sst = _sstable_creator(shard);
setup_new_sstable(sst);
if (!writer) {
sst = _sstable_creator(_shard);
setup_new_sstable(sst);
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
// sstables generated for a given shard will share the same run identifier.
cfg.run_identifier = _run_identifiers.at(_shard);
auto&& priority = service::get_local_compaction_priority();
writer.emplace(sst->get_writer(*_schema, partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard));
}
return &*writer;
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
// sstables generated for a given shard will share the same run identifier.
cfg.run_identifier = _run_identifiers.at(shard);
auto&& priority = service::get_local_compaction_priority();
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(shard), cfg, get_encoding_stats(), priority, shard), sst};
}
void stop_sstable_writer() override {
auto& sst = _output_sstables[_shard].first;
auto& writer = _output_sstables[_shard].second;
void on_new_partition() override {}
finish_new_sstable(writer, sst);
}
virtual void on_end_of_compaction() override {}
void finish_sstable_writer() override {
for (auto& p : _output_sstables) {
if (p.second) {
finish_new_sstable(p.second, p.first);
}
void stop_sstable_writer(compaction_writer* writer) override {
if (writer) {
finish_new_sstable(writer);
}
}
};
@@ -1267,22 +1295,13 @@ GCC6_CONCEPT(
)
future<compaction_info> compaction::run(std::unique_ptr<compaction> c, GCConsumer gc_consumer) {
return seastar::async([c = std::move(c), gc_consumer = std::move(gc_consumer)] () mutable {
auto reader = c->setup();
auto cr = c->get_compacting_sstable_writer();
auto cfc = make_stable_flattened_mutations_consumer<compact_for_compaction<compacting_sstable_writer, GCConsumer>>(
*c->schema(), gc_clock::now(), c->max_purgeable_func(), std::move(cr), std::move(gc_consumer));
auto consumer = c->setup(std::move(gc_consumer));
auto start_time = db_clock::now();
try {
// make sure the readers are all gone before the compaction object is gone. We will
// leave this block either successfully or exceptionally with the reader object
// destroyed.
auto r = std::move(reader);
r.consume_in_thread(std::move(cfc), c->make_partition_filter(), db::no_timeout);
consumer.get();
} catch (...) {
c->delete_sstables_for_interrupted_compaction();
c = nullptr; // make sure writers are stopped while running in thread context
c = nullptr; // make sure writers are stopped while running in thread context. This is because of calls to file.close().get();
throw;
}