/* * Copyright (C) 2018-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include "sstables/shared_sstable.hh" #include "db/timeout_clock.hh" #include "utils/chunked_vector.hh" #include "schema/schema_fwd.hh" #include "gc_clock.hh" #include #include #include #include #include #include using namespace seastar; struct frozen_mutation_and_schema; class mutation; class reader_permit; class mutation_reader; class view_ptr; using mutation_reader_opt = optimized_optional; namespace dht { class token; } namespace tracing { class trace_state_ptr; } namespace replica { class database; class table; struct cf_stats; } namespace service { class storage_proxy; struct allow_hints_tag; using allow_hints = bool_class; } namespace db::view { class stats; struct wait_for_all_updates_tag {}; using wait_for_all_updates = bool_class; class view_update_generator : public async_sharded_service { public: static constexpr size_t registration_queue_size = 100; private: replica::database& _db; sharded& _proxy; seastar::abort_source _as; future<> _started = make_ready_future<>(); seastar::condition_variable _pending_sstables; named_semaphore _registration_sem{registration_queue_size, named_semaphore_exception_factory{"view update generator"}}; std::unordered_map, std::vector> _sstables_with_tables; std::unordered_map, std::vector> _sstables_to_move; metrics::metric_groups _metrics; class progress_tracker; std::unique_ptr _progress_tracker; optimized_optional _early_abort_subscription; void do_abort() noexcept; public: view_update_generator(replica::database& db, sharded& proxy, abort_source& as); ~view_update_generator(); future<> start(); future<> drain(); future<> stop(); future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table); // Generate view updates from staging sstables instantly and move those sstables to table's base directory future<> process_staging_sstables(lw_shared_ptr table, std::vector sstables); replica::database& get_db() noexcept { return _db; } const sharded& get_storage_proxy() const noexcept { return _proxy; }; private: future<> mutate_MV( schema_ptr base, dht::token base_token, utils::chunked_vector view_updates, db::view::stats& stats, replica::cf_stats& cf_stats, tracing::trace_state_ptr tr_state, db::timeout_semaphore_units pending_view_update_memory_units, service::allow_hints allow_hints, wait_for_all_updates wait_for_all); std::pair generate_updates_from_staging_sstables(lw_shared_ptr table, std::vector& sstables); public: ssize_t available_register_units() const { return _registration_sem.available_units(); } size_t queued_batches_count() const { return _sstables_with_tables.size(); } // Reader's schema must be the same as the base schema of each of the views. future<> populate_views(const replica::table& base, std::vector, dht::token base_token, mutation_reader&&, gc_clock::time_point); future<> generate_and_propagate_view_updates(const replica::table& table, const schema_ptr& base, reader_permit permit, std::vector&& views, mutation&& m, mutation_reader_opt existings, tracing::trace_state_ptr tr_state, gc_clock::time_point now, db::timeout_clock::time_point timeout); private: bool should_throttle() const; void setup_metrics(); void discover_staging_sstables(); }; }