From f4341ea088f8a12ab569d0f5815600f6df594627 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:27:10 +0300 Subject: [PATCH 01/15] repair: Remove unwanted local references from repair_meta When constructed, the class copies local references to services just to push them into make_repair_writer() later in the same initializers list. There's no need in keeping those references. Signed-off-by: Pavel Emelyanov --- repair/row_level.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 777daa6442..280d5a844d 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -734,8 +734,6 @@ private: repair_service& _rs; seastar::sharded& _db; netw::messaging_service& _messaging; - seastar::sharded& _sys_dist_ks; - seastar::sharded& _view_update_generator; schema_ptr _schema; reader_permit _permit; dht::token_range _range; @@ -851,8 +849,6 @@ public: : _rs(rs) , _db(rs.get_db()) , _messaging(rs.get_messaging()) - , _sys_dist_ks(rs.get_sys_dist_ks()) - , _view_update_generator(rs.get_view_update_generator()) , _schema(s) , _permit(std::move(permit)) , _range(range) @@ -867,7 +863,7 @@ public: , _remote_sharder(make_remote_sharder()) , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) - , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator)) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_sys_dist_ks(), rs.get_view_update_generator())) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, netw::messaging_service::msg_addr addr) { auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard); From ff63f8b1a557bcc7d95006766d73460e84bfee78 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:16:49 +0300 Subject: [PATCH 02/15] main: Start sstables loader later This service is on 
its own, nothing depends on it. Neither it can work before system distributed keyspace is started, so move it lower. Signed-off-by: Pavel Emelyanov --- main.cc | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/main.cc b/main.cc index a1ef8e9381..35915be1c6 100644 --- a/main.cc +++ b/main.cc @@ -1683,17 +1683,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl api::unset_server_task_manager_test(ctx).get(); }); #endif - supervisor::notify("starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get(); - auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { - sst_loader.stop().get(); - }); - api::set_server_sstables_loader(ctx, sst_loader).get(); - auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] { - api::unset_server_sstables_loader(ctx).get(); - }); - - gossiper.local().register_(ss.local().shared_from_this()); auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] { gossiper.local().unregister_(ss.local().shared_from_this()).get(); @@ -1805,6 +1794,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl view_builder.stop().get(); }); + supervisor::notify("starting sstables loader"); + sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get(); + auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { + sst_loader.stop().get(); + }); + api::set_server_sstables_loader(ctx, sst_loader).get(); + auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] { + api::unset_server_sstables_loader(ctx).get(); + }); + /* * FIXME. In bb07678346 commit the API toggle for autocompaction was * (partially) delayed until system prepared to join the ring. 
Probably From f269a3754160ff739145de25aa012ee509b8e106 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:17:49 +0300 Subject: [PATCH 03/15] sstables_loader: Add view_builder dependency Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- sstables_loader.hh | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/main.cc b/main.cc index 35915be1c6..603e2e87d0 100644 --- a/main.cc +++ b/main.cc @@ -1795,7 +1795,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); supervisor::notify("starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get(); + sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(view_builder)).get(); auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { sst_loader.stop().get(); }); diff --git a/sstables_loader.hh b/sstables_loader.hh index 5db085898a..5fc48428da 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -23,6 +23,7 @@ namespace db { class system_distributed_keyspace; namespace view { class view_update_generator; +class view_builder; } } @@ -35,6 +36,7 @@ class sstables_loader : public seastar::peering_sharded_service sharded& _sys_dist_ks; sharded& _view_update_generator; netw::messaging_service& _messaging; + sharded& _view_builder; // Note that this is obviously only valid for the current shard. 
Users of // this facility should elect a shard to be the coordinator based on any @@ -52,11 +54,13 @@ public: sstables_loader(sharded& db, sharded& sys_dist_ks, sharded& view_update_generator, - netw::messaging_service& messaging) + netw::messaging_service& messaging, + sharded& vb) : _db(db) , _sys_dist_ks(sys_dist_ks) , _view_update_generator(view_update_generator) , _messaging(messaging) + , _view_builder(vb) { } From f0f1097d0cdce7e7af7289b521439907eea56bb6 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:22:45 +0300 Subject: [PATCH 04/15] repair_service: Add view builder dependency Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- repair/repair.hh | 1 + repair/row_level.cc | 2 ++ repair/row_level.hh | 3 +++ 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/main.cc b/main.cc index 603e2e87d0..bcea66dae9 100644 --- a/main.cc +++ b/main.cc @@ -1624,7 +1624,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // both) supervisor::notify("starting repair service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); + repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); diff --git a/repair/repair.hh b/repair/repair.hh index 13ce82fada..4f067445af 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -43,6 +43,7 @@ class repair_service; namespace db { namespace view { class 
view_update_generator; + class view_builder; } class system_distributed_keyspace; } diff --git a/repair/row_level.cc b/repair/row_level.cc index 280d5a844d..f22f5451a5 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -3182,6 +3182,7 @@ repair_service::repair_service(distributed& gossiper, sharded& sys_dist_ks, sharded& sys_ks, sharded& vug, + sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory) @@ -3194,6 +3195,7 @@ repair_service::repair_service(distributed& gossiper, , _sys_dist_ks(sys_dist_ks) , _sys_ks(sys_ks) , _view_update_generator(vug) + , _view_builder(vb) , _repair_module(seastar::make_shared(tm, *this, max_repair_memory)) , _mm(mm) , _node_ops_metrics(_repair_module) diff --git a/repair/row_level.hh b/repair/row_level.hh index f0570d097f..bf241c3259 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -95,6 +95,7 @@ class repair_service : public seastar::peering_sharded_service { sharded& _sys_dist_ks; sharded& _sys_ks; sharded& _view_update_generator; + sharded& _view_builder; shared_ptr _repair_module; service::migration_manager& _mm; node_ops_metrics _node_ops_metrics; @@ -125,6 +126,7 @@ public: sharded& sys_dist_ks, sharded& sys_ks, sharded& vug, + sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory); ~repair_service(); @@ -182,6 +184,7 @@ public: service::migration_manager& get_migration_manager() noexcept { return _mm; } sharded& get_sys_dist_ks() noexcept { return _sys_dist_ks; } sharded& get_view_update_generator() noexcept { return _view_update_generator; } + sharded& get_view_builder() noexcept { return _view_builder; } gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); } size_t max_repair_memory() const { return _max_repair_memory; } seastar::semaphore& memory_sem() { return _memory_sem; } From d917b068573b92c35da7a762e3d0dc72a5164127 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:13:54 +0300 
Subject: [PATCH 05/15] stream_manager: Add view builder dependency Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- streaming/stream_manager.cc | 2 ++ streaming/stream_manager.hh | 3 +++ test/lib/cql_test_env.cc | 2 +- 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/main.cc b/main.cc index bcea66dae9..b5f7176639 100644 --- a/main.cc +++ b/main.cc @@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl debug::the_stream_manager = &stream_manager; supervisor::notify("starting streaming service"); - stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); + stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { // FIXME -- keep the instances alive, just call .stop on them stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index a4ac0c48d3..c9b337c164 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -26,12 +26,14 @@ stream_manager::stream_manager(db::config& cfg, sharded& db, sharded& sys_dist_ks, sharded& view_update_generator, + sharded& view_builder, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg) : _db(db) , _sys_dist_ks(sys_dist_ks) , _view_update_generator(view_update_generator) + , _view_builder(view_builder) , _ms(ms) , _mm(mm) , _gossiper(gossiper) diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index fd1d8e3588..6fa1776ed6 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -30,6 +30,7 @@ class config; class 
system_distributed_keyspace; namespace view { class view_update_generator; +class view_builder; } } @@ -86,6 +87,7 @@ private: sharded& _db; sharded& _sys_dist_ks; sharded& _view_update_generator; + sharded& _view_builder; sharded& _ms; sharded& _mm; gms::gossiper& _gossiper; @@ -108,6 +110,7 @@ public: stream_manager(db::config& cfg, sharded& db, sharded& sys_dist_ks, sharded& view_update_generator, + sharded& view_builder, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index bc01f8d8dd..24ae4edada 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -726,7 +726,7 @@ private: std::ref(_ms), std::ref(_fd)).get(); auto stop_raft_gr = deferred_stop(_group0_registry); - _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); + _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); auto stop_streaming = defer([this] { _stream_manager.stop().get(); }); _feature_service.invoke_on_all([] (auto& fs) { From 0d946a5fdfd5311ced19a7b49ebc915367ca0a4a Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 11:19:44 +0300 Subject: [PATCH 06/15] distributed_loader: Propagate view_builder& via process_upload_dir() Preparation to next patches, they'll make use of this new argument Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 10 +++++----- replica/distributed_loader.hh | 5 +++-- sstables_loader.cc | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 8b3e9a9bad..b6a6bd2207 100644 --- a/replica/distributed_loader.cc +++ 
b/replica/distributed_loader.cc @@ -139,7 +139,7 @@ distributed_loader::reshape(sharded& dir, sharded distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, - sharded& view_update_generator, bool needs_view_update, sstring ks, sstring cf) { + sharded& view_update_generator, sharded& vb, bool needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); @@ -174,7 +174,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh future<> distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks, sstring cf) { + distributed& view_update_generator, sharded& vb, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); @@ -183,7 +183,7 @@ distributed_loader::process_upload_dir(distributed& db, distr on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, &vb, ks = std::move(ks), cf = std::move(cf)] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -234,8 +234,8 @@ distributed_loader::process_upload_dir(distributed& db, distr // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. 
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); - size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) { - return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf); + size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator, &vb] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, view_update_generator, vb, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f6276f87d0..686340a78a 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -34,6 +34,7 @@ class system_distributed_keyspace; class system_keyspace; namespace view { class view_update_generator; +class view_builder; } } @@ -71,7 +72,7 @@ class distributed_loader { static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, - sharded& db, sharded& view_update_generator, + sharded& db, sharded& view_update_generator, sharded& vb, bool needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sharded& sys_ks, keyspace& ks, sstring ks_name); @@ -97,7 +98,7 @@ public: static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks_name, sstring cf_name); + distributed& view_update_generator, sharded& vb, sstring ks_name, 
sstring cf_name); }; } diff --git a/sstables_loader.cc b/sstables_loader.cc index 5588a7572c..e80145e299 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); }); } else { - co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name); + co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, _view_builder, ks_name, cf_name); } } catch (...) { llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}", From 5e6893075d35bb52cdae31fc147f7d0098d01088 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:09:24 +0300 Subject: [PATCH 07/15] repair: Keep view_builder& on repair_writer_impl Preparation patch, next patches will make use of this new member Signed-off-by: Pavel Emelyanov --- repair/row_level.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index f22f5451a5..70a4e4b3e8 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -419,6 +419,7 @@ class repair_writer_impl : public repair_writer::impl { sharded& _db; sharded& _sys_dist_ks; sharded& _view_update_generator; + sharded& _view_builder; streaming::stream_reason _reason; flat_mutation_reader_v2 _queue_reader; public: @@ -428,6 +429,7 @@ public: sharded& db, sharded& sys_dist_ks, sharded& view_update_generator, + sharded& view_builder, streaming::stream_reason reason, mutation_fragment_queue queue, flat_mutation_reader_v2 queue_reader) @@ -437,6 +439,7 @@ public: , _db(db) , _sys_dist_ks(sys_dist_ks) , _view_update_generator(view_update_generator) + , _view_builder(view_builder) , _reason(reason) , 
_queue_reader(std::move(queue_reader)) {} @@ -531,10 +534,11 @@ lw_shared_ptr make_repair_writer( streaming::stream_reason reason, sharded& db, sharded& sys_dist_ks, - sharded& view_update_generator) { + sharded& view_update_generator, + sharded& view_builder) { auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader)); + auto i = std::make_unique(schema, permit, db, sys_dist_ks, view_update_generator, view_builder, reason, std::move(queue), std::move(queue_reader)); return make_lw_shared(schema, permit, std::move(i)); } @@ -863,7 +867,7 @@ public: , _remote_sharder(make_remote_sharder()) , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) - , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_sys_dist_ks(), rs.get_view_update_generator())) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_sys_dist_ks(), rs.get_view_update_generator(), rs.get_view_builder())) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, netw::messaging_service::msg_addr addr) { auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard); From 57517d59874393861217437a21253bc41b566809 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:31:52 +0300 Subject: [PATCH 08/15] streaming: Propagate view_builder& down to make_streaming_consumer() Continuation of the previous patch. Repair itself doesn't need it, but streaming consumer does. 
Signed-off-by: Pavel Emelyanov --- repair/row_level.cc | 2 +- streaming/consumer.cc | 1 + streaming/consumer.hh | 2 ++ streaming/stream_session.cc | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 70a4e4b3e8..c9f9deb8f5 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -516,7 +516,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { auto erm = t.get_effective_replication_map(); auto& sharder = erm->get_sharder(*(w->schema())); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), t.stream_in_progress()).then([w, erm] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 533aaa569d..f55fb5dd3f 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -22,6 +22,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin sharded& db, sharded& sys_dist_ks, sharded& vug, + sharded& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, diff --git a/streaming/consumer.hh b/streaming/consumer.hh index 84e103a764..bee80c34f7 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -18,6 +18,7 @@ namespace db { class system_distributed_keyspace; namespace view { class view_update_generator; +class view_builder; } } @@ -27,6 +28,7 @@ std::function(flat_mutation_reader_v2)> 
make_streaming_consumer(sstring sharded& db, sharded& sys_dist_ks, sharded& vug, + sharded& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 9b0be3dacb..f177ed72d4 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -83,7 +83,7 @@ public: std::function(flat_mutation_reader_v2)> stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) { - return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); + return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); } void stream_manager::init_messaging_service_handler(abort_source& as) { From 92ff0d3fc380c5196f73ab3ae490a40eed2954d3 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:45:21 +0300 Subject: [PATCH 09/15] view: Make check_view_build_ongoing() helper a method of view_builder This helper checks if there's an ongoing build of a view, and it's in fact internal to view-builder, who keeps its status in one of its system tables. 
Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 12 +++++------- db/view/view_builder.hh | 1 + db/view/view_update_checks.hh | 9 ++------- replica/distributed_loader.cc | 4 ++-- streaming/consumer.cc | 4 ++-- 5 files changed, 12 insertions(+), 18 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 0c2b0f6b79..676af44cd9 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2639,10 +2639,9 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac return std::max(backlog, _max.load(std::memory_order_relaxed)); } -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name, - const sstring& cf_name) { +future view_builder::check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name) { using view_statuses_type = std::unordered_map; - return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { + return _sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) { // Only consider status of known hosts. 
return view_status.second == "STARTED" && tm.get_endpoint_for_host_id_if_known(view_status.first); @@ -2650,17 +2649,16 @@ future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ }); } -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, - streaming::stream_reason reason) { +future check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); } if (reason == streaming::stream_reason::repair && !t.views().empty()) { return make_ready_future(true); } - return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) { + return do_with(t.views(), [&vb, &tm] (auto& views) { return map_reduce(views, - [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); }, + [&vb, &tm] (const view_ptr& view) { return vb.check_view_build_ongoing(tm, view->ks_name(), view->cf_name()); }, false, std::logical_or()); }); diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 805f1950c5..493f599d78 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -221,6 +221,7 @@ public: // Can only be called on shard-0 future<> mark_existing_views_as_built(); + future check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name); private: build_step& get_or_create_build_step(table_id); diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index 77b7113c0d..c15fbba2ed 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -16,19 +16,14 @@ namespace replica { class table; } -namespace db { - -class system_distributed_keyspace; - -} - namespace locator { class token_metadata; } namespace db::view { +class view_builder; -future 
check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, +future check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason); } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index b6a6bd2207..71f92fe17c 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -183,7 +183,7 @@ distributed_loader::process_upload_dir(distributed& db, distr on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, &vb, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &view_update_generator, &vb, ks = std::move(ks), cf = std::move(cf)] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -232,7 +232,7 @@ distributed_loader::process_upload_dir(distributed& db, distr [] (const sstables::shared_sstable&) { return true; }).get(); // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. 
- const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); + const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator, &vb] (sstables::sstable_directory& dir) { return make_sstables_available(dir, db, view_update_generator, vb, use_view_update_path, ks, cf); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index f55fb5dd3f..4484cc917c 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -27,7 +27,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin stream_reason reason, sstables::offstrategy offstrategy, service::frozen_topology_guard frozen_guard) { - return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { + return [&db, &vug, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { @@ -37,7 +37,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); auto guard = service::topology_guard(frozen_guard); - auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); + auto use_view_update_path = co_await db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = mutation_source_metadata{}; 
auto& cs = cf->get_compaction_strategy(); From 66a8035b64198f29b153ec74d729470599148854 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:51:45 +0300 Subject: [PATCH 10/15] view: Make register_staging_sstable() a method of view_builder Callers of it had just checked if an sstable still has some views building, so they should talk to view-builder to register the sstable that's now considered to be staging. Effectively, this is to hide the view-update-generator from other services and make them communicate with the builder only. Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 4 ++++ db/view/view_builder.hh | 1 + replica/distributed_loader.cc | 6 +++--- streaming/consumer.cc | 10 +++++----- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 676af44cd9..96dbb92fb4 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2649,6 +2649,10 @@ future view_builder::check_view_build_ongoing(const locator::token_metadat }); } +future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table) { + return _vug.register_staging_sstable(std::move(sst), std::move(table)); +} + future check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 493f599d78..dcc115a390 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -222,6 +222,7 @@ public: // Can only be called on shard-0 future<> mark_existing_views_as_built(); future check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name); + future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table); private: build_step& get_or_create_build_step(table_id); diff --git a/replica/distributed_loader.cc 
b/replica/distributed_loader.cc index 71f92fe17c..3e2c3823bb 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -31,7 +31,7 @@ #include "db/view/view_update_checks.hh" #include #include -#include "db/view/view_update_generator.hh" +#include "db/view/view_builder.hh" extern logging::logger dblog; @@ -163,9 +163,9 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh abort(); }); - co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> { + co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> { if (sst->requires_view_building()) { - co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); + co_await vb.local().register_staging_sstable(sst, table.shared_from_this()); } }); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 4484cc917c..01dbb1b509 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -11,7 +11,7 @@ #include "consumer.hh" #include "replica/database.hh" #include "mutation/mutation_source_metadata.hh" -#include "db/view/view_update_generator.hh" +#include "db/view/view_builder.hh" #include "db/view/view_update_checks.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" @@ -27,7 +27,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin stream_reason reason, sstables::offstrategy offstrategy, service::frozen_topology_guard frozen_guard) { - return [&db, &vug, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { + return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { @@ -45,7 
+45,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin // means partition estimation shouldn't be adjusted. const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema()); reader_consumer_v2 consumer = - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) { + [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) { sstables::shared_sstable sst; try { sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); @@ -67,11 +67,11 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin cf->enable_off_strategy_trigger(); } return cf->add_sstable_and_update_cache(sst, offstrategy); - }).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> { + }).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> { if (!use_view_update_path) { return make_ready_future<>(); } - return vug.local().register_staging_sstable(sst, std::move(cf)); + return vb.local().register_staging_sstable(sst, std::move(cf)); }); }; if (!offstrategy) { From b7288579548948ce042b9f6303860ff91daa109d Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:58:47 +0300 Subject: [PATCH 11/15] distributed_loader: Remove system_distributed_keyspace and view_update_generator Now all the code is happy with view_builder and can be shortened Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 11 +++++------ replica/distributed_loader.hh | 7 ++----- sstables_loader.cc | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3e2c3823bb..640765179b 100644 --- a/replica/distributed_loader.cc +++ 
b/replica/distributed_loader.cc @@ -139,7 +139,7 @@ distributed_loader::reshape(sharded& dir, sharded distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, - sharded& view_update_generator, sharded& vb, bool needs_view_update, sstring ks, sstring cf) { + sharded& vb, bool needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); @@ -173,8 +173,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh } future<> -distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sharded& vb, sstring ks, sstring cf) { +distributed_loader::process_upload_dir(distributed& db, sharded& vb, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); @@ -183,7 +182,7 @@ distributed_loader::process_upload_dir(distributed& db, distr on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async(std::move(attr), [&db, &view_update_generator, &vb, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &vb, ks = std::move(ks), cf = std::move(cf)] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -234,8 +233,8 @@ distributed_loader::process_upload_dir(distributed& db, distr // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. 
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get(); - size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator, &vb] (sstables::sstable_directory& dir) { - return make_sstables_available(dir, db, view_update_generator, vb, use_view_update_path, ks, cf); + size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 686340a78a..6693fec4c4 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -30,10 +30,8 @@ using column_family = table; namespace db { class config; -class system_distributed_keyspace; class system_keyspace; namespace view { -class view_update_generator; class view_builder; } } @@ -72,7 +70,7 @@ class distributed_loader { static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, - sharded& db, sharded& view_update_generator, sharded& vb, + sharded& db, sharded& vb, bool needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sharded& sys_ks, keyspace& ks, sstring ks_name); @@ -97,8 +95,7 @@ public: // The table UUID is returned too. 
static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); - static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sharded& vb, sstring ks_name, sstring cf_name); + static future<> process_upload_dir(distributed& db, sharded& vb, sstring ks_name, sstring cf_name); }; } diff --git a/sstables_loader.cc b/sstables_loader.cc index e80145e299..d719231688 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); }); } else { - co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, _view_builder, ks_name, cf_name); + co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name); } } catch (...) 
{ llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}", From afa94d28376c44b83d4e8c64c8ce47807a48aab0 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:55:42 +0300 Subject: [PATCH 12/15] sstables_loader: Remove system_distributed_keyspace and view_update_generator Now all the code is happy with view_builder and can be shortened Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- sstables_loader.hh | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/main.cc b/main.cc index b5f7176639..c4af62c3ca 100644 --- a/main.cc +++ b/main.cc @@ -1795,7 +1795,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); supervisor::notify("starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(view_builder)).get(); + sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder)).get(); auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { sst_loader.stop().get(); }); diff --git a/sstables_loader.hh b/sstables_loader.hh index 5fc48428da..902aa8b380 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -20,9 +20,7 @@ class database; namespace netw { class messaging_service; } namespace db { -class system_distributed_keyspace; namespace view { -class view_update_generator; class view_builder; } } @@ -33,8 +31,6 @@ class view_builder; // system. Built on top of the distributed_loader functionality. 
class sstables_loader : public seastar::peering_sharded_service { sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; netw::messaging_service& _messaging; sharded& _view_builder; @@ -52,13 +48,9 @@ class sstables_loader : public seastar::peering_sharded_service public: sstables_loader(sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, netw::messaging_service& messaging, sharded& vb) : _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) , _messaging(messaging) , _view_builder(vb) { From ae2dcdc7c2fbd51d48a68ac5230e8dacf85efafa Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 12:58:16 +0300 Subject: [PATCH 13/15] streaming: Remove system_distributed_keyspace and view_update_generator Now all the code is happy with view_builder and can be shortened Signed-off-by: Pavel Emelyanov --- repair/row_level.cc | 2 +- streaming/consumer.cc | 2 -- streaming/consumer.hh | 4 ---- streaming/stream_session.cc | 2 +- 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index c9f9deb8f5..62c92f85c7 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -516,7 +516,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { auto erm = t.get_effective_replication_map(); auto& sharder = erm->get_sharder(*(w->schema())); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), t.stream_in_progress()).then([w, erm] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to 
write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 01dbb1b509..53e8436093 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -20,8 +20,6 @@ namespace streaming { std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstring origin, sharded& db, - sharded& sys_dist_ks, - sharded& vug, sharded& vb, uint64_t estimated_partitions, stream_reason reason, diff --git a/streaming/consumer.hh b/streaming/consumer.hh index bee80c34f7..8ac526cb3b 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -15,9 +15,7 @@ class database; } namespace db { -class system_distributed_keyspace; namespace view { -class view_update_generator; class view_builder; } } @@ -26,8 +24,6 @@ namespace streaming { std::function(flat_mutation_reader_v2)> make_streaming_consumer(sstring origin, sharded& db, - sharded& sys_dist_ks, - sharded& vug, sharded& vb, uint64_t estimated_partitions, stream_reason reason, diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index f177ed72d4..9f846aab57 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -83,7 +83,7 @@ public: std::function(flat_mutation_reader_v2)> stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) { - return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); + return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); } void stream_manager::init_messaging_service_handler(abort_source& as) { From 84ef6a8179bae1956392fcdb838e5ef53a1fb20b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 13:02:55 +0300 Subject: [PATCH 14/15] repair: 
Remove system_distributed_keyspace and view_update_generator Now all the code is happy with view_builder and can be shortened Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- repair/repair.hh | 2 -- repair/row_level.cc | 18 +++--------------- repair/row_level.hh | 6 ------ 4 files changed, 4 insertions(+), 24 deletions(-) diff --git a/main.cc b/main.cc index c4af62c3ca..721e25357c 100644 --- a/main.cc +++ b/main.cc @@ -1624,7 +1624,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // both) supervisor::notify("starting repair service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); + repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); diff --git a/repair/repair.hh b/repair/repair.hh index 4f067445af..5e0f3e8a1d 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -42,10 +42,8 @@ class database; class repair_service; namespace db { namespace view { - class view_update_generator; class view_builder; } - class system_distributed_keyspace; } namespace netw { class messaging_service; } namespace service { diff --git a/repair/row_level.cc b/repair/row_level.cc index 62c92f85c7..0bb26834db 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -417,8 +417,6 @@ class repair_writer_impl : public repair_writer::impl { std::optional> _writer_done; mutation_fragment_queue _mq; sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; 
sharded& _view_builder; streaming::stream_reason _reason; flat_mutation_reader_v2 _queue_reader; @@ -427,8 +425,6 @@ public: schema_ptr schema, reader_permit permit, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, sharded& view_builder, streaming::stream_reason reason, mutation_fragment_queue queue, @@ -437,8 +433,6 @@ public: , _permit(std::move(permit)) , _mq(std::move(queue)) , _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) , _view_builder(view_builder) , _reason(reason) , _queue_reader(std::move(queue_reader)) @@ -533,12 +527,10 @@ lw_shared_ptr make_repair_writer( reader_permit permit, streaming::stream_reason reason, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, sharded& view_builder) { auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, permit, db, sys_dist_ks, view_update_generator, view_builder, reason, std::move(queue), std::move(queue_reader)); + auto i = std::make_unique(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader)); return make_lw_shared(schema, permit, std::move(i)); } @@ -867,7 +859,7 @@ public: , _remote_sharder(make_remote_sharder()) , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) - , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_sys_dist_ks(), rs.get_view_update_generator(), rs.get_view_builder())) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder())) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, netw::messaging_service::msg_addr addr) { auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard); @@ -2461,7 +2453,7 @@ future<> repair_service::init_ms_handlers() { auto from = 
cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name, range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable { - if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) { + if (!local_repair._view_builder.local_is_initialized()) { return make_exception_future(std::runtime_error(format("Node {} is not fully initialized for repair, try again later", local_repair.my_address()))); } @@ -3183,9 +3175,7 @@ repair_service::repair_service(distributed& gossiper, sharded& sp, sharded& addr_map, sharded& bm, - sharded& sys_dist_ks, sharded& sys_ks, - sharded& vug, sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, @@ -3196,9 +3186,7 @@ repair_service::repair_service(distributed& gossiper, , _sp(sp) , _addr_map(addr_map) , _bm(bm) - , _sys_dist_ks(sys_dist_ks) , _sys_ks(sys_ks) - , _view_update_generator(vug) , _view_builder(vb) , _repair_module(seastar::make_shared(tm, *this, max_repair_memory)) , _mm(mm) diff --git a/repair/row_level.hh b/repair/row_level.hh index bf241c3259..42f6f22d82 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -92,9 +92,7 @@ class repair_service : public seastar::peering_sharded_service { sharded& _sp; sharded& _addr_map; sharded& _bm; - sharded& _sys_dist_ks; sharded& _sys_ks; - sharded& _view_update_generator; sharded& _view_builder; shared_ptr _repair_module; service::migration_manager& _mm; @@ -123,9 +121,7 @@ public: sharded& sp, sharded& addr_map, sharded& bm, - sharded& sys_dist_ks, sharded& sys_ks, - sharded& vug, sharded& vb, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory); @@ -182,8 +178,6 @@ public: netw::messaging_service& get_messaging() noexcept { return _messaging; } sharded& get_db() noexcept { return _db; } 
service::migration_manager& get_migration_manager() noexcept { return _mm; } - sharded& get_sys_dist_ks() noexcept { return _sys_dist_ks; } - sharded& get_view_update_generator() noexcept { return _view_update_generator; } sharded& get_view_builder() noexcept { return _view_builder; } gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); } size_t max_repair_memory() const { return _max_repair_memory; } From 8906126a2c51f16d3d780b84a8c8f1ed2fb0e144 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 16 May 2024 13:07:29 +0300 Subject: [PATCH 15/15] stream_manager: Remove system_distributed_keyspace and view_update_generator Now all the code is happy with view_builder and can be shortened Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- streaming/stream_manager.cc | 4 ---- streaming/stream_manager.hh | 6 ------ streaming/stream_session.cc | 2 +- test/lib/cql_test_env.cc | 2 +- 5 files changed, 3 insertions(+), 13 deletions(-) diff --git a/main.cc b/main.cc index 721e25357c..340e374f6d 100644 --- a/main.cc +++ b/main.cc @@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl debug::the_stream_manager = &stream_manager; supervisor::notify("starting streaming service"); - stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); + stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { // FIXME -- keep the instances alive, just call .stop on them stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index c9b337c164..a06a068da4 100644 --- 
a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -24,15 +24,11 @@ extern logging::logger sslog; stream_manager::stream_manager(db::config& cfg, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, sharded& view_builder, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg) : _db(db) - , _sys_dist_ks(sys_dist_ks) - , _view_update_generator(view_update_generator) , _view_builder(view_builder) , _ms(ms) , _mm(mm) diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index 6fa1776ed6..210e83207c 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -27,9 +27,7 @@ namespace db { class config; -class system_distributed_keyspace; namespace view { -class view_update_generator; class view_builder; } } @@ -85,8 +83,6 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en */ private: sharded& _db; - sharded& _sys_dist_ks; - sharded& _view_update_generator; sharded& _view_builder; sharded& _ms; sharded& _mm; @@ -108,8 +104,6 @@ private: public: stream_manager(db::config& cfg, sharded& db, - sharded& sys_dist_ks, - sharded& view_update_generator, sharded& view_builder, sharded& ms, sharded& mm, diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 9f846aab57..91d00123da 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { auto reason = reason_opt ? 
*reason_opt: stream_reason::unspecified; service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id); sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session); - if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) { + if (!_view_builder.local_is_initialized()) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", _db.local().get_token_metadata().get_topology().my_address()))); } diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 24ae4edada..6ddc0906f8 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -726,7 +726,7 @@ private: std::ref(_ms), std::ref(_fd)).get(); auto stop_raft_gr = deferred_stop(_group0_registry); - _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); + _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); auto stop_streaming = defer([this] { _stream_manager.stop().get(); }); _feature_service.invoke_on_all([] (auto& fs) {