sstables_loader: Track download progress of download_task_impl
Previously, the progress of download_task_impl launched by the "restore" API was not tracked. Since restore operations can involve large data transfers, this makes it difficult for users to monitor progress. The restore process happens in two sequential steps: 1. Open specified SSTables from object storage 2. Download and stream mutation fragments from the opened SSTables to mapped destinations While both steps contribute to overall progress, they use different units of measurement, making a unified progress metric challenging. Because the load-and-stream step (step 2) is the largest time-consuming part of the restore. This change implements progress tracking for this step as an initial improvement to provide users with partial visibility into the restore operation. Fixes scylladb/scylladb#21427 Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
@@ -153,12 +153,12 @@ public:
|
||||
|
||||
virtual ~sstable_streamer() {}
|
||||
|
||||
virtual future<> stream();
|
||||
virtual future<> stream(std::function<void(unsigned)> on_streamed);
|
||||
inet_address_vector_replica_set get_endpoints(const dht::token& token) const;
|
||||
future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>);
|
||||
protected:
|
||||
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const;
|
||||
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>);
|
||||
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, std::function<void(unsigned)> on_streamed);
|
||||
};
|
||||
|
||||
class tablet_sstable_streamer : public sstable_streamer {
|
||||
@@ -169,7 +169,7 @@ public:
|
||||
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
|
||||
}
|
||||
|
||||
virtual future<> stream() override;
|
||||
virtual future<> stream(std::function<void(unsigned)> on_streamed) override;
|
||||
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const override;
|
||||
|
||||
private:
|
||||
@@ -183,9 +183,9 @@ private:
|
||||
return result;
|
||||
}
|
||||
|
||||
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
|
||||
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, std::function<void(unsigned)> on_streamed) {
|
||||
// FIXME: fully contained sstables can be optimized.
|
||||
return stream_sstables(pr, std::move(sstables));
|
||||
return stream_sstables(pr, std::move(sstables), std::move(on_streamed));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -211,13 +211,13 @@ inet_address_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(c
|
||||
return to_replica_set(replicas);
|
||||
}
|
||||
|
||||
future<> sstable_streamer::stream() {
|
||||
future<> sstable_streamer::stream(std::function<void(unsigned)> on_streamed) {
|
||||
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
|
||||
|
||||
co_await stream_sstables(full_partition_range, std::move(_sstables));
|
||||
co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(on_streamed));
|
||||
}
|
||||
|
||||
future<> tablet_sstable_streamer::stream() {
|
||||
future<> tablet_sstable_streamer::stream(std::function<void(unsigned)> on_streamed) {
|
||||
// sstables are sorted by first key in reverse order.
|
||||
auto sstable_it = _sstables.rbegin();
|
||||
|
||||
@@ -257,12 +257,12 @@ future<> tablet_sstable_streamer::stream() {
|
||||
}
|
||||
|
||||
auto tablet_pr = dht::to_partition_range(tablet_range);
|
||||
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained));
|
||||
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained));
|
||||
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), on_streamed);
|
||||
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), on_streamed);
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
|
||||
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, std::function<void(unsigned)> on_streamed) {
|
||||
size_t nr_sst_total = _sstables.size();
|
||||
size_t nr_sst_current = 0;
|
||||
|
||||
@@ -280,6 +280,9 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
|
||||
fmt::join(sst_processed | boost::adaptors::transformed([] (auto sst) { return sst->get_filename(); }), ", "));
|
||||
nr_sst_current += sst_processed.size();
|
||||
co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed));
|
||||
if (on_streamed) {
|
||||
std::invoke(on_streamed, batch_sst_nr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -389,14 +392,15 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
|
||||
}
|
||||
|
||||
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink) {
|
||||
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink,
|
||||
std::function<void(unsigned)> on_streamed) {
|
||||
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
|
||||
// throughout its lifetime.
|
||||
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
|
||||
_messaging, _db.local(), table_id, std::move(sstables),
|
||||
primary_replica_only(primary), unlink_sstables(unlink));
|
||||
|
||||
co_await streamer->stream();
|
||||
co_await streamer->stream(on_streamed);
|
||||
}
|
||||
|
||||
// For more details, see distributed_loader::process_upload_dir().
|
||||
@@ -432,7 +436,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
};
|
||||
std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name, cfg);
|
||||
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (sstables_loader& loader) mutable -> future<> {
|
||||
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true);
|
||||
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, {});
|
||||
});
|
||||
} else {
|
||||
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
|
||||
@@ -457,6 +461,7 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
|
||||
sstring _cf;
|
||||
sstring _prefix;
|
||||
std::vector<sstring> _sstables;
|
||||
std::vector<unsigned> _num_sstables_processed;
|
||||
|
||||
protected:
|
||||
virtual future<> run() override;
|
||||
@@ -473,7 +478,10 @@ public:
|
||||
, _cf(std::move(cf))
|
||||
, _prefix(std::move(prefix))
|
||||
, _sstables(std::move(sstables))
|
||||
{}
|
||||
, _num_sstables_processed(smp::count)
|
||||
{
|
||||
_status.progress_units = "sstables";
|
||||
}
|
||||
|
||||
virtual std::string type() const override {
|
||||
return "download_sstables";
|
||||
@@ -486,6 +494,16 @@ public:
|
||||
virtual tasks::is_user_task is_user_task() const noexcept override {
|
||||
return tasks::is_user_task::yes;
|
||||
}
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
llog.debug("get_progress: {}", _num_sstables_processed);
|
||||
unsigned processed = co_await _loader.map_reduce(adder<unsigned>(), [this] (auto&) {
|
||||
return _num_sstables_processed[this_shard_id()];
|
||||
});
|
||||
co_return tasks::task_manager::task::progress {
|
||||
.completed = processed,
|
||||
.total = _sstables.size(),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
future<> sstables_loader::download_task_impl::run() {
|
||||
@@ -498,7 +516,10 @@ future<> sstables_loader::download_task_impl::run() {
|
||||
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg);
|
||||
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
|
||||
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
|
||||
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false);
|
||||
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false,
|
||||
[this] (unsigned num_streamed) {
|
||||
_num_sstables_processed[this_shard_id()] += num_streamed;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,8 @@ private:
|
||||
|
||||
future<> load_and_stream(sstring ks_name, sstring cf_name,
|
||||
table_id, std::vector<sstables::shared_sstable> sstables,
|
||||
bool primary_replica_only, bool unlink_sstables);
|
||||
bool primary_replica_only, bool unlink_sstables,
|
||||
std::function<void(unsigned)> on_streamed);
|
||||
|
||||
public:
|
||||
sstables_loader(sharded<replica::database>& db,
|
||||
|
||||
@@ -271,7 +271,12 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
print('Try to restore')
|
||||
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
assert status is not None
|
||||
assert status['state'] == 'done'
|
||||
assert status['progress_units'] == "sstables"
|
||||
assert status['progress_completed'] == len(toc_names)
|
||||
assert status['progress_total'] == len(toc_names)
|
||||
|
||||
print('Check that sstables came back')
|
||||
files = list_sstables()
|
||||
assert len(files) > 0
|
||||
|
||||
Reference in New Issue
Block a user