sstables_loader: Track download progress of download_task_impl

Previously, the progress of download_task_impl launched by the "restore" API
was not tracked. Since restore operations can involve large data transfers,
this makes it difficult for users to monitor progress.

The restore process happens in two sequential steps:
1. Open specified SSTables from object storage
2. Download and stream mutation fragments from the opened SSTables to
   mapped destinations

While both steps contribute to overall progress, they use different units
of measurement, making a unified progress metric challenging. Because
the load-and-stream step (step 2) is the most time-consuming part of the
restore, this change implements progress tracking for that step only, as
an initial improvement that gives users partial visibility into the
restore operation.

Fixes scylladb/scylladb#21427
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2024-11-13 14:31:49 +08:00
parent e57f674066
commit 5ab4932f34
3 changed files with 45 additions and 18 deletions

View File

@@ -153,12 +153,12 @@ public:
virtual ~sstable_streamer() {}
virtual future<> stream();
virtual future<> stream(std::function<void(unsigned)> on_streamed);
inet_address_vector_replica_set get_endpoints(const dht::token& token) const;
future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>);
protected:
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const;
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>);
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, std::function<void(unsigned)> on_streamed);
};
class tablet_sstable_streamer : public sstable_streamer {
@@ -169,7 +169,7 @@ public:
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
}
virtual future<> stream() override;
virtual future<> stream(std::function<void(unsigned)> on_streamed) override;
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const override;
private:
@@ -183,9 +183,9 @@ private:
return result;
}
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, std::function<void(unsigned)> on_streamed) {
// FIXME: fully contained sstables can be optimized.
return stream_sstables(pr, std::move(sstables));
return stream_sstables(pr, std::move(sstables), std::move(on_streamed));
}
};
@@ -211,13 +211,13 @@ inet_address_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(c
return to_replica_set(replicas);
}
future<> sstable_streamer::stream() {
future<> sstable_streamer::stream(std::function<void(unsigned)> on_streamed) {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
co_await stream_sstables(full_partition_range, std::move(_sstables));
co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(on_streamed));
}
future<> tablet_sstable_streamer::stream() {
future<> tablet_sstable_streamer::stream(std::function<void(unsigned)> on_streamed) {
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
@@ -257,12 +257,12 @@ future<> tablet_sstable_streamer::stream() {
}
auto tablet_pr = dht::to_partition_range(tablet_range);
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained));
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained));
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), on_streamed);
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), on_streamed);
}
}
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, std::function<void(unsigned)> on_streamed) {
size_t nr_sst_total = _sstables.size();
size_t nr_sst_current = 0;
@@ -280,6 +280,9 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
fmt::join(sst_processed | boost::adaptors::transformed([] (auto sst) { return sst->get_filename(); }), ", "));
nr_sst_current += sst_processed.size();
co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed));
if (on_streamed) {
std::invoke(on_streamed, batch_sst_nr);
}
}
}
@@ -389,14 +392,15 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink) {
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink,
std::function<void(unsigned)> on_streamed) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(sstables),
primary_replica_only(primary), unlink_sstables(unlink));
co_await streamer->stream();
co_await streamer->stream(on_streamed);
}
// For more details, see distributed_loader::process_upload_dir().
@@ -432,7 +436,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
};
std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name, cfg);
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true);
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, {});
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
@@ -457,6 +461,7 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
sstring _cf;
sstring _prefix;
std::vector<sstring> _sstables;
std::vector<unsigned> _num_sstables_processed;
protected:
virtual future<> run() override;
@@ -473,7 +478,10 @@ public:
, _cf(std::move(cf))
, _prefix(std::move(prefix))
, _sstables(std::move(sstables))
{}
, _num_sstables_processed(smp::count)
{
_status.progress_units = "sstables";
}
virtual std::string type() const override {
return "download_sstables";
@@ -486,6 +494,16 @@ public:
virtual tasks::is_user_task is_user_task() const noexcept override {
return tasks::is_user_task::yes;
}
// Reports the task's progress: `completed` is the sum of the per-shard
// counters in _num_sstables_processed, and `total` is the number of
// entries in _sstables.
virtual future<tasks::task_manager::task::progress> get_progress() const override {
// Debug aid: logs the whole per-shard counter vector as seen by the caller.
llog.debug("get_progress: {}", _num_sstables_processed);
// Each invocation of the callback returns only the slot indexed by its own
// this_shard_id(); adder<unsigned> folds those values into one sum.
// NOTE(review): presumably map_reduce runs the callback once on every shard
// of _loader — confirm against the sharded-service API.
unsigned processed = co_await _loader.map_reduce(adder<unsigned>(), [this] (auto&) {
return _num_sstables_processed[this_shard_id()];
});
co_return tasks::task_manager::task::progress {
.completed = processed,
// _sstables is the vector of sstring entries the task was constructed with.
.total = _sstables.size(),
};
}
};
future<> sstables_loader::download_task_impl::run() {
@@ -498,7 +516,10 @@ future<> sstables_loader::download_task_impl::run() {
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg);
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false);
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false,
[this] (unsigned num_streamed) {
_num_sstables_processed[this_shard_id()] += num_streamed;
});
});
}

View File

@@ -58,7 +58,8 @@ private:
future<> load_and_stream(sstring ks_name, sstring cf_name,
table_id, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only, bool unlink_sstables);
bool primary_replica_only, bool unlink_sstables,
std::function<void(unsigned)> on_streamed);
public:
sstables_loader(sharded<replica::database>& db,

View File

@@ -271,7 +271,12 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
print('Try to restore')
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
assert status is not None
assert status['state'] == 'done'
assert status['progress_units'] == "sstables"
assert status['progress_completed'] == len(toc_names)
assert status['progress_total'] == len(toc_names)
print('Check that sstables came back')
files = list_sstables()
assert len(files) > 0