diff --git a/repair/row_level.cc b/repair/row_level.cc index f38be44810..1dbb3b51d3 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -46,6 +46,7 @@ #include "gms/i_endpoint_state_change_subscriber.hh" #include "gms/gossiper.hh" #include "repair/row_level.hh" +#include "mutation_source_metadata.hh" extern logging::logger rlogger; @@ -415,10 +416,16 @@ public: [&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) { auto& t = db.local().find_column_family(reader.schema()); return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable { + //FIXME: for better estimations this should be transmitted from remote + auto metadata = mutation_source_metadata{}; + auto& cs = t->get_compaction_strategy(); + const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions); + auto consumer = cs.make_interposer_consumer(metadata, + [t = std::move(t), use_view_update_path, adjusted_estimated_partitions] (flat_mutation_reader reader) { sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write(); schema_ptr s = reader.schema(); auto& pc = service::get_local_streaming_write_priority(); - return sst->write_components(std::move(reader), std::max(1ul, estimated_partitions), s, + return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s, sstables::sstable_writer_config{}, encoding_stats{}, pc).then([sst] { return sst->open_data(); }).then([t, sst] { @@ -429,6 +436,8 @@ public: } return _view_update_generator->local().register_staging_sstable(sst, std::move(t)); }); + }); + return consumer(std::move(reader)); }); }, t.stream_in_progress());