sstable_loader: Decouple sstable streaming from selection

That will make it easy to introduce tablet-based load-and-stream.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2024-02-19 17:14:15 -03:00
parent 0a41f2a11f
commit d1db17d490

View File

@@ -140,44 +140,58 @@ public:
}
future<> stream(bool primary_replica_only);
private:
future<> stream_sstable_mutations(const dht::partition_range&, std::vector<sstables::shared_sstable>, bool primary_replica_only);
};
future<> sstable_streamer::stream(bool primary_replica_only) {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
const auto full_token_range = dht::token_range::make_open_ended_both_sides();
while (!_sstables.empty()) {
size_t batch_sst_nr = 16;
std::vector<sstables::shared_sstable> sst_processed;
while (batch_sst_nr-- && !_sstables.empty()) {
auto sst = _sstables.back();
sst_processed.push_back(sst);
_sstables.pop_back();
}
co_await stream_sstable_mutations(full_partition_range, std::move(sst_processed), primary_replica_only);
}
}
future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
auto s = _table.schema();
const auto cf_id = s->id();
const auto reason = streaming::stream_reason::repair;
size_t nr_sst_total = _sstables.size();
size_t nr_sst_current = 0;
while (!_sstables.empty()) {
// FIXME: indentation
auto ops_uuid = streaming::plan_id{utils::make_random_uuid()};
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
size_t batch_sst_nr = 16;
std::vector<sstring> sst_names;
std::vector<sstables::shared_sstable> sst_processed;
size_t estimated_partitions = 0;
while (batch_sst_nr-- && !_sstables.empty()) {
auto sst = _sstables.back();
estimated_partitions += sst->estimated_keys_for_range(full_token_range);
for (auto& sst : sstables) {
estimated_partitions += sst->estimated_keys_for_range(token_range);
sst_names.push_back(sst->get_filename());
sst_set->insert(sst);
sst_processed.push_back(sst);
_sstables.pop_back();
}
llog.info("load_and_stream: started ops_uuid={}, process [{}-{}] out of {} sstables={}",
ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names);
ops_uuid, nr_sst_current, nr_sst_current + sstables.size(), nr_sst_total, sst_names);
auto start_time = std::chrono::steady_clock::now();
inet_address_vector_replica_set current_targets;
std::unordered_map<gms::inet_address, send_meta_data> metas;
size_t num_partitions_processed = 0;
size_t num_bytes_read = 0;
nr_sst_current += sst_processed.size();
nr_sst_current += sstables.size();
auto permit = co_await _db.obtain_reader_permit(_table, "sstables_loader::load_and_stream()", db::no_timeout, {});
auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set, gc_clock::now()));
auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), pr, sst_set, gc_clock::now()));
std::exception_ptr eptr;
bool failed = false;
@@ -231,7 +245,7 @@ future<> sstable_streamer::stream(bool primary_replica_only) {
}
if (!failed) {
try {
co_await coroutine::parallel_for_each(sst_processed, [&] (sstables::shared_sstable& sst) {
co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) {
llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
ops_uuid, s->ks_name(), s->cf_name(), sst->component_filenames());
return sst->unlink();
@@ -256,7 +270,7 @@ future<> sstable_streamer::stream(bool primary_replica_only) {
if (failed) {
std::rethrow_exception(eptr);
}
}
co_return;
}