distributed_loader: Propagate view_builder& via process_upload_dir()

Preparation to next patches, they'll make use of this new argument

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2024-05-16 11:19:44 +03:00
parent d917b06857
commit 0d946a5fdf
3 changed files with 9 additions and 8 deletions

View File

@@ -139,7 +139,7 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<r
// Loads SSTables into the main directory (or staging) and returns how many were loaded
future<size_t>
distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db,
sharded<db::view::view_update_generator>& view_update_generator, bool needs_view_update, sstring ks, sstring cf) {
sharded<db::view::view_update_generator>& view_update_generator, sharded<db::view::view_builder>& vb, bool needs_view_update, sstring ks, sstring cf) {
auto& table = db.local().find_column_family(ks, cf);
auto new_sstables = std::vector<sstables::shared_sstable>();
@@ -174,7 +174,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
future<>
distributed_loader::process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks, sstring cf) {
distributed<db::view::view_update_generator>& view_update_generator, sharded<db::view::view_builder>& vb, sstring ks, sstring cf) {
seastar::thread_attributes attr;
attr.sched_group = db.local().get_streaming_scheduling_group();
@@ -183,7 +183,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
}
return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] {
return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, &vb, ks = std::move(ks), cf = std::move(cf)] {
auto global_table = get_table_on_all_shards(db, ks, cf).get();
sharded<sstables::sstable_directory> directory;
@@ -234,8 +234,8 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf);
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator, &vb] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, view_update_generator, vb, use_view_update_path, ks, cf);
}, size_t(0), std::plus<size_t>()).get();
dblog.info("Loaded {} SSTables", loaded);

View File

@@ -34,6 +34,7 @@ class system_distributed_keyspace;
class system_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -71,7 +72,7 @@ class distributed_loader {
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring cf_name);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator, sharded<db::view::view_builder>& vb,
bool needs_view_update, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);
@@ -97,7 +98,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<> process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
distributed<db::view::view_update_generator>& view_update_generator, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
};
}

View File

@@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name);
co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, _view_builder, ks_name, cf_name);
}
} catch (...) {
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",