repair: pass the data stream through the compaction strategy's interposer consumer

This commit is contained in:
Botond Dénes
2019-05-20 14:53:17 +03:00
parent 9c2407573c
commit e3f4692868

View File

@@ -46,6 +46,7 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/gossiper.hh"
#include "repair/row_level.hh"
#include "mutation_source_metadata.hh"
extern logging::logger rlogger;
@@ -415,10 +416,16 @@ public:
[&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = t->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
auto consumer = cs.make_interposer_consumer(metadata,
[t = std::move(t), use_view_update_path, adjusted_estimated_partitions] (flat_mutation_reader reader) {
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_write_priority();
return sst->write_components(std::move(reader), std::max(1ul, estimated_partitions), s,
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
sstables::sstable_writer_config{}, encoding_stats{}, pc).then([sst] {
return sst->open_data();
}).then([t, sst] {
@@ -429,6 +436,8 @@ public:
}
return _view_update_generator->local().register_staging_sstable(sst, std::move(t));
});
});
return consumer(std::move(reader));
});
},
t.stream_in_progress());