From 0d946a5fdfd5311ced19a7b49ebc915367ca0a4a Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:19:44 +0300 Subject: [PATCH] distributed_loader: Propagate view_builder& via process_upload_dir() Preparation to next patches, they'll make use of this new argument Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 10 +++++----- replica/distributed_loader.hh | 5 +++-- sstables_loader.cc | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 8b3e9a9bad..b6a6bd2207 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -139,7 +139,7 @@ distributed_loader::reshape(sharded& dir, sharded distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, - sharded& view_update_generator, bool needs_view_update, sstring ks, sstring cf) { + sharded& view_update_generator, sharded& vb, bool needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); @@ -174,7 +174,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh future<> distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks, sstring cf) { + distributed& view_update_generator, sharded& vb, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); @@ -183,7 +183,7 @@ distributed_loader::process_upload_dir(distributed& db, distr on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, &vb, ks = std::move(ks), cf = std::move(cf)] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -234,8 +234,8 @@ distributed_loader::process_upload_dir(distributed& db, distr // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); - size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) { - return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf); + size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator, &vb] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, view_update_generator, vb, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f6276f87d0..686340a78a 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -34,6 +34,7 @@ class system_distributed_keyspace; class system_keyspace; namespace view { class view_update_generator; +class view_builder; } } @@ -71,7 +72,7 @@ class distributed_loader { static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, - sharded& db, sharded& view_update_generator, + sharded& db, sharded& view_update_generator, sharded& vb, bool needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sharded& sys_ks, keyspace& ks, sstring ks_name); @@ -97,7 +98,7 @@ public: static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks_name, sstring cf_name); + distributed& view_update_generator, sharded& vb, sstring ks_name, sstring cf_name); }; } diff --git a/sstables_loader.cc b/sstables_loader.cc index 5588a7572c..e80145e299 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); }); } else { - co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name); + co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, _view_builder, ks_name, cf_name); } } catch (...) { llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",