repair: Use local references in create_writer()

The repair_writer::create_writer() method needs sys-dist-ks
and view-update-generator. It's only called from repair_meta
which already has both.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-05-13 15:08:30 +03:00
parent 394acdc139
commit e748e16352

View File

@@ -584,7 +584,7 @@ public:
return sstables::offstrategy(operations_supported.contains(reason));
}
void create_writer(sharded<database>& db) {
void create_writer(sharded<database>& db, sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<db::view::view_update_generator>& view_update_gen) {
if (_writer_done) {
return;
}
@@ -593,16 +593,16 @@ public:
_mq = std::move(queue_handle);
auto writer = shared_from_this();
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
[&db, &sys_dist_ks, &view_update_gen, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader), reason] (bool use_view_update_path) mutable {
return db::view::check_needs_view_update_path(sys_dist_ks.local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader), reason, &view_update_gen] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = t->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
sstables::offstrategy offstrategy = is_offstrategy_supported(reason);
auto consumer = cs.make_interposer_consumer(metadata,
[t = std::move(t), use_view_update_path, adjusted_estimated_partitions, offstrategy] (flat_mutation_reader reader) {
[t = std::move(t), &view_update_gen, use_view_update_path, adjusted_estimated_partitions, offstrategy] (flat_mutation_reader reader) {
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_priority();
@@ -612,11 +612,11 @@ public:
return sst->open_data();
}).then([t, sst, offstrategy] {
return t->add_sstable_and_update_cache(sst, offstrategy);
}).then([t, s, sst, use_view_update_path]() mutable -> future<> {
}).then([t, s, sst, use_view_update_path, &view_update_gen]() mutable -> future<> {
if (!use_view_update_path) {
return make_ready_future<>();
}
return _view_update_generator->local().register_staging_sstable(sst, std::move(t));
return view_update_gen.local().register_staging_sstable(sst, std::move(t));
});
});
return consumer(std::move(reader));
@@ -1386,7 +1386,7 @@ private:
future<> do_apply_rows(std::list<repair_row>&& row_diff, update_working_row_buf update_buf) {
return do_with(std::move(row_diff), [this, update_buf] (std::list<repair_row>& row_diff) {
return with_semaphore(_repair_writer->sem(), 1, [this, update_buf, &row_diff] {
_repair_writer->create_writer(_db);
_repair_writer->create_writer(_db, _sys_dist_ks, _view_update_generator);
return repeat([this, update_buf, &row_diff] () mutable {
if (row_diff.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -1456,7 +1456,7 @@ public:
if (!r.dirty_on_master()) {
continue;
}
_repair_writer->create_writer(_db);
_repair_writer->create_writer(_db, _sys_dist_ks, _view_update_generator);
auto mf = r.get_mutation_fragment_ptr();
const auto& dk = r.get_dk_with_hash()->dk;
if (last_mf && last_dk &&