Files
scylladb/db/view/view_builder.hh
Alex c44ad31d44 db/view: gate detached view-builder callbacks during shutdown
Detached migration callbacks (on_create_view, on_update_view, on_drop_view)
  can race with view_builder::drain() teardown.

  Add a lifetime gate to view_builder and wire callback launches through
  _ops_gate.hold() so each detached dispatch future is tracked until it
  completes (finally keeps the hold alive). During shutdown, drain()
  now waits for all tracked callback work with _ops_gate.close().

  This ensures drain does not proceed past callback lifetime while shutdown is in
  progress, and ignores only gate_closed_exception at callback entry as the
  expected shutdown path.
2026-02-18 11:56:41 +02:00

322 lines
15 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "query/query-request.hh"
#include "service/migration_listener.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/cross-shard-barrier.hh"
#include "replica/database.hh"
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <optional>
#include <unordered_map>
#include <vector>
namespace db {
class system_keyspace;
class system_distributed_keyspace;
}
namespace db {
using system_keyspace_view_name = std::pair<sstring, sstring>;
class system_keyspace_view_build_progress;
}
namespace service {
class migration_manager;
} // namespace service
namespace replica {
class database;
}
class exponential_backoff_retry;
namespace db::view {
class view_update_generator;
/**
* The view_builder is a sharded service responsible for building all defined materialized views.
* This process entails walking over the existing data in a given base table, and using it to
* calculate and insert the respective entries for one or more views.
*
* We employ a mutation_reader for each base table for which we're building views.
*
* We aim to be resource-conscious. On a given shard, at any given moment, we consume at most
* from one reader. We also strive for fairness, in that each build step inserts entries for
* the views of a different base. Each build step reads and generates updates for batch_size rows.
*
* We lack a controller, which could potentially allow us to go faster (to execute multiple steps at
* the same time, or consume more rows per batch), and also which would apply backpressure, so we
* could, for example, delay executing a build step.
*
* View building is necessarily a sharded process. That means that on restart, if the number of shards
* has changed, we need to calculate the most conservative token range that has been built, and build
* the remainder.
*
* Interaction with the system tables:
* - When we start building a view, we add an entry to the scylla_views_builds_in_progress
* system table. If the node restarts at this point, we'll consider these newly inserted
* views as having made no progress, and we'll treat them as new views;
* - When we finish a build step, we update the progress of the views that we built during
* this step by writing the next token to the scylla_views_builds_in_progress table. If
* the node restarts here, we'll start building the views at the token in the next_token column.
* - When we finish building a view, we mark it as completed in the built views system table, and
* remove it from the in-progress system table. Under failure, the following can happen:
* * When we fail to mark the view as built, we'll redo the last step upon node reboot;
* * When we fail to delete the in-progress record, upon reboot we'll remove this record.
* A view is marked as completed only when all shards have finished their share of the work, that is,
* if a view is not built, then all shards will still have an entry in the in-progress system table,
* - A view that a shard finished building, but not all other shards, remains in the in-progress system
* table, with first_token == next_token.
* Interaction with the distributed system table (view_build_status):
* - When we start building a view, we mark the view build as being in-progress;
* - When we finish building a view, we mark the view as being built. Upon failure,
* we ensure that if the view is in the in-progress system table, then it may not
* have been written to this table. We don't load the built views from this table
* when starting. When starting, the following happens:
* * If the view is in the system.built_views table and not the in-progress
* system table, then it will be in view_build_status;
* * If the view is in the system.built_views table and not in this one, it
* will still be in the in-progress system table - we detect this and mark
* it as built in this table too, keeping the invariant;
* * If the view is in this table but not in system.built_views, then it will
* also be in the in-progress system table - we don't detect this and will
* redo the missing step, for simplicity.
*/
class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> {
//aliasing for semaphore units that will be used throughout the class
using view_builder_units = semaphore_units<named_semaphore_exception_factory>;
//aliasing for optional semaphore units that will be used throughout the class
using view_builder_units_opt = std::optional<view_builder_units>;
/**
* Keeps track of the build progress for a particular view.
* When the view is built, next_token == first_token.
*/
struct view_build_status final {
view_ptr view;
dht::token first_token;
std::optional<dht::token> next_token;
};
/**
* view build progress status that is loaded from the table and used during initialization.
* similar to view_build_status except it may have null first_token if the shard didn't register itself yet.
*/
struct view_build_init_status final {
view_ptr view;
std::optional<dht::token> first_token;
std::optional<dht::token> next_token;
};
struct stats {
uint64_t steps_performed = 0;
uint64_t steps_failed = 0;
};
/**
* Keeps track of the build progress for all the views of a particular
* base table. Each execution of the build step comprises a query of
* the base table for the selected range.
*
* We pin the set of sstables that potentially contain data that should be added to a
* view (they are pinned by the mutation_reader). Adding a view v' overwrites the
* set of pinned sstables, regardless of there being another view v'' being built. The
* new set will potentially contain new data already in v'', written as part of the write
* path. We assume this case is rare and optimize for fewer disk space in detriment of
* network bandwidth.
*/
struct build_step final {
// Ensure we pin the column_family. It may happen that all views are removed,
// and that the base table is too before we can detect it.
lw_shared_ptr<replica::column_family> base;
query::partition_slice pslice;
dht::partition_range prange;
mutation_reader reader{nullptr};
dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()};
std::vector<view_build_status> build_status;
const dht::token& current_token() const {
return current_key.token();
}
};
using base_to_build_step_type = std::unordered_map<table_id, build_step>;
replica::database& _db;
db::system_keyspace& _sys_ks;
db::system_distributed_keyspace& _sys_dist_ks;
service::raft_group0_client& _group0_client;
cql3::query_processor& _qp;
service::migration_notifier& _mnotifier;
view_update_generator& _vug;
reader_permit _permit;
base_to_build_step_type _base_to_build_step;
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
condition_variable _build_step;
static constexpr size_t view_builder_semaphore_units = 1;
// Ensures bookkeeping operations are serialized, meaning that while we execute
// a build step we don't consider newly added or removed views. This simplifies
// the algorithms. Also synchronizes an operation wrt. a call to stop().
// Semaphore usage invariants:
// - One unit of _sem serializes all per-shard bookkeeping that mutates view-builder state
// (_base_to_build_step, _built_views, build_status, reader resets).
// - The unit is held for the whole operation, including the async chain, until the state
// is stable for the next operation on that shard.
// - Cross-shard operations acquire _sem on shard 0 for the duration of the broadcast.
// Other shards acquire their own _sem only around their local handling; shard 0 skips
// the local acquire because it already holds the unit from the dispatcher.
// Guard the whole startup routine with a semaphore so that it's not intercepted by
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::gate _ops_gate;
seastar::abort_source _as;
future<> _step_fiber = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view.
std::unordered_set<table_id> _built_views;
// Used for testing.
std::unordered_map<std::pair<sstring, sstring>, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers;
stats _stats;
metrics::metric_groups _metrics;
enum class view_build_status_location { sys_dist_ks, group0, both };
view_build_status_location _view_build_status_on = view_build_status_location::sys_dist_ks;
bool _init_virtual_table_on_upgrade = false;
utils::phased_barrier _upgrade_phaser;
struct view_builder_init_state {
std::vector<future<>> bookkeeping_ops;
std::vector<std::vector<view_build_init_status>> status_per_shard;
std::unordered_set<table_id> built_views;
};
future<> start_in_background(service::migration_manager&, utils::cross_shard_barrier b);
public:
// The view builder processes the base table in steps of batch_size rows.
// However, if the individual rows are large, there is no real need to
// collect batch_size of them in memory at once. Rather, as soon as we've
// collected batch_memory_max bytes, we can process the rows read so far.
static constexpr size_t batch_size = 128;
static constexpr size_t batch_memory_max = 1024*1024;
replica::database& get_db() noexcept { return _db; }
db::system_keyspace& get_sys_ks() noexcept { return _sys_ks; }
public:
view_builder(replica::database&, db::system_keyspace&, db::system_distributed_keyspace&, service::migration_notifier&, view_update_generator& vug,
service::raft_group0_client& group0_client, cql3::query_processor& qp);
view_builder(view_builder&&) = delete;
/**
* Loads the state stored in the system tables to resume building the existing views.
* Requires that all views have been loaded from the system tables and are accessible
* through the database, and that the commitlog has been replayed.
*/
future<> start(service::migration_manager&, utils::cross_shard_barrier b = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}));
/**
* Drains view building in order to prepare it for shutdown.
*/
future<> drain();
/**
* Stops the view building process.
*/
future<> stop();
static future<> generate_mutations_on_node_left(replica::database& db, db::system_keyspace& sys_ks, api::timestamp_type timestamp, locator::host_id host_id, utils::chunked_vector<canonical_mutation>& muts);
static future<> migrate_to_v1_5(locator::token_metadata_ptr tmptr, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as, service::group0_guard guard);
static future<> migrate_to_v2(locator::token_metadata_ptr tmptr, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as, service::group0_guard guard);
future<> upgrade_to_v1_5();
future<> upgrade_to_v2();
void init_virtual_table();
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override;
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;
// For tests
future<> wait_until_built(const sstring& ks_name, const sstring& view_name);
future<std::unordered_map<sstring, sstring>> view_build_statuses(sstring keyspace, sstring view_name, const gms::gossiper& g) const;
// Can only be called on shard-0
future<> mark_existing_views_as_built();
future<bool> check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name);
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
private:
build_step& get_or_create_build_step(table_id);
future<> initialize_reader_at_current_token(build_step&);
void load_view_status(view_build_init_status, std::unordered_set<table_id>&);
void reshard(std::vector<std::vector<view_build_init_status>>, std::unordered_set<table_id>&);
void setup_shard_build_step(view_builder_init_state& vbi, std::vector<system_keyspace_view_name>, std::vector<system_keyspace_view_build_progress>);
future<> calculate_shard_build_step(view_builder_init_state& vbi);
future<> add_new_view(view_ptr, build_step&);
future<> run_in_background();
void execute(build_step&, exponential_backoff_retry);
future<> maybe_mark_view_as_built(view_ptr, dht::token);
future<> mark_as_built(view_ptr);
void setup_metrics();
future<> dispatch_create_view(sstring ks_name, sstring view_name);
future<> dispatch_update_view(sstring ks_name, sstring view_name);
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
future<> handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name);
future<> handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units);
future<> handle_drop_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units);
future<> handle_drop_view_global_cleanup(const sstring& ks_name, const sstring& view_name);
future<view_builder_units> get_or_adopt_view_builder_lock(view_builder_units_opt units);
template <typename Func1, typename Func2>
future<> write_view_build_status(Func1&& fn_group0, Func2&& fn_sys_dist) {
auto op = _upgrade_phaser.start();
// read locally so it doesn't change between async calls
const auto v = _view_build_status_on;
if (v == view_build_status_location::group0 || v == view_build_status_location::both) {
co_await fn_group0();
}
if (v == view_build_status_location::sys_dist_ks || v == view_build_status_location::both) {
co_await fn_sys_dist();
}
}
future<> mark_view_build_started(sstring ks_name, sstring view_name);
future<> mark_view_build_success(sstring ks_name, sstring view_name);
future<> remove_view_build_status(sstring ks_name, sstring view_name);
future<std::unordered_map<locator::host_id, sstring>> view_status(sstring ks_name, sstring view_name) const;
struct consumer;
};
}