sstable_loader: Decouple sstable streaming from selection
That will make it easy to introduce tablet-based load-and-stream. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -140,44 +140,58 @@ public:
|
||||
}
|
||||
|
||||
future<> stream(bool primary_replica_only);
|
||||
private:
|
||||
future<> stream_sstable_mutations(const dht::partition_range&, std::vector<sstables::shared_sstable>, bool primary_replica_only);
|
||||
};
|
||||
|
||||
future<> sstable_streamer::stream(bool primary_replica_only) {
|
||||
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
|
||||
const auto full_token_range = dht::token_range::make_open_ended_both_sides();
|
||||
|
||||
while (!_sstables.empty()) {
|
||||
size_t batch_sst_nr = 16;
|
||||
std::vector<sstables::shared_sstable> sst_processed;
|
||||
while (batch_sst_nr-- && !_sstables.empty()) {
|
||||
auto sst = _sstables.back();
|
||||
sst_processed.push_back(sst);
|
||||
_sstables.pop_back();
|
||||
}
|
||||
|
||||
co_await stream_sstable_mutations(full_partition_range, std::move(sst_processed), primary_replica_only);
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
|
||||
bool primary_replica_only) {
|
||||
const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
|
||||
auto s = _table.schema();
|
||||
const auto cf_id = s->id();
|
||||
const auto reason = streaming::stream_reason::repair;
|
||||
|
||||
size_t nr_sst_total = _sstables.size();
|
||||
size_t nr_sst_current = 0;
|
||||
while (!_sstables.empty()) {
|
||||
|
||||
// FIXME: indentation
|
||||
auto ops_uuid = streaming::plan_id{utils::make_random_uuid()};
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
size_t batch_sst_nr = 16;
|
||||
std::vector<sstring> sst_names;
|
||||
std::vector<sstables::shared_sstable> sst_processed;
|
||||
size_t estimated_partitions = 0;
|
||||
while (batch_sst_nr-- && !_sstables.empty()) {
|
||||
auto sst = _sstables.back();
|
||||
estimated_partitions += sst->estimated_keys_for_range(full_token_range);
|
||||
for (auto& sst : sstables) {
|
||||
estimated_partitions += sst->estimated_keys_for_range(token_range);
|
||||
sst_names.push_back(sst->get_filename());
|
||||
sst_set->insert(sst);
|
||||
sst_processed.push_back(sst);
|
||||
_sstables.pop_back();
|
||||
}
|
||||
|
||||
llog.info("load_and_stream: started ops_uuid={}, process [{}-{}] out of {} sstables={}",
|
||||
ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names);
|
||||
ops_uuid, nr_sst_current, nr_sst_current + sstables.size(), nr_sst_total, sst_names);
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
inet_address_vector_replica_set current_targets;
|
||||
std::unordered_map<gms::inet_address, send_meta_data> metas;
|
||||
size_t num_partitions_processed = 0;
|
||||
size_t num_bytes_read = 0;
|
||||
nr_sst_current += sst_processed.size();
|
||||
nr_sst_current += sstables.size();
|
||||
auto permit = co_await _db.obtain_reader_permit(_table, "sstables_loader::load_and_stream()", db::no_timeout, {});
|
||||
auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set, gc_clock::now()));
|
||||
auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), pr, sst_set, gc_clock::now()));
|
||||
std::exception_ptr eptr;
|
||||
bool failed = false;
|
||||
|
||||
@@ -231,7 +245,7 @@ future<> sstable_streamer::stream(bool primary_replica_only) {
|
||||
}
|
||||
if (!failed) {
|
||||
try {
|
||||
co_await coroutine::parallel_for_each(sst_processed, [&] (sstables::shared_sstable& sst) {
|
||||
co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) {
|
||||
llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
|
||||
ops_uuid, s->ks_name(), s->cf_name(), sst->component_filenames());
|
||||
return sst->unlink();
|
||||
@@ -256,7 +270,7 @@ future<> sstable_streamer::stream(bool primary_replica_only) {
|
||||
if (failed) {
|
||||
std::rethrow_exception(eptr);
|
||||
}
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user