Files
scylladb/db/view/view_building_worker.cc
Pavel Emelyanov 19ea05692c view_build_worker: Do not switch scheduling groups inside work_on_view_building_tasks
The handler appeared back in c9e710dca3. In this commit it performed the
"core" part of the task -- the do_build_range() method -- inside the
streaming sched group. The setup code looks seemingly was copied from the
view_builder::do_build_step() method and got the explicit switch of the
scheduling group.

The switch looks both -- justified and not. On one hand, it makes it
explict that the activity runs in the streaming scheduling group. On the
other hand, the verb already uses RPC index on 1, which is negotiated to
be run in streaming group anyway. On the "third hand", even though being
explicit the switch happens too late, as there exists a lot of other
activities performed by the handler that seems to also belong to the
same scheduling group, but which is not switched into explicitly.

By and large, it seems better to avoid the explicit switch and rely on
the RPC-level negotiation-based sched group switching.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#28397
2026-02-03 07:00:32 +02:00

824 lines
35 KiB
C++

/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <iterator>
#include <ranges>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/semaphore.hh>
#include <stdexcept>
#include <unordered_set>
#include <utility>
#include "db/view/view_building_worker.hh"
#include "db/view/view_consumer.hh"
#include "dht/token.hh"
#include "replica/database.hh"
#include "service/storage_proxy.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "schema/schema_fwd.hh"
#include "idl/view.dist.hh"
#include "sstables/sstables.hh"
#include "utils/exponential_backoff_retry.hh"
static logging::logger vbw_logger("view_building_worker");
namespace db {
namespace view {
// Called in the context of a seastar::thread.
class view_building_worker::consumer : public view_consumer {
replica::database& _db;
const std::vector<table_id> _views_ids;
lw_shared_ptr<replica::table> _base;
dht::decorated_key _current_key;
mutation_reader& _reader;
reader_permit _permit;
protected:
virtual void load_views_to_build() override {
_views_to_build = _views_ids | std::views::filter([this] (const auto& id) {
return _db.column_family_exists(id);
}) | std::views::transform([this] (const auto& id) {
return view_ptr(_db.find_schema(id));
}) | std::views::filter([this] (const view_ptr& view) {
return partition_key_matches(_db.as_data_dictionary(), *_reader.schema(), *view->view_info(), _current_key);
}) | std::ranges::to<std::vector>();
}
virtual void check_for_built_views() override {}
virtual bool should_stop_consuming_end_of_partition() override {
return false;
}
virtual dht::decorated_key& get_current_key() override {
return _current_key;
}
virtual void set_current_key(dht::decorated_key key) override {
_current_key = std::move(key);
}
virtual lw_shared_ptr<replica::table> base() override {
return _base;
}
virtual mutation_reader& reader() override {
return _reader;
}
virtual reader_permit& permit() override {
return _permit;
}
public:
consumer(replica::database& db, std::vector<table_id> views_ids, lw_shared_ptr<replica::table> base, mutation_reader& reader, reader_permit permit, shared_ptr<view_update_generator> gen, gc_clock::time_point now, abort_source& as)
: view_consumer(std::move(gen), now, as)
, _db(db)
, _views_ids(std::move(views_ids))
, _base(base)
, _current_key(dht::minimum_token(), partition_key::make_empty())
, _reader(reader)
, _permit(std::move(permit)) {}
dht::token consume_end_of_stream() {
return _current_key.token();
}
};
static shard_id get_sstable_shard_id(const sstables::sstable& sst) {
auto shards = sst.get_shards_for_this_sstable();
#ifdef SEASTAR_DEBUG
// Sstable from tablet-table should belong to only one shard
SCYLLA_ASSERT(shards.size() == 1);
#endif
return shards[0];
}
static locator::tablet_id get_sstable_tablet_id(const locator::tablet_map& tablet_map, const sstables::sstable& sst) {
auto last_token = sst.get_last_decorated_key().token();
auto tablet_id = tablet_map.get_tablet_id(last_token);
#ifdef SEASTAR_DEBUG
// Single sstable from tablet-table should contain data for only one tablet
auto first_token = sst.get_first_decorated_key().token();
auto first_token_tablet_id = tablet_map.get_tablet_id(first_token);
SCYLLA_ASSERT(tablet_id == first_token_tablet_id);
#endif
return tablet_id;
}
view_building_worker::view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier, service::raft_group0& group0, view_update_generator& vug, netw::messaging_service& ms, view_building_state_machine& vbsm)
: _db(db)
, _sys_ks(sys_ks)
, _mnotifier(mnotifier)
, _group0(group0)
, _vug(vug)
, _messaging(ms)
, _vb_state_machine(vbsm)
, _gate("view_building_worker_gate")
{
init_messaging_service();
}
future<> view_building_worker::init() {
SCYLLA_ASSERT(this_shard_id() == 0);
co_await discover_existing_staging_sstables();
_staging_sstables_registrator = run_staging_sstables_registrator();
_view_building_state_observer = run_view_building_state_observer();
_mnotifier.register_listener(this);
}
dht::token_range view_building_worker::get_tablet_token_range(table_id table_id, dht::token last_token) {
auto& cf = _db.find_column_family(table_id);
auto& tablet_map = cf.get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
return tablet_map.get_token_range(tablet_map.get_tablet_id(last_token));
}
future<> view_building_worker::drain() {
if (!_as.abort_requested()) {
_as.request_abort();
}
_state._mutex.broken();
_staging_sstables_mutex.broken();
_sstables_to_register_event.broken();
if (this_shard_id() == 0) {
auto sstable_registrator = std::exchange(_staging_sstables_registrator, make_ready_future<>());
co_await std::move(sstable_registrator);
auto state_observer = std::exchange(_view_building_state_observer, make_ready_future<>());
co_await std::move(state_observer);
co_await _mnotifier.unregister_listener(this);
}
co_await _state.clear();
co_await uninit_messaging_service();
}
future<> view_building_worker::stop() {
co_await drain();
co_await _gate.close();
}
void view_building_worker::on_drop_view(const sstring& ks_name, const sstring& view_name) {
(void)with_gate(_gate, [&, this] {
return _sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name);
});
}
future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstables::shared_sstable> ssts, table_id table_id) {
auto& tablet_map = _db.get_token_metadata().tablets().get_tablet_map(table_id);
auto staging_task_infos = ssts | std::views::as_rvalue | std::views::transform([&] (sstables::shared_sstable sst) {
auto tid = get_sstable_tablet_id(tablet_map, *sst);
return staging_sstable_task_info {
.table_id = table_id,
.shard = get_sstable_shard_id(*sst),
.last_token = tablet_map.get_last_token(tid),
.sst_foreign_ptr = make_foreign(std::move(sst))
};
}) | std::ranges::to<std::vector>();
co_await container().invoke_on(0, [staging_task_infos = std::move(staging_task_infos), table_id] (view_building_worker& local_vbw) mutable -> future<> {
try {
auto lock = co_await get_units(local_vbw._staging_sstables_mutex, 1, local_vbw._as);
vbw_logger.debug("Saving {} sstables for table {} to create view building tasks", staging_task_infos.size(), table_id);
auto& sstables_queue = local_vbw._sstables_to_register[table_id];
sstables_queue.insert(sstables_queue.end(), std::make_move_iterator(staging_task_infos.begin()), std::make_move_iterator(staging_task_infos.end()));
local_vbw._sstables_to_register_event.broadcast();
} catch (semaphore_aborted&) {
vbw_logger.warn("Semaphore was aborted while waiting to register {} sstables for table {}", staging_task_infos.size(), table_id);
}
});
}
future<> view_building_worker::run_staging_sstables_registrator() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
co_await create_staging_sstable_tasks();
lock.return_all();
_as.check();
co_await _sstables_to_register_event.when();
} catch (semaphore_aborted&) {
vbw_logger.warn("Got semaphore_aborted while creating staging sstable tasks");
} catch (broken_condition_variable&) {
vbw_logger.warn("Got broken_condition_variable while creating staging sstable tasks");
} catch (abort_requested_exception&) {
vbw_logger.warn("Got abort_requested_exception while creating staging sstable tasks");
} catch (service::group0_concurrent_modification&) {
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
} catch (raft::request_aborted&) {
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
} catch (...) {
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
sleep = true;
}
if (sleep) {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
}
}
}
future<> view_building_worker::create_staging_sstable_tasks() {
if (_sstables_to_register.empty()) {
co_return;
}
utils::chunked_vector<canonical_mutation> cmuts;
auto guard = co_await _group0.client().start_operation(_as);
auto my_host_id = _db.get_token_metadata().get_topology().my_host_id();
for (auto& [table_id, sst_infos]: _sstables_to_register) {
for (auto& sst_info: sst_infos) {
view_building_task task {
utils::UUID_gen::get_time_UUID(), view_building_task::task_type::process_staging, false,
table_id, ::table_id{}, {my_host_id, sst_info.shard}, sst_info.last_token
};
auto mut = co_await _sys_ks.make_view_building_task_mutation(guard.write_timestamp(), task);
cmuts.emplace_back(std::move(mut));
}
}
vbw_logger.debug("Creating {} process_staging view_building_tasks", cmuts.size());
auto cmd = _group0.client().prepare_command(service::write_mutations{std::move(cmuts)}, guard, "create view building tasks");
co_await _group0.client().add_entry(std::move(cmd), std::move(guard), _as);
// Move staging sstables from `_sstables_to_register` (on shard0) to `_staging_sstables` on corresponding shards.
// Firstly reorgenize `_sstables_to_register` for easier movement.
// This is done in separate loop after committing the group0 command, because we need to move values from `_sstables_to_register`
// (`staging_sstable_task_info` is non-copyable because of `foreign_ptr` field).
std::unordered_map<shard_id, std::unordered_map<table_id, std::vector<foreign_ptr<sstables::shared_sstable>>>> new_sstables_per_shard;
for (auto& [table_id, sst_infos]: _sstables_to_register) {
for (auto& sst_info: sst_infos) {
new_sstables_per_shard[sst_info.shard][table_id].push_back(std::move(sst_info.sst_foreign_ptr));
}
}
for (auto& [shard, sstables_per_table]: new_sstables_per_shard) {
co_await container().invoke_on(shard, [sstables_for_this_shard = std::move(sstables_per_table)] (view_building_worker& local_vbw) mutable {
for (auto& [tid, ssts]: sstables_for_this_shard) {
auto unwrapped_ssts = ssts | std::views::as_rvalue | std::views::transform([] (auto&& fptr) {
return fptr.unwrap_on_owner_shard();
}) | std::ranges::to<std::vector>();
auto& tid_ssts = local_vbw._staging_sstables[tid];
tid_ssts.insert(tid_ssts.end(), std::make_move_iterator(unwrapped_ssts.begin()), std::make_move_iterator(unwrapped_ssts.end()));
}
});
}
_sstables_to_register.clear();
}
future<> view_building_worker::discover_existing_staging_sstables() {
auto merge_maps = [] (auto& a, auto&& b) mutable {
for (auto& [tid, ssts]: b) {
auto& tid_ssts = a[tid];
tid_ssts.insert(tid_ssts.end(), std::make_move_iterator(ssts.begin()), std::make_move_iterator(ssts.end()));
}
};
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
auto new_staging_tasks = co_await container().map_reduce0([building_tasks = _vb_state_machine.building_state.tasks_state] (auto& vbw) -> future<std::unordered_map<table_id, std::vector<staging_sstable_task_info>>> {
auto new_tasks = vbw.discover_local_staging_sstables(std::move(building_tasks));
co_return new_tasks;
}, std::unordered_map<table_id, std::vector<staging_sstable_task_info>>{}, [&] (std::unordered_map<table_id, std::vector<staging_sstable_task_info>> a, std::unordered_map<table_id, std::vector<staging_sstable_task_info>>&& b) {
merge_maps(a, std::move(b));
return a;
});
merge_maps(_sstables_to_register, std::move(new_staging_tasks));
}
static bool staging_task_exists(const building_tasks& tasks, table_id table_id, const locator::tablet_replica& replica, dht::token last_token) {
if (!tasks.contains(table_id) || !tasks.at(table_id).contains(replica)) {
return false;
}
auto& replica_tasks = tasks.at(table_id).at(replica);
return std::ranges::any_of(replica_tasks.staging_tasks, [&last_token] (auto& t) {
return t.second.last_token == last_token;
});
}
// Because view building state lives only on shard0, the method needs to take copy of building tasks
// to determine whether a task for particular staging sstable exists or not.
std::unordered_map<table_id, std::vector<view_building_worker::staging_sstable_task_info>> view_building_worker::discover_local_staging_sstables(building_tasks building_tasks) {
std::unordered_map<table_id, std::vector<staging_sstable_task_info>> tasks_to_create;
auto my_host_id = _db.get_token_metadata().get_topology().my_host_id();
_db.get_tables_metadata().for_each_table([&] (table_id table_id, lw_shared_ptr<replica::table> table) {
if (!table->uses_tablets()) {
return;
}
// scylladb/scylladb#26403: Make sure to access the tablets map via the effective replication map of the table object.
// The token metadata object pointed to by the database (`_db.get_token_metadata()`) may not contain
// the tablets map of the currently processed table yet. After #24414 is fixed, this should not matter anymore.
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
auto sstables = table->get_sstables();
for (auto sstable: *sstables) {
if (!sstable->requires_view_building()) {
continue;
}
auto shard = get_sstable_shard_id(*sstable);
auto tid = get_sstable_tablet_id(tablet_map, *sstable);
auto last_token = tablet_map.get_last_token(tid);
if (!staging_task_exists(building_tasks, table_id, {my_host_id, shard}, last_token)) {
// For the future: we can check if the sstable needs to go through view building coordinator
// or maybe it can be registered to view_update_generator directly.
tasks_to_create[table_id].emplace_back(table_id, shard, last_token, make_foreign(std::move(sstable)));
} else {
_staging_sstables[table_id].push_back(std::move(sstable));
}
}
});
return tasks_to_create;
}
future<> view_building_worker::run_view_building_state_observer() {
auto abort = _as.subscribe([this] () noexcept {
_vb_state_machine.event.broadcast();
});
while (!_as.abort_requested()) {
bool sleep = false;
try {
vbw_logger.trace("view_building_state_observer() iteration");
auto read_apply_mutex_holder = co_await _group0.client().hold_read_apply_mutex(_as);
co_await update_built_views();
co_await check_for_aborted_tasks();
_as.check();
read_apply_mutex_holder.return_all();
co_await _vb_state_machine.event.wait();
} catch (abort_requested_exception&) {
} catch (broken_condition_variable&) {
} catch (...) {
vbw_logger.warn("view_building_state_observer failed with: {}", std::current_exception());
sleep = true;
}
if (sleep && !_as.abort_requested()) {
try {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
vbw_logger.warn("view_building_state_observer sleep failed: {}", std::current_exception());
}
}
}
}
// Compares tablet-based views entries in `system.view_build_status_v2`(group0 table)
// with data in `system.built_views`(local table), and updates the second table accordingly.
// When we observe that a view is built, we also remove entries in `system.scylla_views_builds_in_progress`.
future<> view_building_worker::update_built_views() {
auto id_to_name = [&] (table_id table_id) {
auto schema = _db.find_schema(table_id);
return std::make_pair(schema->ks_name(), schema->cf_name());
};
std::set<std::pair<sstring, sstring>> built_views;
for (auto& [id, statuses]: _vb_state_machine.views_state.status_map) {
if (std::ranges::all_of(statuses, [] (const auto& e) { return e.second == build_status::SUCCESS; })) {
built_views.insert(id_to_name(id));
}
}
auto local_built = co_await _sys_ks.load_built_views() | std::views::filter([&] (auto& v) {
return !_db.has_keyspace(v.first) || _db.find_keyspace(v.first).uses_tablets();
}) | std::ranges::to<std::set>();
// Remove dead entries
for (auto& view: local_built) {
if (!built_views.contains(view)) {
co_await _sys_ks.remove_built_view(view.first, view.second);
}
}
// Add new entries
for (auto& view: built_views) {
if (!local_built.contains(view)) {
co_await _sys_ks.mark_view_as_built(view.first, view.second);
co_await _sys_ks.remove_view_build_progress_across_all_shards(view.first, view.second);
}
}
}
// Must be executed on shard0
future<> view_building_worker::check_for_aborted_tasks() {
return container().invoke_on_all([building_state = _vb_state_machine.building_state] (view_building_worker& vbw) -> future<> {
auto lock = co_await get_units(vbw._state._mutex, 1, vbw._as);
co_await vbw._state.update_processing_base_table(vbw._db, building_state, vbw._as);
if (!vbw._state._batch) {
co_return;
}
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
auto it = vbw._state._batch->tasks.begin();
while (it != vbw._state._batch->tasks.end()) {
auto id = it->first;
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
++it; // Advance the iterator before potentially removing the entry from the map.
if (!task_opt || task_opt->get().aborted) {
co_await vbw._state._batch->abort_task(id);
}
}
if (vbw._state._batch->tasks.empty()) {
co_await vbw._state.clean_up_after_batch();
}
});
}
void view_building_worker::init_messaging_service() {
ser::view_rpc_verbs::register_work_on_view_building_tasks(&_messaging, [this] (raft::term_t term, shard_id shard, std::vector<utils::UUID> ids) -> future<std::vector<utils::UUID>> {
return container().invoke_on(shard, [term, ids = std::move(ids)] (auto& vbw) mutable -> future<std::vector<utils::UUID>> {
return vbw.work_on_tasks(term, std::move(ids));
});
});
}
future<> view_building_worker::uninit_messaging_service() {
return ser::view_rpc_verbs::unregister(&_messaging);
}
static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db, table_id table_id) {
return db.find_column_family(table_id).views() | std::views::transform([] (view_ptr vptr) {
return vptr->id();
}) | std::ranges::to<std::unordered_set>();;
}
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
// clear the state, save and flush new base table
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
if (processing_base_table != building_state.currently_processed_base_table) {
co_await clear();
if (building_state.currently_processed_base_table) {
co_await flush_base_table(db, *building_state.currently_processed_base_table, as);
}
processing_base_table = building_state.currently_processed_base_table;
}
}
// If `_batch` ptr points to valid object, co_await its `work` future, save completed tasks and delete the object
future<> view_building_worker::state::clean_up_after_batch() {
if (_batch) {
co_await std::move(_batch->work);
for (auto& [id, _]: _batch->tasks) {
completed_tasks.insert(id);
}
_batch = nullptr;
}
}
// Flush base table, set is as currently processing base table and save which views exist at the time of flush
future<> view_building_worker::state::flush_base_table(replica::database& db, table_id base_table_id, abort_source& as) {
auto cf = db.find_column_family(base_table_id).shared_from_this();
co_await when_all(cf->await_pending_writes(), cf->await_pending_streams());
co_await flush_base(cf, as);
processing_base_table = base_table_id;
flushed_views = get_ids_of_all_views(db, base_table_id);
}
future<> view_building_worker::state::clear() {
if (_batch) {
_batch->as.request_abort();
co_await std::move(_batch->work);
_batch = nullptr;
}
processing_base_table.reset();
completed_tasks.clear();
flushed_views.clear();
}
view_building_worker::batch::batch(sharded<view_building_worker>& vbw, std::unordered_map<utils::UUID, view_building_task> tasks, table_id base_id, locator::tablet_replica replica)
: base_id(base_id)
, replica(replica)
, tasks(std::move(tasks))
, _vbw(vbw) {}
void view_building_worker::batch::start() {
if (this_shard_id() != replica.shard) {
on_internal_error(vbw_logger, "view_building_worker::batch should be started on replica shard");
}
work = do_work().finally([this] {
promise.set_value();
});
}
future<> view_building_worker::batch::abort_task(utils::UUID id) {
tasks.erase(id);
if (tasks.empty()) {
co_await abort();
}
}
future<> view_building_worker::batch::abort() {
co_await smp::submit_to(replica.shard, [this] () {
as.request_abort();
});
}
future<> view_building_worker::batch::do_work() {
if (this_shard_id() != replica.shard) {
on_internal_error(vbw_logger, fmt::format("view_building_worker::batch::do_work() should be executed on tasks shard "));
}
// At this point we assume all tasks are validated to be executed in the same batch
vbw_logger.debug("Starting view building batch for tasks {}. Task type {}", tasks | std::views::keys, tasks.begin()->second.type);
std::exception_ptr eptr;
exponential_backoff_retry r(1s, 5min);
while (!as.abort_requested() && !tasks.empty()) {
if (eptr) {
try {
co_await r.retry(as);
} catch (const sleep_aborted&) {
break;
}
eptr = nullptr;
}
auto task = tasks.begin()->second;
auto type = task.type;
auto base_id = task.base_id;
auto last_token = task.last_token;
auto maybe_views_ids = tasks | std::views::values | std::views::transform(&view_building_task::view_id) | std::ranges::to<std::vector>();
try {
std::vector<table_id> views_ids;
switch (type) {
case view_building_task::task_type::build_range:
views_ids = maybe_views_ids | std::views::transform([] (const auto& i) { return *i; }) | std::ranges::to<std::vector>();
co_await _vbw.local().do_build_range(base_id, views_ids, last_token, as);
break;
case view_building_task::task_type::process_staging:
co_await _vbw.local().do_process_staging(base_id, last_token);
break;
}
} catch (seastar::abort_requested_exception&) {
vbw_logger.debug("Batch aborted");
} catch (...) {
eptr = std::current_exception();
}
if (eptr) {
vbw_logger.warn("Batch with tasks {} failed with error: {}", tasks | std::views::keys, eptr);
} else {
vbw_logger.debug("Batch with tasks {} finished", tasks | std::views::keys);
break;
}
}
}
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
utils::get_local_injector().inject("do_build_range_fail",
[] { throw std::runtime_error("do_build_range failed due to error injection"); });
return seastar::async([this, base_id, views_ids = std::move(views_ids), last_token, &as] {
gc_clock::time_point now = gc_clock::now();
auto base_cf = _db.find_column_family(base_id).shared_from_this();
reader_permit permit = _db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "build_views_range", db::no_timeout, {});
auto slice = make_partition_slice(*base_cf->schema());
auto range = get_tablet_token_range(base_id, last_token);
auto prange = dht::to_partition_range(range);
auto reader = base_cf->get_sstable_set().make_local_shard_sstable_reader(
base_cf->schema(),
permit,
prange,
slice,
nullptr,
streamed_mutation::forwarding::no,
mutation_reader::forwarding::no);
auto compaction_state = make_lw_shared<compact_for_query_state>(
*reader.schema(),
now,
slice,
query::max_rows,
query::max_partitions,
base_cf->get_compaction_manager().get_tombstone_gc_state());
auto consumer = compact_for_query<view_building_worker::consumer>(compaction_state, view_building_worker::consumer(
_db,
views_ids,
base_cf,
reader,
permit,
_vug.shared_from_this(),
now,
as));
as.check();
for (auto& vid: views_ids) {
if (!_views_in_progress.contains(vid)) {
auto view = _db.find_schema(vid);
_sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), dht::minimum_token()).get();
_views_in_progress.insert(vid);
}
}
as.check();
std::exception_ptr eptr;
try {
utils::get_local_injector().inject("view_building_worker_pause_build_range_task", [&] (auto& handler) -> future<> {
bool should_wait = true;
auto maybe_raw_token = handler.template get<int64_t>("token");
if (maybe_raw_token) {
// Wait only if this range contains given token
should_wait = range.contains(dht::token(*maybe_raw_token), std::compare_three_way{});
}
if (should_wait) {
vbw_logger.info("do_build_range: paused, waiting for message");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes(5), &as);
}
}).get();
utils::get_local_injector().inject("view_building_worker_pause_before_consume", 5min, as).get();
vbw_logger.info("Starting range {} building for base table: {}.{}", range, base_cf->schema()->ks_name(), base_cf->schema()->cf_name());
auto end_token = reader.consume_in_thread(std::move(consumer));
vbw_logger.info("Built range {} for base table: {}.{}", dht::token_range(range.start(), end_token), base_cf->schema()->ks_name(), base_cf->schema()->cf_name());
} catch (seastar::abort_requested_exception&) {
eptr = std::current_exception();
vbw_logger.info("Building range {} for base table {} and views {} was aborted.", range, base_id, views_ids);
} catch (...) {
eptr = std::current_exception();
vbw_logger.warn("Error during processing range {} for base table {} and views {}: ", range, base_id, views_ids, eptr);
}
reader.close().get();
if (eptr) {
std::rethrow_exception(eptr);
}
});
}
future<> view_building_worker::do_process_staging(table_id table_id, dht::token last_token) {
if (_staging_sstables[table_id].empty()) {
co_return;
}
auto table = _db.get_tables_metadata().get_table(table_id).shared_from_this();
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
auto tid = tablet_map.get_tablet_id(last_token);
auto tablet_range = tablet_map.get_token_range(tid);
// Select sstables belonging to the tablet (identified by `last_token`)
std::vector<sstables::shared_sstable> sstables_to_process;
for (auto& sst: _staging_sstables[table_id]) {
auto sst_last_token = sst->get_last_decorated_key().token();
if (tablet_range.contains(sst_last_token, dht::token_comparator())) {
sstables_to_process.push_back(sst);
}
}
co_await _vug.process_staging_sstables(std::move(table), sstables_to_process);
try {
// Remove processed sstables from `_staging_sstables` map
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
std::unordered_set<sstables::shared_sstable> sstables_to_remove(sstables_to_process.begin(), sstables_to_process.end());
auto [first, last] = std::ranges::remove_if(_staging_sstables[table_id], [&] (auto& sst) {
return sstables_to_remove.contains(sst);
});
_staging_sstables[table_id].erase(first, last);
} catch (semaphore_aborted&) {
vbw_logger.warn("Semaphore was aborted while waiting to removed processed sstables for table {}", table_id);
}
}
void view_building_worker::load_sstables(table_id table_id, std::vector<sstables::shared_sstable> ssts) {
std::ranges::copy_if(std::move(ssts), std::back_inserter(_staging_sstables[table_id]), [] (auto& sst) {
return sst->state() == sstables::sstable_state::staging;
});
}
void view_building_worker::cleanup_staging_sstables(locator::effective_replication_map_ptr erm, table_id table_id, locator::tablet_id tid) {
auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(table_id);
auto tablet_range = tablet_map.get_token_range(tid);
auto [first, last] = std::ranges::remove_if(_staging_sstables[table_id], [&] (auto& sst) {
auto sst_last_token = sst->get_last_decorated_key().token();
return tablet_range.contains(sst_last_token, dht::token_comparator());
});
_staging_sstables[table_id].erase(first, last);
}
future<view_building_state> view_building_worker::get_latest_view_building_state(raft::term_t term) {
return smp::submit_to(0, [&sharded_vbw = container(), term] () -> future<view_building_state> {
auto& vbw = sharded_vbw.local();
// auto guard = vbw._group0.client().start_operation(vbw._as);
auto& raft_server = vbw._group0.group0_server();
auto group0_holder = vbw._group0.hold_group0_gate();
co_await raft_server.read_barrier(&vbw._as);
if (raft_server.get_current_term() != term) {
throw std::runtime_error(fmt::format("Invalid raft term. Got {} but current term is {}", term, raft_server.get_current_term()));
}
co_return vbw._vb_state_machine.building_state;
});
}
future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_t term, std::vector<utils::UUID> ids) {
auto collect_completed_tasks = [&] {
std::vector<utils::UUID> completed;
for (auto& id: ids) {
if (_state.completed_tasks.contains(id)) {
completed.push_back(id);
}
}
return completed;
};
auto lock = co_await get_units(_state._mutex, 1, _as);
// Firstly check if there is any batch that is finished but wasn't cleaned up.
if (_state._batch && _state._batch->promise.available()) {
co_await _state.clean_up_after_batch();
}
// Check if tasks were already completed.
// If only part of the tasks were finished, return the subset and don't execute the remaining tasks.
std::vector<utils::UUID> completed = collect_completed_tasks();
if (!completed.empty()) {
co_return completed;
}
lock.return_all();
auto building_state = co_await get_latest_view_building_state(term);
lock = co_await get_units(_state._mutex, 1, _as);
co_await _state.update_processing_base_table(_db, building_state, _as);
// If there is no running batch, create it.
if (!_state._batch) {
if (!_state.processing_base_table) {
throw std::runtime_error("view_building_worker::state::processing_base_table needs to be set to work on view building");
}
auto my_host_id = _db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
std::unordered_map<utils::UUID, view_building_task> tasks;
for (auto& id: ids) {
auto task_opt = building_state.get_task(*_state.processing_base_table, my_replica, id);
if (!task_opt) {
throw std::runtime_error(fmt::format("Task {} was not found for base table {} on replica {}", id, *building_state.currently_processed_base_table, my_replica));
}
tasks.insert({id, *task_opt});
}
#ifdef SEASTAR_DEBUG
{
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
}
#endif
// If any view was added after we did the initial flush, we need to do it again
if (std::ranges::any_of(tasks | std::views::values, [&] (const view_building_task& t) {
return t.view_id && !_state.flushed_views.contains(*t.view_id);
})) {
co_await _state.flush_base_table(_db, *_state.processing_base_table, _as);
}
// Create and start the batch
_state._batch = std::make_unique<batch>(container(), std::move(tasks), *building_state.currently_processed_base_table, my_replica);
_state._batch->start();
}
if (std::ranges::all_of(ids, [&] (auto& id) { return !_state._batch->tasks.contains(id); })) {
throw std::runtime_error(fmt::format(
"None of the tasks requested to work on is executed in current view building batch. Batch executes: {}, the RPC requested: {}",
_state._batch->tasks | std::views::keys, ids));
}
auto batch_future = _state._batch->promise.get_shared_future();
lock.return_all();
co_await std::move(batch_future);
lock = co_await get_units(_state._mutex, 1, _as);
co_await _state.clean_up_after_batch();
co_return collect_completed_tasks();
}
}
}