Merge 'Rework view services and system-distributed-keyspace dependencies' from Pavel Emelyanov
The system-distributed-keyspace and view-update-generator services often travel as a pair, because streaming, repair and sstables-loader (via distributed-loader) need them both to check whether an sstable is staging and to register it if so. The check is performed by poking directly at the system_distributed.view_build_status table, and the registration happens via view-update-generator. That's not nice: other services shouldn't know that view status is kept in a system table. Also, view-update-generator is a service to generate and push view updates; the fact that it keeps the list of staging sstables is an implementation detail. This PR replaces the dependencies on the mentioned pair of services with a single dependency on view-builder (repair, sstables-loader and stream-manager are enlightened) and hides the view building-vs-staging details inside view_builder. Along the way, the repair_writer_impl class is somewhat simplified.

Closes scylladb/scylladb#18706

* github.com:scylladb/scylladb:
  stream_manager: Remove system_distributed_keyspace and view_update_generator
  repair: Remove system_distributed_keyspace and view_update_generator
  streaming: Remove system_distributed_keyspace and view_update_generator
  sstables_loader: Remove system_distributed_keyspace and view_update_generator
  distributed_loader: Remove system_distributed_keyspace and view_update_generator
  view: Make register_staging_sstable() a method of view_builder
  view: Make check_view_build_ongoing() helper a method of view_builder
  streaming: Propagate view_builder& down to make_streaming_consumer()
  repair: Keep view_builder& on repair_writer_impl
  distributed_loader: Propagate view_builder& via process_upload_dir()
  stream_manager: Add view builder dependency
  repair_service: Add view builder dependency
  sstables_loader: Add view_builder dependency
  main: Start sstables loader later
  repair: Remove unwanted local references from repair_meta
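For orientation before the diffs: after this series, the only view-related surface that streaming, repair and the loaders see is the pair of view_builder methods below. This is a condensed sketch of the interface the diff introduces, not a complete class:

// Orientation sketch (condensed from the diff below): the whole surface
// other services now see. Everything else stays private to view_builder.
class view_builder {
public:
    // Answered from system_distributed.view_build_status -- callers no
    // longer touch that table themselves.
    future<bool> check_view_build_ongoing(const locator::token_metadata& tm,
            const sstring& ks_name, const sstring& cf_name);
    // Delegates to view_update_generator's staging-sstable bookkeeping.
    future<> register_staging_sstable(sstables::shared_sstable sst,
            lw_shared_ptr<replica::table> table);
};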
@@ -2639,10 +2639,9 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac
     return std::max(backlog, _max.load(std::memory_order_relaxed));
 }
 
-future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name,
-        const sstring& cf_name) {
+future<bool> view_builder::check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name) {
     using view_statuses_type = std::unordered_map<locator::host_id, sstring>;
-    return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
+    return _sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
         return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
             // Only consider status of known hosts.
             return view_status.second == "STARTED" && tm.get_endpoint_for_host_id_if_known(view_status.first);
@@ -2650,17 +2649,20 @@ future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_
         });
     });
 }
 
-future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
-        streaming::stream_reason reason) {
+future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table) {
+    return _vug.register_staging_sstable(std::move(sst), std::move(table));
+}
+
+future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) {
     if (is_internal_keyspace(t.schema()->ks_name())) {
         return make_ready_future<bool>(false);
     }
     if (reason == streaming::stream_reason::repair && !t.views().empty()) {
         return make_ready_future<bool>(true);
     }
-    return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) {
+    return do_with(t.views(), [&vb, &tm] (auto& views) {
         return map_reduce(views,
-                [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); },
+                [&vb, &tm] (const view_ptr& view) { return vb.check_view_build_ongoing(tm, view->ks_name(), view->cf_name()); },
                 false,
                 std::logical_or<bool>());
     });
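The do_with()/map_reduce() pair above is the idiomatic seastar way to ask "is any of these asynchronous checks true?". A self-contained sketch of the same fold, with hypothetical names (any_async, pred) rather than the Scylla types:

#include <seastar/core/future.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/map_reduce.hh>
#include <functional>
#include <vector>

// Hypothetical: run an async predicate over a container and OR the results.
// Mirrors check_needs_view_update_path(): false is the identity element, so
// an empty range yields false, and any single true makes the result true.
seastar::future<bool> any_async(std::vector<int> xs,
        std::function<seastar::future<bool>(int)> pred) {
    return seastar::do_with(std::move(xs), [pred = std::move(pred)] (auto& xs) {
        return seastar::map_reduce(xs.begin(), xs.end(), pred,
                false, std::logical_or<bool>());
    });
}

Note the container is pinned with do_with() for the duration of the concurrent walk, since map_reduce() only borrows it.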
@@ -221,6 +221,8 @@ public:
 
     // Can only be called on shard-0
     future<> mark_existing_views_as_built();
+    future<bool> check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name);
+    future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
 
 private:
     build_step& get_or_create_build_step(table_id);
@@ -16,19 +16,14 @@ namespace replica {
 class table;
 }
 
-namespace db {
-
-class system_distributed_keyspace;
-
-}
-
 namespace locator {
 class token_metadata;
 }
 
 namespace db::view {
+class view_builder;
 
-future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
+future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t,
         streaming::stream_reason reason);
 
 }
main.cc
@@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
 
     debug::the_stream_manager = &stream_manager;
     supervisor::notify("starting streaming service");
-    stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
+    stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
     auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
         // FIXME -- keep the instances alive, just call .stop on them
         stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();

@@ -1624,7 +1624,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     // both)
     supervisor::notify("starting repair service");
     auto max_memory_repair = memory::stats().total_memory() * 0.1;
-    repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
+    repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
     auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
         repair.stop().get();
     });

@@ -1683,17 +1683,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
         api::unset_server_task_manager_test(ctx).get();
     });
 #endif
-    supervisor::notify("starting sstables loader");
-    sst_loader.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging)).get();
-    auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
-        sst_loader.stop().get();
-    });
-    api::set_server_sstables_loader(ctx, sst_loader).get();
-    auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] {
-        api::unset_server_sstables_loader(ctx).get();
-    });
-
-
     gossiper.local().register_(ss.local().shared_from_this());
     auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] {
         gossiper.local().unregister_(ss.local().shared_from_this()).get();
@@ -1802,6 +1791,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
         view_builder.stop().get();
     });
 
+    supervisor::notify("starting sstables loader");
+    sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder)).get();
+    auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
+        sst_loader.stop().get();
+    });
+    api::set_server_sstables_loader(ctx, sst_loader).get();
+    auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] {
+        api::unset_server_sstables_loader(ctx).get();
+    });
+
     /*
      * FIXME. In bb07678346 commit the API toggle for autocompaction was
      * (partially) delayed until system prepared to join the ring. Probably
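The two main.cc hunks above move the sstables-loader startup to after view_builder's, since sst_loader now keeps a reference to it. A minimal sketch of the ordering rule, assuming plain seastar::defer in place of main.cc's defer_verbose_shutdown helper, and hypothetical service_a/service_b types: start a dependency before its dependent, and let the deferred stops run in reverse.

#include <seastar/core/sharded.hh>
#include <seastar/core/future.hh>
#include <seastar/util/defer.hh>

// Hypothetical services: 'b' keeps a reference to 'a', the way
// sstables_loader now keeps a reference to view_builder.
struct service_a {
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
};
struct service_b {
    explicit service_b(seastar::sharded<service_a>&) {}
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

void start_in_order(seastar::sharded<service_a>& a, seastar::sharded<service_b>& b) {
    // Runs inside a seastar::thread, as main.cc does, hence .get().
    a.start().get();
    auto stop_a = seastar::defer([&] { a.stop().get(); });
    b.start(std::ref(a)).get();          // safe: 'a' is already up
    auto stop_b = seastar::defer([&] { b.stop().get(); });
    // ... serve ...
}   // stop_b runs first, then stop_a -- the reverse of construction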
@@ -42,9 +42,8 @@ class database;
 class repair_service;
 namespace db {
     namespace view {
-        class view_update_generator;
+        class view_builder;
     }
-    class system_distributed_keyspace;
 }
 namespace netw { class messaging_service; }
 namespace service {
@@ -418,8 +418,7 @@ class repair_writer_impl : public repair_writer::impl {
     std::optional<future<>> _writer_done;
     mutation_fragment_queue _mq;
     sharded<replica::database>& _db;
-    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
-    sharded<db::view::view_update_generator>& _view_update_generator;
+    sharded<db::view::view_builder>& _view_builder;
     streaming::stream_reason _reason;
     flat_mutation_reader_v2 _queue_reader;
 public:

@@ -427,8 +426,7 @@ public:
         schema_ptr schema,
         reader_permit permit,
         sharded<replica::database>& db,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
-        sharded<db::view::view_update_generator>& view_update_generator,
+        sharded<db::view::view_builder>& view_builder,
         streaming::stream_reason reason,
         mutation_fragment_queue queue,
         flat_mutation_reader_v2 queue_reader)

@@ -436,8 +434,7 @@ public:
         , _permit(std::move(permit))
         , _mq(std::move(queue))
         , _db(db)
-        , _sys_dist_ks(sys_dist_ks)
-        , _view_update_generator(view_update_generator)
+        , _view_builder(view_builder)
         , _reason(reason)
         , _queue_reader(std::move(queue_reader))
     {}

@@ -514,7 +511,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
     auto erm = t.get_effective_replication_map();
     auto& sharder = erm->get_sharder(*(w->schema()));
     _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
-        streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
+        streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
         t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
             rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
                 w->schema()->ks_name(), w->schema()->cf_name(), partitions);

@@ -531,11 +528,10 @@ lw_shared_ptr<repair_writer> make_repair_writer(
         reader_permit permit,
         streaming::stream_reason reason,
         sharded<replica::database>& db,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
-        sharded<db::view::view_update_generator>& view_update_generator) {
+        sharded<db::view::view_builder>& view_builder) {
     auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
     auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
-    auto i = std::make_unique<repair_writer_impl>(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
+    auto i = std::make_unique<repair_writer_impl>(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader));
     return make_lw_shared<repair_writer>(schema, permit, std::move(i));
 }
@@ -735,8 +731,6 @@ private:
     repair_service& _rs;
     seastar::sharded<replica::database>& _db;
     netw::messaging_service& _messaging;
-    seastar::sharded<db::system_distributed_keyspace>& _sys_dist_ks;
-    seastar::sharded<db::view::view_update_generator>& _view_update_generator;
     schema_ptr _schema;
     reader_permit _permit;
     dht::token_range _range;

@@ -852,8 +846,6 @@ public:
     : _rs(rs)
     , _db(rs.get_db())
     , _messaging(rs.get_messaging())
-    , _sys_dist_ks(rs.get_sys_dist_ks())
-    , _view_update_generator(rs.get_view_update_generator())
     , _schema(s)
     , _permit(std::move(permit))
     , _range(range)

@@ -868,7 +860,7 @@ public:
     , _remote_sharder(make_remote_sharder())
     , _same_sharding_config(is_same_sharding_config(cf))
     , _nr_peer_nodes(nr_peer_nodes)
-    , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator))
+    , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder()))
     , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
             [&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, netw::messaging_service::msg_addr addr) {
                 auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);

@@ -2462,7 +2454,7 @@ future<> repair_service::init_ms_handlers() {
         auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
         return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
                 range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
-            if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) {
+            if (!local_repair._view_builder.local_is_initialized()) {
                return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
                    local_repair.my_address())));
            }

@@ -3184,9 +3176,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
         sharded<service::storage_proxy>& sp,
         sharded<service::raft_address_map>& addr_map,
         sharded<db::batchlog_manager>& bm,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
         sharded<db::system_keyspace>& sys_ks,
-        sharded<db::view::view_update_generator>& vug,
+        sharded<db::view::view_builder>& vb,
         tasks::task_manager& tm,
         service::migration_manager& mm,
         size_t max_repair_memory)

@@ -3196,9 +3187,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
     , _sp(sp)
     , _addr_map(addr_map)
     , _bm(bm)
-    , _sys_dist_ks(sys_dist_ks)
     , _sys_ks(sys_ks)
-    , _view_update_generator(vug)
+    , _view_builder(vb)
     , _repair_module(seastar::make_shared<repair::task_manager_module>(tm, *this, max_repair_memory))
     , _mm(mm)
     , _node_ops_metrics(_repair_module)

@@ -92,9 +92,8 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
     sharded<service::storage_proxy>& _sp;
     sharded<service::raft_address_map>& _addr_map;
     sharded<db::batchlog_manager>& _bm;
-    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
     sharded<db::system_keyspace>& _sys_ks;
-    sharded<db::view::view_update_generator>& _view_update_generator;
+    sharded<db::view::view_builder>& _view_builder;
     shared_ptr<repair::task_manager_module> _repair_module;
     service::migration_manager& _mm;
     node_ops_metrics _node_ops_metrics;

@@ -122,9 +121,8 @@ public:
         sharded<service::storage_proxy>& sp,
         sharded<service::raft_address_map>& addr_map,
         sharded<db::batchlog_manager>& bm,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
         sharded<db::system_keyspace>& sys_ks,
-        sharded<db::view::view_update_generator>& vug,
+        sharded<db::view::view_builder>& vb,
         tasks::task_manager& tm,
         service::migration_manager& mm, size_t max_repair_memory);
     ~repair_service();

@@ -180,8 +178,7 @@ public:
     netw::messaging_service& get_messaging() noexcept { return _messaging; }
     sharded<replica::database>& get_db() noexcept { return _db; }
     service::migration_manager& get_migration_manager() noexcept { return _mm; }
-    sharded<db::system_distributed_keyspace>& get_sys_dist_ks() noexcept { return _sys_dist_ks; }
-    sharded<db::view::view_update_generator>& get_view_update_generator() noexcept { return _view_update_generator; }
+    sharded<db::view::view_builder>& get_view_builder() noexcept { return _view_builder; }
     gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
     size_t max_repair_memory() const { return _max_repair_memory; }
     seastar::semaphore& memory_sem() { return _memory_sem; }
@@ -30,7 +30,7 @@
 #include "db/view/view_update_checks.hh"
 #include <unordered_map>
 #include <boost/range/adaptor/map.hpp>
-#include "db/view/view_update_generator.hh"
+#include "db/view/view_builder.hh"
 
 extern logging::logger dblog;

@@ -137,7 +137,7 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<r
 // Loads SSTables into the main directory (or staging) and returns how many were loaded
 future<size_t>
 distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db,
-        sharded<db::view::view_update_generator>& view_update_generator, bool needs_view_update, sstring ks, sstring cf) {
+        sharded<db::view::view_builder>& vb, bool needs_view_update, sstring ks, sstring cf) {
 
     auto& table = db.local().find_column_family(ks, cf);
     auto new_sstables = std::vector<sstables::shared_sstable>();

@@ -161,9 +161,9 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
         abort();
     });
 
-    co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
+    co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> {
         if (sst->requires_view_building()) {
-            co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
+            co_await vb.local().register_staging_sstable(sst, table.shared_from_this());
         }
     });

@@ -171,8 +171,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
 }
 
 future<>
-distributed_loader::process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
-        distributed<db::view::view_update_generator>& view_update_generator, sstring ks, sstring cf) {
+distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf) {
     seastar::thread_attributes attr;
     attr.sched_group = db.local().get_streaming_scheduling_group();

@@ -181,7 +180,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
         on_internal_error(dblog, "process_upload_dir is not supported with tablets");
     }
 
-    return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] {
+    return seastar::async(std::move(attr), [&db, &vb, ks = std::move(ks), cf = std::move(cf)] {
         auto global_table = get_table_on_all_shards(db, ks, cf).get();
 
         sharded<sstables::sstable_directory> directory;

@@ -230,10 +229,10 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
             [] (const sstables::shared_sstable&) { return true; }).get();
 
         // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
-        const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
+        const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
 
-        size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &view_update_generator] (sstables::sstable_directory& dir) {
-            return make_sstables_available(dir, db, view_update_generator, use_view_update_path, ks, cf);
+        size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) {
+            return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf);
         }, size_t(0), std::plus<size_t>()).get();
 
         dblog.info("Loaded {} SSTables", loaded);

@@ -30,10 +30,9 @@ using column_family = table;
 
 namespace db {
 class config;
-class system_distributed_keyspace;
 class system_keyspace;
 namespace view {
-class view_update_generator;
+class view_builder;
 }
 }

@@ -71,7 +70,7 @@ class distributed_loader {
     static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
     static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring cf_name);
     static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
-            sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
+            sharded<replica::database>& db, sharded<db::view::view_builder>& vb,
             bool needs_view_update, sstring ks, sstring cf);
     static future<> populate_keyspace(distributed<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);

@@ -96,8 +95,7 @@ public:
     // The table UUID is returned too.
     static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
     get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
-    static future<> process_upload_dir(distributed<replica::database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
-            distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
+    static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
 };
 
 }
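make_sstables_available() above registers staging sstables with view_builder from a concurrent walk over the freshly loaded set. A stripped-down, self-contained sketch of that loop, with hypothetical stand-ins (sst, register_staging) for the Scylla types:

#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <vector>

// Hypothetical stand-ins for shared_sstable / view_builder registration.
struct sst { bool staging; };
seastar::future<> register_staging(sst&) { return seastar::make_ready_future<>(); }

// Mirrors make_sstables_available(): walk the newly loaded sstables
// concurrently and register only the ones that require view building.
seastar::future<> register_all(std::vector<sst> new_sstables) {
    co_await seastar::coroutine::parallel_for_each(new_sstables,
            [] (sst& s) -> seastar::future<> {
        if (s.staging) {
            co_await register_staging(s);
        }
    });
}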
@@ -430,7 +430,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
             co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
         });
     } else {
-        co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name);
+        co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
     }
 } catch (...) {
     llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",

@@ -20,9 +20,8 @@ class database;
 
 namespace netw { class messaging_service; }
 namespace db {
-class system_distributed_keyspace;
 namespace view {
-class view_update_generator;
+class view_builder;
 }
 }

@@ -32,9 +31,8 @@ class view_update_generator;
 // system. Built on top of the distributed_loader functionality.
 class sstables_loader : public seastar::peering_sharded_service<sstables_loader> {
     sharded<replica::database>& _db;
-    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
-    sharded<db::view::view_update_generator>& _view_update_generator;
     netw::messaging_service& _messaging;
+    sharded<db::view::view_builder>& _view_builder;
 
     // Note that this is obviously only valid for the current shard. Users of
     // this facility should elect a shard to be the coordinator based on any

@@ -50,13 +48,11 @@ class sstables_loader : public seastar::peering_sharded_service<sstables_loader>
 
 public:
     sstables_loader(sharded<replica::database>& db,
-            sharded<db::system_distributed_keyspace>& sys_dist_ks,
-            sharded<db::view::view_update_generator>& view_update_generator,
-            netw::messaging_service& messaging)
+            netw::messaging_service& messaging,
+            sharded<db::view::view_builder>& vb)
         : _db(db)
-        , _sys_dist_ks(sys_dist_ks)
-        , _view_update_generator(view_update_generator)
        , _messaging(messaging)
+        , _view_builder(vb)
     {
     }
@@ -11,7 +11,7 @@
 #include "consumer.hh"
 #include "replica/database.hh"
 #include "mutation/mutation_source_metadata.hh"
-#include "db/view/view_update_generator.hh"
+#include "db/view/view_builder.hh"
 #include "db/view/view_update_checks.hh"
 #include "sstables/sstables.hh"
 #include "sstables/sstables_manager.hh"

@@ -20,13 +20,12 @@ namespace streaming {
 
 std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstring origin,
         sharded<replica::database>& db,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
-        sharded<db::view::view_update_generator>& vug,
+        sharded<db::view::view_builder>& vb,
         uint64_t estimated_partitions,
         stream_reason reason,
         sstables::offstrategy offstrategy,
         service::frozen_topology_guard frozen_guard) {
-    return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
+    return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
         std::exception_ptr ex;
         try {
             if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {

@@ -36,7 +35,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
 
             auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
             auto guard = service::topology_guard(frozen_guard);
-            auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
+            auto use_view_update_path = co_await db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *cf, reason);
             //FIXME: for better estimations this should be transmitted from remote
             auto metadata = mutation_source_metadata{};
             auto& cs = cf->get_compaction_strategy();

@@ -44,7 +43,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
             // means partition estimation shouldn't be adjusted.
             const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
             reader_consumer_v2 consumer =
-                [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) {
+                [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin), offstrategy] (flat_mutation_reader_v2 reader) {
                     sstables::shared_sstable sst;
                     try {
                         sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();

@@ -66,11 +65,11 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
                         cf->enable_off_strategy_trigger();
                     }
                     return cf->add_sstable_and_update_cache(sst, offstrategy);
-                }).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> {
+                }).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> {
                     if (!use_view_update_path) {
                         return make_ready_future<>();
                     }
-                    return vug.local().register_staging_sstable(sst, std::move(cf));
+                    return vb.local().register_staging_sstable(sst, std::move(cf));
                 });
             };
             if (!offstrategy) {
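The consumer's core branch is worth summarizing: one upfront check_needs_view_update_path() call decides whether streamed sstables are created in staging, and only staging sstables are registered with view_builder so view updates get generated from them later. A condensed, self-contained sketch of that branch; sstable, table and view_builder here are hypothetical dummies, not the real classes:

#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>

// Hypothetical stand-ins, condensing make_streaming_consumer()'s core branch.
struct sstable {};
struct table {
    sstable make_staging() { return {}; }   // stand-in for make_streaming_staging_sstable()
    sstable make_normal()  { return {}; }   // stand-in for make_streaming_sstable_for_write()
    seastar::future<> add(sstable) { return seastar::make_ready_future<>(); }
};
struct view_builder {
    seastar::future<> register_staging_sstable(sstable) { return seastar::make_ready_future<>(); }
};

// One upfront decision picks the destination; only staging sstables are
// handed to view_builder for later view-update generation.
seastar::future<> consume_one(table& cf, view_builder& vb, bool use_view_update_path) {
    auto sst = use_view_update_path ? cf.make_staging() : cf.make_normal();
    // ... write the incoming stream into sst ...
    co_await cf.add(sst);
    if (use_view_update_path) {
        co_await vb.register_staging_sstable(sst);
    }
}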
@@ -15,9 +15,8 @@ class database;
 }
 
 namespace db {
-class system_distributed_keyspace;
 namespace view {
-class view_update_generator;
+class view_builder;
 }
 }

@@ -25,8 +24,7 @@ namespace streaming {
 
 std::function<future<>(flat_mutation_reader_v2)> make_streaming_consumer(sstring origin,
         sharded<replica::database>& db,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
-        sharded<db::view::view_update_generator>& vug,
+        sharded<db::view::view_builder>& vb,
         uint64_t estimated_partitions,
         stream_reason reason,
         sstables::offstrategy offstrategy,
@@ -24,14 +24,12 @@ extern logging::logger sslog;
 
 stream_manager::stream_manager(db::config& cfg,
         sharded<replica::database>& db,
-        sharded<db::system_distributed_keyspace>& sys_dist_ks,
-        sharded<db::view::view_update_generator>& view_update_generator,
+        sharded<db::view::view_builder>& view_builder,
         sharded<netw::messaging_service>& ms,
         sharded<service::migration_manager>& mm,
         gms::gossiper& gossiper, scheduling_group sg)
     : _db(db)
-    , _sys_dist_ks(sys_dist_ks)
-    , _view_update_generator(view_update_generator)
+    , _view_builder(view_builder)
     , _ms(ms)
     , _mm(mm)
     , _gossiper(gossiper)

@@ -27,9 +27,8 @@
 
 namespace db {
 class config;
-class system_distributed_keyspace;
 namespace view {
-class view_update_generator;
+class view_builder;
 }
 }

@@ -84,8 +83,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
  */
 private:
     sharded<replica::database>& _db;
-    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
-    sharded<db::view::view_update_generator>& _view_update_generator;
+    sharded<db::view::view_builder>& _view_builder;
     sharded<netw::messaging_service>& _ms;
     sharded<service::migration_manager>& _mm;
     gms::gossiper& _gossiper;

@@ -106,8 +104,7 @@ private:
 
 public:
     stream_manager(db::config& cfg, sharded<replica::database>& db,
-            sharded<db::system_distributed_keyspace>& sys_dist_ks,
-            sharded<db::view::view_update_generator>& view_update_generator,
+            sharded<db::view::view_builder>& view_builder,
             sharded<netw::messaging_service>& ms,
             sharded<service::migration_manager>& mm,
             gms::gossiper& gossiper, scheduling_group sg);

@@ -83,7 +83,7 @@ public:
 
 std::function<future<>(flat_mutation_reader_v2)>
 stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) {
-    return streaming::make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
+    return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
 }
 
 void stream_manager::init_messaging_service_handler(abort_source& as) {

@@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
     auto reason = reason_opt ? *reason_opt : stream_reason::unspecified;
     service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
     sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
-    if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
+    if (!_view_builder.local_is_initialized()) {
         return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
             _db.local().get_token_metadata().get_topology().my_address())));
     }
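Both the repair and streaming RPC handlers now guard on a single local_is_initialized() check where they used to test two services. The pattern, sketched generically with a hypothetical view_builder_like dependency: an RPC can arrive before the service's dependencies are started on this shard, so the handler fails fast with a retryable error instead of touching an unstarted instance.

#include <seastar/core/sharded.hh>
#include <seastar/core/future.hh>
#include <stdexcept>

struct view_builder_like {};   // hypothetical dependency

// Fail fast if the dependency isn't up yet on this shard; the peer retries.
seastar::future<int> handle_rpc(seastar::sharded<view_builder_like>& vb) {
    if (!vb.local_is_initialized()) {
        return seastar::make_exception_future<int>(
            std::runtime_error("node is not fully initialized, try again later"));
    }
    return seastar::make_ready_future<int>(0);   // ... real handling ...
}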
@@ -726,7 +726,7 @@ private:
             std::ref(_ms), std::ref(_fd)).get();
         auto stop_raft_gr = deferred_stop(_group0_registry);
 
-        _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
+        _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
         auto stop_streaming = defer([this] { _stream_manager.stop().get(); });
 
         _feature_service.invoke_on_all([] (auto& fs) {