Merge 'Rework view services and system-distributed-keyspace dependencies' from Pavel Emelyanov

The system-distributed-keyspace and view-update-generator often go in pair, because streaming, repair and sstables-loader (via distributed-loader) need them booth to check if sstable is staging and register it if it's such. The check is performed by messing directly with system_distributed.view_build_status table, and the registration happens via view-update-generator.

That's not nice, other services shouldn't know that view status is kept in system table. Also view-update-generator is a service to generae and push view updates, the fact that it keeps staging sstables list is the implementation detail.

This PR replaces dependencies on the mentioned pair of services with the single dependency on view-builder (repair, sstables-loader and stream-manager are enlightened) and hides the view building-vs-staging details inside the view_builder.

Along the way, some simplification of repair_writer_impl class is done.

Closes scylladb/scylladb#18706

* github.com:scylladb/scylladb:
  stream_manager: Remove system_distributed_keyspace and view_update_generator
  repair: Remove system_distributed_keyspace and view_update_generator
  streaming: Remove system_distributed_keyspace and view_update_generator
  sstables_loader: Remove system_distributed_keyspace and view_update_generator
  distributed_loader: Remove system_distributed_keyspace and view_update_generator
  view: Make register_staging_sstable() a method of view_builder
  view: Make check_view_build_ongoing() helper a method of view_builder
  streaming: Proparage view_builder& down to make_streaming_consumer()
  repair: Keep view_builder& on repair_writer_impl
  distributed_loader: Propagate view_builder& via process_upload_dir()
  stream_manager: Add view builder dependency
  repair_service: Add view builder dependency
  sstables_loader: Add view_bulder dependency
  main: Start sstables loader later
  repair: Remove unwanted local references from repair_meta
This commit is contained in:
Botond Dénes
2024-05-27 10:51:11 +03:00
17 changed files with 74 additions and 105 deletions

View File

@@ -2639,10 +2639,9 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac
return std::max(backlog, _max.load(std::memory_order_relaxed));
}
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name,
const sstring& cf_name) {
future<bool> view_builder::check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name) {
using view_statuses_type = std::unordered_map<locator::host_id, sstring>;
return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
return _sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
// Only consider status of known hosts.
return view_status.second == "STARTED" && tm.get_endpoint_for_host_id_if_known(view_status.first);
@@ -2650,17 +2649,20 @@ future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_
});
}
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
streaming::stream_reason reason) {
future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table) {
return _vug.register_staging_sstable(std::move(sst), std::move(table));
}
future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) {
if (is_internal_keyspace(t.schema()->ks_name())) {
return make_ready_future<bool>(false);
}
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
return make_ready_future<bool>(true);
}
return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) {
return do_with(t.views(), [&vb, &tm] (auto& views) {
return map_reduce(views,
[&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); },
[&vb, &tm] (const view_ptr& view) { return vb.check_view_build_ongoing(tm, view->ks_name(), view->cf_name()); },
false,
std::logical_or<bool>());
});

View File

@@ -221,6 +221,8 @@ public:
// Can only be called on shard-0
future<> mark_existing_views_as_built();
future<bool> check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name);
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
private:
build_step& get_or_create_build_step(table_id);

View File

@@ -16,19 +16,14 @@ namespace replica {
class table;
}
namespace db {
class system_distributed_keyspace;
}
namespace locator {
class token_metadata;
}
namespace db::view {
class view_builder;
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t,
streaming::stream_reason reason);
}

25
main.cc
View File

