diff --git a/db/view/view.cc b/db/view/view.cc index 0c2b0f6b79..96dbb92fb4 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2639,10 +2639,9 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac return std::max(backlog, _max.load(std::memory_order_relaxed)); } -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name, - const sstring& cf_name) { +future view_builder::check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name) { using view_statuses_type = std::unordered_map; - return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { + return _sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) { // Only consider status of known hosts. return view_status.second == "STARTED" && tm.get_endpoint_for_host_id_if_known(view_status.first); @@ -2650,17 +2649,20 @@ future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ }); } -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, - streaming::stream_reason reason) { +future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table) { + return _vug.register_staging_sstable(std::move(sst), std::move(table)); +} + +future check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); } if (reason == streaming::stream_reason::repair && !t.views().empty()) { return make_ready_future(true); } - return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) { + return do_with(t.views(), [&vb, &tm] (auto& views) { return map_reduce(views, - [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); }, + [&vb, &tm] (const view_ptr& view) { return vb.check_view_build_ongoing(tm, view->ks_name(), view->cf_name()); }, false, std::logical_or()); }); diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 805f1950c5..dcc115a390 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -221,6 +221,8 @@ public: // Can only be called on shard-0 future<> mark_existing_views_as_built(); + future check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name); + future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table); private: build_step& get_or_create_build_step(table_id); diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index 77b7113c0d..c15fbba2ed 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -16,19 +16,14 @@ namespace replica { class table; } -namespace db { - -class system_distributed_keyspace; - -} - namespace locator { class token_metadata; } namespace db::view { +class view_builder; -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, +future check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason); } diff --git a/main.cc b/main.cc index b1133d0b12..44a25f00f6 100644 --- a/main.cc +++ b/main.cc @@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl debug::the_stream_manager = &stream_manager; supervisor::notify("starting streaming service"); - stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); + stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { // FIXME -- keep the instances alive, just call .stop on them stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); @@ -1624,7 +1624,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // both) supervisor::notify("starting repair service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); + repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); @@ -1683,17 +1683,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl api::unset_server_task_manager_test(ctx).get(); }); #endif - supervisor::notify("starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get(); - auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { - sst_loader.stop().get(); - }); - api::set_server_sstables_loader(ctx, sst_loader).get(); - auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] { - api::unset_server_sstables_loader(ctx).get(); - }); - - gossiper.local().register_(ss.local().shared_from_this()); auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] { gossiper.local().unregister_(ss.local().shared_from_this()).get(); @@ -1802,6 +1791,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl view_builder.stop().get(); }); + supervisor::notify("starting sstables loader"); + sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder)).get(); + auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { + sst_loader.stop().get(); + }); + api::set_server_sstables_loader(ctx, sst_loader).get(); + auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] { + api::unset_server_sstables_loader(ctx).get(); + }); + /* * FIXME. In bb07678346 commit the API toggle for autocompaction was * (partially) delayed until system prepared to join the ring. Probably diff --git a/repair/repair.hh b/repair/repair.hh index 13ce82fada..5e0f3e8a1d 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -42,9 +42,8 @@ class database; class repair_service; namespace db { namespace view { - class view_update_generator; + class view_builder; } - class system_distributed_keyspace; } namespace netw { class messaging_service; } namespace service { diff --git a/repair/row_level.cc b/repair/row_level.cc index e77094e807..2169dea3df 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -418,8 +418,7 @@ class repair_writer_impl : public repair_writer::impl { std::optional> _writer_done; mutation_fragment_queue _mq; sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; + sharded& _view_builder; streaming::stream_reason _reason; flat_mutation_reader_v2 _queue_reader; public: @@ -427,8 +426,7 @@ public: schema_ptr schema, reader_permit permit, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, + sharded& view_builder, streaming::stream_reason reason, mutation_fragment_queue queue, flat_mutation_reader_v2 queue_reader) @@ -436,8 +434,7 @@ public: , _permit(std::move(permit)) , _mq(std::move(queue)) , _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) + , _view_builder(view_builder) , _reason(reason) , _queue_reader(std::move(queue_reader)) {} @@ -514,7 +511,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { auto erm = t.get_effective_replication_map(); auto& sharder = erm->get_sharder(*(w->schema())); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), t.stream_in_progress()).then([w, erm] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); @@ -531,11 +528,10 @@ lw_shared_ptr make_repair_writer( reader_permit permit, streaming::stream_reason reason, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator) { + sharded& view_builder) { auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader)); + auto i = std::make_unique(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader)); return make_lw_shared(schema, permit, std::move(i)); } @@ -735,8 +731,6 @@ private: repair_service& _rs; seastar::sharded& _db; netw::messaging_service& _messaging; - seastar::sharded& _sys_dist_ks; - seastar::sharded& _view_update_generator; schema_ptr _schema; reader_permit _permit; dht::token_range _range; @@ -852,8 +846,6 @@ public: : _rs(rs) , _db(rs.get_db()) , _messaging(rs.get_messaging()) - , _sys_dist_ks(rs.get_sys_dist_ks()) - , _view_update_generator(rs.get_view_update_generator()) , _schema(s) , _permit(std::move(permit)) , _range(range) @@ -868,7 +860,7 @@ public: , _remote_sharder(make_remote_sharder()) , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) - , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator)) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder())) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, netw::messaging_service::msg_addr addr) { auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard); @@ -2462,7 +2454,7 @@ future<> repair_service::init_ms_handlers() { auto from = cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name, range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable { - if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) { + if (!local_repair._view_builder.local_is_initialized()) { return make_exception_future(std::runtime_error(format("Node {} is not fully initialized for repair, try again later", local_repair.my_address()))); } @@ -3184,9 +3176,8 @@ repair_service::repair_service(distributed& gossiper, sharded& sp, sharded& addr_map, sharded& bm, - sharded& sys_dist_ks, sharded& sys_ks, - sharded& vug, + sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory) @@ -3196,9 +3187,8 @@ repair_service::repair_service(distributed& gossiper, , _sp(sp) , _addr_map(addr_map) , _bm(bm) - , _sys_dist_ks(sys_dist_ks) , _sys_ks(sys_ks) - , _view_update_generator(vug) + , _view_builder(vb) , _repair_module(seastar::make_shared(tm, *this, max_repair_memory)) , _mm(mm) , _node_ops_metrics(_repair_module) diff --git a/repair/row_level.hh b/repair/row_level.hh index f0570d097f..42f6f22d82 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -92,9 +92,8 @@ class repair_service : public seastar::peering_sharded_service { sharded& _sp; sharded& _addr_map; sharded& _bm; - sharded& _sys_dist_ks; sharded& _sys_ks; - sharded& _view_update_generator; + sharded& _view_builder; shared_ptr _repair_module; service::migration_manager& _mm; node_ops_metrics _node_ops_metrics; @@ -122,9 +121,8 @@ public: sharded& sp, sharded& addr_map, sharded& bm, - sharded& sys_dist_ks, sharded& sys_ks, - sharded& vug, + sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory); ~repair_service(); @@ -180,8 +178,7 @@ public: netw::messaging_service& get_messaging() noexcept { return _messaging; } sharded& get_db() noexcept { return _db; } service::migration_manager& get_migration_manager() noexcept { return _mm; } - sharded& get_sys_dist_ks() noexcept { return _sys_dist_ks; } - sharded& get_view_update_generator() noexcept { return _view_update_generator; } + sharded& get_view_builder() noexcept { return _view_builder; } gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); } size_t max_repair_memory() const { return _max_repair_memory; } seastar::semaphore& memory_sem() { return _memory_sem; } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index c3e1932286..7b18e545c7 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -30,7 +30,7 @@ #include "db/view/view_update_checks.hh" #include #include -#include "db/view/view_update_generator.hh" +#include "db/view/view_builder.hh" extern logging::logger dblog; @@ -137,7 +137,7 @@ distributed_loader::reshape(sharded& dir, sharded distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, - sharded& view_update_generator, bool needs_view_update, sstring ks, sstring cf) { + sharded& vb, bool needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); @@ -161,9 +161,9 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh abort(); }); - co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> { + co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> { if (sst->requires_view_building()) { - co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); + co_await vb.local().register_staging_sstable(sst, table.shared_from_this()); } }); @@ -171,8 +171,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh } future<> -distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks, sstring cf) { +distributed_loader::process_upload_dir(distributed& db, sharded& vb, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); @@ -181,7 +180,7 @@ distributed_loader::process_upload_dir(distributed& db, distr on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &vb, ks = std::move(ks), cf = std::move(cf)] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -230,10 +229,10 @@ distributed_loader::process_upload_dir(distributed& db, distr [] (const sstables::shared_sstable&) { return true; }).get(); // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); + const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); - size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) { - return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf); + size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f6276f87d0..6693fec4c4 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -30,10 +30,9 @@ using column_family = table; namespace db { class config; -class system_distributed_keyspace; class system_keyspace; namespace view { -class view_update_generator; +class view_builder; } } @@ -71,7 +70,7 @@ class distributed_loader { static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, - sharded& db, sharded& view_update_generator, + sharded& db, sharded& vb, bool needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sharded& sys_ks, keyspace& ks, sstring ks_name); @@ -96,8 +95,7 @@ public: // The table UUID is returned too. static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); - static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks_name, sstring cf_name); + static future<> process_upload_dir(distributed& db, sharded& vb, sstring ks_name, sstring cf_name); }; } diff --git a/sstables_loader.cc b/sstables_loader.cc index 5588a7572c..d719231688 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); }); } else { - co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name); + co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name); } } catch (...) { llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}", diff --git a/sstables_loader.hh b/sstables_loader.hh index 5db085898a..902aa8b380 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -20,9 +20,8 @@ class database; namespace netw { class messaging_service; } namespace db { -class system_distributed_keyspace; namespace view { -class view_update_generator; +class view_builder; } } @@ -32,9 +31,8 @@ class view_update_generator; // system. Built on top of the distributed_loader functionality. class sstables_loader : public seastar::peering_sharded_service { sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; netw::messaging_service& _messaging; + sharded& _view_builder; // Note that this is obviously only valid for the current shard. Users of // this facility should elect a shard to be the coordinator based on any @@ -50,13 +48,11 @@ class sstables_loader : public seastar::peering_sharded_service public: sstables_loader(sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, - netw::messaging_service& messaging) + netw::messaging_service& messaging, + sharded& vb) : _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) , _messaging(messaging) + , _view_builder(vb) { } diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 533aaa569d..53e8436093 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -11,7 +11,7 @@ #include "consumer.hh" #include "replica/database.hh" #include "mutation/mutation_source_metadata.hh" -#include "db/view/view_update_generator.hh" +#include "db/view/view_builder.hh" #include "db/view/view_update_checks.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" @@ -20,13 +20,12 @@ namespace streaming { std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstring origin, sharded& db, - sharded& sys_dist_ks, - sharded& vug, + sharded& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, service::frozen_topology_guard frozen_guard) { - return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { + return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { @@ -36,7 +35,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); auto guard = service::topology_guard(frozen_guard); - auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); + auto use_view_update_path = co_await db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy(); @@ -44,7 +43,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin // means partition estimation shouldn't be adjusted. const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema()); reader_consumer_v2 consumer = - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) { + [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) { sstables::shared_sstable sst; try { sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); @@ -66,11 +65,11 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin cf->enable_off_strategy_trigger(); } return cf->add_sstable_and_update_cache(sst, offstrategy); - }).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> { + }).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> { if (!use_view_update_path) { return make_ready_future<>(); } - return vug.local().register_staging_sstable(sst, std::move(cf)); + return vb.local().register_staging_sstable(sst, std::move(cf)); }); }; if (!offstrategy) { diff --git a/streaming/consumer.hh b/streaming/consumer.hh index 84e103a764..8ac526cb3b 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -15,9 +15,8 @@ class database; } namespace db { -class system_distributed_keyspace; namespace view { -class view_update_generator; +class view_builder; } } @@ -25,8 +24,7 @@ namespace streaming { std::function(flat_mutation_reader_v2)> make_streaming_consumer(sstring origin, sharded& db, - sharded& sys_dist_ks, - sharded& vug, + sharded& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index a4ac0c48d3..a06a068da4 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -24,14 +24,12 @@ extern logging::logger sslog; stream_manager::stream_manager(db::config& cfg, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, + sharded& view_builder, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg) : _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) + , _view_builder(view_builder) , _ms(ms) , _mm(mm) , _gossiper(gossiper) diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index fd1d8e3588..210e83207c 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -27,9 +27,8 @@ namespace db { class config; -class system_distributed_keyspace; namespace view { -class view_update_generator; +class view_builder; } } @@ -84,8 +83,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en */ private: sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; + sharded& _view_builder; sharded& _ms; sharded& _mm; gms::gossiper& _gossiper; @@ -106,8 +104,7 @@ private: public: stream_manager(db::config& cfg, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, + sharded& view_builder, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 9b0be3dacb..91d00123da 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -83,7 +83,7 @@ public: std::function(flat_mutation_reader_v2)> stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) { - return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); + return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); } void stream_manager::init_messaging_service_handler(abort_source& as) { @@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id); sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session); - if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) { + if (!_view_builder.local_is_initialized()) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", _db.local().get_token_metadata().get_topology().my_address()))); } diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 40ed16f26d..e55900ba7f 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -726,7 +726,7 @@ private: std::ref(_ms), std::ref(_fd)).get(); auto stop_raft_gr = deferred_stop(_group0_registry); - _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); + _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); auto stop_streaming = defer([this] { _stream_manager.stop().get(); }); _feature_service.invoke_on_all([] (auto& fs) {