@@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
debug::the_stream_manager = &stream_manager;
supervisor::notify("starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
@@ -1624,7 +1624,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// both)
supervisor::notify("starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});
@@ -1683,17 +1683,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::unset_server_task_manager_test(ctx).get();
});
#endif
supervisor::notify("starting sstables loader");
sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get();
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
sst_loader.stop().get();
});
api::set_server_sstables_loader(ctx, sst_loader).get();
auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] {
api::unset_server_sstables_loader(ctx).get();
});
gossiper.local().register_(ss.local().shared_from_this());
auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] {
gossiper.local().unregister_(ss.local().shared_from_this()).get();
@@ -1802,6 +1791,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
view_builder.stop().get();
});
supervisor::notify("starting sstables loader");
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder)).get();
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
sst_loader.stop().get();
});
api::set_server_sstables_loader(ctx, sst_loader).get();
auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] {
api::unset_server_sstables_loader(ctx).get();
});
/*
* FIXME. In bb07678346 commit the API toggle for autocompaction was
* (partially) delayed until system prepared to join the ring. Probably

View File

@@ -42,9 +42,8 @@ class database;
class repair_service;
namespace db {
namespace view {
class view_update_generator;
class view_builder;
}
class system_distributed_keyspace;
}
namespace netw { class messaging_service; }
namespace service {

View File

@@ -418,8 +418,7 @@ class repair_writer_impl : public repair_writer::impl {
std::optional<future<>> _writer_done;
mutation_fragment_queue _mq;
sharded<replica::database>& _db;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<db::view::view_builder>& _view_builder;
streaming::stream_reason _reason;
flat_mutation_reader_v2 _queue_reader;
public:
@@ -427,8 +426,7 @@ public:
schema_ptr schema,
reader_permit permit,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<db::view::view_builder>& view_builder,
streaming::stream_reason reason,
mutation_fragment_queue queue,
flat_mutation_reader_v2 queue_reader)
@@ -436,8 +434,7 @@ public:
, _permit(std::move(permit))
, _mq(std::move(queue))
, _db(db)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(view_update_generator)
, _view_builder(view_builder)
, _reason(reason)
, _queue_reader(std::move(queue_reader))
{}
@@ -514,7 +511,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(*(w->schema()));
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
@@ -531,11 +528,10 @@ lw_shared_ptr<repair_writer> make_repair_writer(
reader_permit permit,
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator) {
sharded<db::view::view_builder>& view_builder) {
auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader));
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}
@@ -735,8 +731,6 @@ private:
repair_service& _rs;
seastar::sharded<replica::database>& _db;
netw::messaging_service& _messaging;
seastar::sharded<db::system_distributed_keyspace>& _sys_dist_ks;
seastar::sharded<db::view::view_update_generator>& _view_update_generator;
schema_ptr _schema;
reader_permit _permit;
dht::token_range _range;
@@ -852,8 +846,6 @@ public:
: _rs(rs)
, _db(rs.get_db())
, _messaging(rs.get_messaging())
, _sys_dist_ks(rs.get_sys_dist_ks())
, _view_update_generator(rs.get_view_update_generator())
, _schema(s)
, _permit(std::move(permit))
, _range(range)
@@ -868,7 +860,7 @@ public:
, _remote_sharder(make_remote_sharder())
, _same_sharding_config(is_same_sharding_config(cf))
, _nr_peer_nodes(nr_peer_nodes)
, _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator))
, _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder()))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, netw::messaging_service::msg_addr addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
@@ -2462,7 +2454,7 @@ future<> repair_service::init_ms_handlers() {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) {
if (!local_repair._view_builder.local_is_initialized()) {
return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
local_repair.my_address())));
}
@@ -3184,9 +3176,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& sp,
sharded<service::raft_address_map>& addr_map,
sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
sharded<db::view::view_builder>& vb,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
@@ -3196,9 +3187,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
, _sp(sp)
, _addr_map(addr_map)
, _bm(bm)
, _sys_dist_ks(sys_dist_ks)
, _sys_ks(sys_ks)
, _view_update_generator(vug)
, _view_builder(vb)
, _repair_module(seastar::make_shared<repair::task_manager_module>(tm, *this, max_repair_memory))
, _mm(mm)
, _node_ops_metrics(_repair_module)

View File

@@ -92,9 +92,8 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::storage_proxy>& _sp;
sharded<service::raft_address_map>& _addr_map;
sharded<db::batchlog_manager>& _bm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::system_keyspace>& _sys_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<db::view::view_builder>& _view_builder;
shared_ptr<repair::task_manager_module> _repair_module;
service::migration_manager& _mm;
node_ops_metrics _node_ops_metrics;
@@ -122,9 +121,8 @@ public:
sharded<service::storage_proxy>& sp,
sharded<service::raft_address_map>& addr_map,
sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
sharded<db::view::view_builder>& vb,
tasks::task_manager& tm,
service::migration_manager& mm, size_t max_repair_memory);
~repair_service();
@@ -180,8 +178,7 @@ public:
netw::messaging_service& get_messaging() noexcept { return _messaging; }
sharded<replica::database>& get_db() noexcept { return _db; }
service::migration_manager& get_migration_manager() noexcept { return _mm; }
sharded<db::system_distributed_keyspace>& get_sys_dist_ks() noexcept { return _sys_dist_ks; }
sharded<db::view::view_update_generator>& get_view_update_generator() noexcept { return _view_update_generator; }
sharded<db::view::view_builder>& get_view_builder() noexcept { return _view_builder; }
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
size_t max_repair_memory() const { return _max_repair_memory; }
seastar::semaphore& memory_sem() { return _memory_sem; }

View File

@@ -30,7 +30,7 @@
#include "db/view/view_update_checks.hh"
#include <unordered_map>
#include <boost/range/adaptor/map.hpp>
#include "db/view/view_update_generator.hh"
#include "db/view/view_builder.hh"
extern logging::logger dblog;
@@ -137,7 +137,7 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<r
// Loads SSTables into the main directory (or staging) and returns how many were loaded
future<size_t>
distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db,
sharded<db::view::view_update_generator>& view_update_generator, bool needs_view_update, sstring ks, sstring cf) {
sharded<db::view::view_builder>& vb, bool needs_view_update, sstring ks, sstring cf) {
auto& table = db.local().find_column_family(ks, cf);
auto new_sstables = std::vector<sstables::shared_sstable>();
@@ -161,9 +161,9 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
abort();
});
co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> {
if (sst->requires_view_building()) {
co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
co_await vb.local().register_staging_sstable(sst, table.shared_from_this());
}
});
@@ -171,8 +171,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
}
future<>
distributed_loader::process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks, sstring cf) {
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf) {
seastar::thread_attributes attr;
attr.sched_group = db.local().get_streaming_scheduling_group();
@@ -181,7 +180,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
}
return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] {
return seastar::async(std::move(attr), [&db, &vb, ks = std::move(ks), cf = std::move(cf)] {
auto global_table = get_table_on_all_shards(db, ks, cf).get();
sharded<sstables::sstable_directory> directory;
@@ -230,10 +229,10 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
[] (const sstables::shared_sstable&) { return true; }).get();
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf);
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf);
}, size_t(0), std::plus<size_t>()).get();
dblog.info("Loaded {} SSTables", loaded);

View File

@@ -30,10 +30,9 @@ using column_family = table;
namespace db {
class config;
class system_distributed_keyspace;
class system_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -71,7 +70,7 @@ class distributed_loader {
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring cf_name);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
sharded<replica::database>& db, sharded<db::view::view_builder>& vb,
bool needs_view_update, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);
@@ -96,8 +95,7 @@ public:
// The table UUID is returned too.
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<> process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
};
}

View File

@@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name);
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
}
} catch (...) {
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",

View File

@@ -20,9 +20,8 @@ class database;
namespace netw { class messaging_service; }
namespace db {
class system_distributed_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -32,9 +31,8 @@ class view_update_generator;
// system. Built on top of the distributed_loader functionality.
class sstables_loader : public seastar::peering_sharded_service<sstables_loader> {
sharded<replica::database>& _db;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
netw::messaging_service& _messaging;
sharded<db::view::view_builder>& _view_builder;
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
@@ -50,13 +48,11 @@ class sstables_loader : public seastar::peering_sharded_service<sstables_loader>
public:
sstables_loader(sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
netw::messaging_service& messaging)
netw::messaging_service& messaging,
sharded<db::view::view_builder>& vb)
: _db(db)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(view_update_generator)
, _messaging(messaging)
, _view_builder(vb)
{
}

View File

@@ -11,7 +11,7 @@
#include "consumer.hh"
#include "replica/database.hh"
#include "mutation/mutation_source_metadata.hh"
#include "db/view/view_update_generator.hh"
#include "db/view/view_builder.hh"
#include "db/view/view_update_checks.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
@@ -20,13 +20,12 @@ namespace streaming {
std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstring origin,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
sharded<db::view::view_builder>& vb,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy,
service::frozen_topology_guard frozen_guard) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
std::exception_ptr ex;
try {
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
@@ -36,7 +35,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto guard = service::topology_guard(frozen_guard);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *cf, reason);
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
@@ -44,7 +43,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
// means partition estimation shouldn't be adjusted.
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
reader_consumer_v2 consumer =
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) {
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) {
sstables::shared_sstable sst;
try {
sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
@@ -66,11 +65,11 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
cf->enable_off_strategy_trigger();
}
return cf->add_sstable_and_update_cache(sst, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> {
}).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> {
if (!use_view_update_path) {
return make_ready_future<>();
}
return vug.local().register_staging_sstable(sst, std::move(cf));
return vb.local().register_staging_sstable(sst, std::move(cf));
});
};
if (!offstrategy) {

View File

@@ -15,9 +15,8 @@ class database;
}
namespace db {
class system_distributed_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -25,8 +24,7 @@ namespace streaming {
std::function<future<>(flat_mutation_reader_v2)> make_streaming_consumer(sstring origin,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
sharded<db::view::view_builder>& vb,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy,

View File

@@ -24,14 +24,12 @@ extern logging::logger sslog;
stream_manager::stream_manager(db::config& cfg,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<db::view::view_builder>& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg)
: _db(db)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(view_update_generator)
, _view_builder(view_builder)
, _ms(ms)
, _mm(mm)
, _gossiper(gossiper)

View File

@@ -27,9 +27,8 @@
namespace db {
class config;
class system_distributed_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -84,8 +83,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
*/
private:
sharded<replica::database>& _db;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<db::view::view_builder>& _view_builder;
sharded<netw::messaging_service>& _ms;
sharded<service::migration_manager>& _mm;
gms::gossiper& _gossiper;
@@ -106,8 +104,7 @@ private:
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<db::view::view_builder>& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg);

View File

@@ -83,7 +83,7 @@ public:
std::function<future<>(flat_mutation_reader_v2)>
stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) {
return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
}
void stream_manager::init_messaging_service_handler(abort_source& as) {
@@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
if (!_view_builder.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
_db.local().get_token_metadata().get_topology().my_address())));
}

View File

@@ -726,7 +726,7 @@ private:
std::ref(_ms), std::ref(_fd)).get();
auto stop_raft_gr = deferred_stop(_group0_registry);
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer([this] { _stream_manager.stop().get(); });
_feature_service.invoke_on_all([] (auto& fs) {