Previously the test test_interrupt_view_build_shard_registration stopped the node ungracefully and used commitlog periodic mode to persist the view build progress in a not very reliable way. It can happen that due to timing issues, the view build progress is not persisted, or some of it is persisted in a different ordering than expected. To make the test more reliable we change it to stop the node gracefully, so the commitlog is persisted in a graceful and consistent way, without using the periodic mode delay. We need to also change the injection for the shutdown to not get stuck. Fixes SCYLLADB-1005 Closes scylladb/scylladb#29008
3718 lines
181 KiB
C++
3718 lines
181 KiB
C++
/*
|
|
* Copyright (C) 2017-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include <chrono>
|
|
#include <deque>
|
|
#include <exception>
|
|
#include <functional>
|
|
#include <optional>
|
|
#include <ranges>
|
|
#include <stdexcept>
|
|
#include <unordered_set>
|
|
#include <vector>
|
|
#include <algorithm>
|
|
|
|
#include <fmt/ranges.h>
|
|
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/all.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <flat_map>
|
|
|
|
#include "db/config.hh"
|
|
#include "db/view/base_info.hh"
|
|
#include "db/view/view_build_status.hh"
|
|
#include "db/view/view_consumer.hh"
|
|
#include "mutation/canonical_mutation.hh"
|
|
#include "replica/database.hh"
|
|
#include "keys/clustering_bounds_comparator.hh"
|
|
#include "cql3/statements/select_statement.hh"
|
|
#include "cql3/util.hh"
|
|
#include "cql3/restrictions/statement_restrictions.hh"
|
|
#include "cql3/expr/expr-utils.hh"
|
|
#include "cql3/untyped_result_set.hh"
|
|
#include "cql3/expr/evaluate.hh"
|
|
#include "db/view/view.hh"
|
|
#include "db/view/view_builder.hh"
|
|
#include "db/view/view_updating_consumer.hh"
|
|
#include "db/view/view_update_generator.hh"
|
|
#include "db/view/regular_column_transformation.hh"
|
|
#include "db/system_keyspace_view_types.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "db/system_distributed_keyspace.hh"
|
|
#include "db/tags/utils.hh"
|
|
#include "db/tags/extension.hh"
|
|
#include "dht/sharder.hh"
|
|
#include "gms/inet_address.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "keys/keys.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "locator/network_topology_strategy.hh"
|
|
#include "mutation/mutation.hh"
|
|
#include "mutation/mutation_partition.hh"
|
|
#include <seastar/core/on_internal_error.hh>
|
|
#include "service/migration_manager.hh"
|
|
#include "service/raft/raft_group0_client.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/small_vector.hh"
|
|
#include "view_builder.hh"
|
|
#include "view_info.hh"
|
|
#include "view_update_checks.hh"
|
|
#include "types/list.hh"
|
|
#include "types/map.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/exponential_backoff_retry.hh"
|
|
#include "utils/labels.hh"
|
|
#include "query/query-result-writer.hh"
|
|
#include "readers/from_fragments.hh"
|
|
#include "readers/evictable.hh"
|
|
#include "readers/multishard.hh"
|
|
#include "readers/filtering.hh"
|
|
#include "delete_ghost_rows_visitor.hh"
|
|
#include "locator/host_id.hh"
|
|
#include "cartesian_product.hh"
|
|
#include "idl/view.dist.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
static logging::logger vlogger("view");
|
|
|
|
static inline void inject_failure(std::string_view operation) {
|
|
utils::get_local_injector().inject(operation,
|
|
[operation] { throw std::runtime_error(std::string(operation)); });
|
|
}
|
|
|
|
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info, schema_ptr base_schema)
|
|
: _schema(schema)
|
|
, _raw(raw_view_info)
|
|
, _base_info(make_base_dependent_view_info(*base_schema))
|
|
{ }
|
|
|
|
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info, db::view::base_dependent_view_info base_info)
|
|
: _schema(schema)
|
|
, _raw(raw_view_info)
|
|
, _base_info(std::move(base_info))
|
|
{ }
|
|
|
|
cql3::statements::select_statement& view_info::select_statement(data_dictionary::database db) const {
|
|
if (!_select_statement) {
|
|
std::unique_ptr<cql3::statements::raw::select_statement> raw;
|
|
// FIXME(sarna): legacy code, should be removed after "computed_columns" feature is guaranteed
|
|
// to be available on every node. Then, we won't need to check if this view is backing a secondary index.
|
|
const column_definition* legacy_token_column = nullptr;
|
|
if (db.find_column_family(base_id()).get_index_manager().is_global_index(_schema)) {
|
|
if (!_schema.clustering_key_columns().empty()) {
|
|
legacy_token_column = &_schema.clustering_key_columns().front();
|
|
}
|
|
}
|
|
|
|
if (legacy_token_column || std::ranges::any_of(_schema.all_columns(), std::mem_fn(&column_definition::is_computed))) {
|
|
auto real_columns = _schema.all_columns() | std::views::filter([legacy_token_column] (const column_definition& cdef) {
|
|
return &cdef != legacy_token_column && !cdef.is_computed();
|
|
});
|
|
schema::columns_type columns = std::ranges::to<schema::columns_type>(std::move(real_columns));
|
|
raw = cql3::util::build_select_statement(base_name(), where_clause(), include_all_columns(), columns);
|
|
} else {
|
|
raw = cql3::util::build_select_statement(base_name(), where_clause(), include_all_columns(), _schema.all_columns());
|
|
}
|
|
raw->prepare_keyspace(_schema.ks_name());
|
|
raw->set_bound_variables({});
|
|
cql3::cql_stats ignored;
|
|
auto prepared = raw->prepare(db, ignored, true);
|
|
_select_statement = static_pointer_cast<cql3::statements::select_statement>(prepared->statement);
|
|
}
|
|
return *_select_statement;
|
|
}
|
|
|
|
const query::partition_slice& view_info::partition_slice(data_dictionary::database db) const {
|
|
if (!_partition_slice) {
|
|
_partition_slice = select_statement(db).make_partition_slice(cql3::query_options({ }));
|
|
}
|
|
return *_partition_slice;
|
|
}
|
|
|
|
const column_definition* view_info::view_column(const schema& base, column_kind kind, column_id base_id) const {
|
|
// FIXME: Map base column_ids to view_column_ids, which can be something like
|
|
// a boost::small_vector where the position is the base column_id, and the
|
|
// value is either empty or the view's column_id.
|
|
return view_column(base.column_at(kind, base_id));
|
|
}
|
|
|
|
const column_definition* view_info::view_column(const column_definition& base_def) const {
|
|
return _schema.get_column_definition(base_def.name());
|
|
}
|
|
|
|
void view_info::reset_view_info() {
|
|
// Forget the cached objects which may refer to the base schema.
|
|
_select_statement = nullptr;
|
|
_partition_slice = std::nullopt;
|
|
}
|
|
|
|
// A constructor for a base info that can facilitate reads and writes from the materialized view.
|
|
db::view::base_dependent_view_info::base_dependent_view_info(bool has_computed_column_depending_on_base_non_primary_key,
|
|
bool is_partition_key_permutation_of_base_partition_key,
|
|
bool has_base_non_pk_columns_in_view_pk)
|
|
: has_computed_column_depending_on_base_non_primary_key{has_computed_column_depending_on_base_non_primary_key}
|
|
, is_partition_key_permutation_of_base_partition_key{is_partition_key_permutation_of_base_partition_key}
|
|
, has_base_non_pk_columns_in_view_pk{has_base_non_pk_columns_in_view_pk}
|
|
{ }
|
|
|
|
db::view::base_dependent_view_info view_info::make_base_dependent_view_info(const schema& base) const {
|
|
bool is_partition_key_permutation_of_base_partition_key =
|
|
std::ranges::all_of(_schema.partition_key_columns(), [&base] (const column_definition& view_col) {
|
|
const column_definition* base_col = base.get_column_definition(view_col.name());
|
|
return base_col && base_col->is_partition_key();
|
|
})
|
|
&& _schema.partition_key_size() == base.partition_key_size();
|
|
|
|
bool has_computed_column_depending_on_base_non_primary_key = false;
|
|
bool has_base_non_pk_columns_in_view_pk = false;
|
|
for (auto&& view_col : _schema.primary_key_columns()) {
|
|
if (view_col.is_computed()) {
|
|
// we are not going to find it in the base table...
|
|
if (view_col.get_computation().depends_on_non_primary_key_column()) {
|
|
has_computed_column_depending_on_base_non_primary_key = true;
|
|
}
|
|
continue;
|
|
}
|
|
const bytes& view_col_name = view_col.name();
|
|
auto* base_col = base.get_column_definition(view_col_name);
|
|
if (base_col && base_col->is_regular()) {
|
|
has_base_non_pk_columns_in_view_pk = true;
|
|
} else if (base_col && base_col->is_static()) {
|
|
has_base_non_pk_columns_in_view_pk = true;
|
|
} else if (!base_col) {
|
|
has_base_non_pk_columns_in_view_pk = true;
|
|
}
|
|
}
|
|
return db::view::base_dependent_view_info(has_computed_column_depending_on_base_non_primary_key,
|
|
is_partition_key_permutation_of_base_partition_key, has_base_non_pk_columns_in_view_pk);
|
|
}
|
|
|
|
bool view_info::has_base_non_pk_columns_in_view_pk() const {
|
|
return _base_info.has_base_non_pk_columns_in_view_pk;
|
|
}
|
|
|
|
bool view_info::has_computed_column_depending_on_base_non_primary_key() const {
|
|
return _base_info.has_computed_column_depending_on_base_non_primary_key;
|
|
}
|
|
|
|
bool view_info::is_partition_key_permutation_of_base_partition_key() const {
|
|
return _base_info.is_partition_key_permutation_of_base_partition_key;
|
|
}
|
|
|
|
clustering_row db::view::clustering_or_static_row::as_clustering_row(const schema& s) const {
|
|
if (!is_clustering_row()) {
|
|
on_internal_error(vlogger, "Tried to interpret a static row as a clustering row");
|
|
}
|
|
return clustering_row(*_key, tomb(), marker(), row(s, column_kind::regular_column, cells()));
|
|
}
|
|
|
|
static_row db::view::clustering_or_static_row::as_static_row(const schema& s) const {
|
|
if (!is_static_row()) {
|
|
on_internal_error(vlogger, "Tried to interpret a clustering row as a static row");
|
|
}
|
|
return static_row(s, cells());
|
|
}
|
|
|
|
namespace db {
|
|
|
|
namespace view {
|
|
stats::stats(const sstring& category, label_instance ks_label, label_instance cf_label) :
|
|
service::storage_proxy_stats::write_stats(category, false),
|
|
_ks_label(ks_label),
|
|
_cf_label(cf_label) {
|
|
}
|
|
|
|
void stats::register_stats() {
|
|
namespace ms = seastar::metrics;
|
|
namespace sp_stats = service::storage_proxy_stats;
|
|
_metrics.add_group("column_family", {
|
|
ms::make_total_operations("view_updates_pushed_remote", view_updates_pushed_remote, ms::description("Number of updates (mutations) pushed to remote view replicas"),
|
|
{_cf_label, _ks_label}),
|
|
ms::make_total_operations("view_updates_failed_remote", view_updates_failed_remote, ms::description("Number of updates (mutations) that failed to be pushed to remote view replicas"),
|
|
{_cf_label, _ks_label}),
|
|
ms::make_total_operations("view_updates_pushed_local", view_updates_pushed_local, ms::description("Number of updates (mutations) pushed to local view replicas"),
|
|
{_cf_label, _ks_label}),
|
|
ms::make_total_operations("view_updates_failed_local", view_updates_failed_local, ms::description("Number of updates (mutations) that failed to be pushed to local view replicas"),
|
|
{_cf_label, _ks_label}),
|
|
ms::make_gauge("view_updates_pending", ms::description("Number of updates pushed to view and are still to be completed"),
|
|
{_cf_label, _ks_label}, writes),
|
|
});
|
|
}
|
|
|
|
bool partition_key_matches(data_dictionary::database db, const schema& base, const view_info& view, const dht::decorated_key& key) {
|
|
const cql3::expr::expression& pk_restrictions = view.select_statement(db).get_restrictions()->get_partition_key_restrictions();
|
|
std::vector<bytes> exploded_pk = key.key().explode();
|
|
std::vector<bytes> exploded_ck;
|
|
std::vector<const column_definition*> pk_columns;
|
|
pk_columns.reserve(base.partition_key_size());
|
|
for (const column_definition& column : base.partition_key_columns()) {
|
|
pk_columns.push_back(&column);
|
|
}
|
|
auto selection = cql3::selection::selection::for_columns(base.shared_from_this(), pk_columns);
|
|
uint64_t zero = 0;
|
|
auto dummy_row = query::result_row_view(ser::qr_row_view{simple_memory_input_stream(reinterpret_cast<const char*>(&zero), 8)});
|
|
auto dummy_options = cql3::query_options({ });
|
|
// FIXME: pass nullptrs for some of these dummies
|
|
return cql3::expr::is_satisfied_by(
|
|
pk_restrictions,
|
|
cql3::expr::evaluation_inputs{
|
|
.partition_key = exploded_pk,
|
|
.clustering_key = exploded_ck,
|
|
.static_and_regular_columns = {}, // partition key filtering only
|
|
.selection = selection.get(),
|
|
.options = &dummy_options,
|
|
});
|
|
}
|
|
|
|
bool clustering_prefix_matches(data_dictionary::database db, const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck) {
|
|
const cql3::expr::expression& r = view.select_statement(db).get_restrictions()->get_clustering_columns_restrictions();
|
|
std::vector<bytes> exploded_pk = key.explode();
|
|
std::vector<bytes> exploded_ck = ck.explode();
|
|
std::vector<const column_definition*> ck_columns;
|
|
ck_columns.reserve(base.clustering_key_size());
|
|
for (const column_definition& column : base.clustering_key_columns()) {
|
|
ck_columns.push_back(&column);
|
|
}
|
|
auto selection = cql3::selection::selection::for_columns(base.shared_from_this(), ck_columns);
|
|
uint64_t zero = 0;
|
|
auto dummy_options = cql3::query_options({ });
|
|
// FIXME: pass nullptrs for some of these dummies
|
|
return cql3::expr::is_satisfied_by(
|
|
r,
|
|
cql3::expr::evaluation_inputs{
|
|
.partition_key = exploded_pk,
|
|
.clustering_key = exploded_ck,
|
|
.static_and_regular_columns = {}, // clustering key only filtering here
|
|
.selection = selection.get(),
|
|
.options = &dummy_options,
|
|
});
|
|
}
|
|
|
|
bool may_be_affected_by(data_dictionary::database db, const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update) {
|
|
// We can guarantee that the view won't be affected if:
|
|
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
|
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
|
// pre-existing entry) - note that the upper layers should already have checked the partition key;
|
|
return clustering_prefix_matches(db, base, view, key.key(), update.key());
|
|
}
|
|
|
|
static bool update_requires_read_before_write(data_dictionary::database db, const schema& base,
|
|
const std::vector<view_ptr>& views,
|
|
const dht::decorated_key& key,
|
|
const rows_entry& update) {
|
|
for (auto&& v : views) {
|
|
view_info& vf = *v->view_info();
|
|
if (may_be_affected_by(db, base, vf, key, update)) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Checks if the result matches the provided view filter.
|
|
// It's currently assumed that the result consists of just a single row.
|
|
class view_filter_checking_visitor {
|
|
data_dictionary::database _db;
|
|
const schema& _base;
|
|
const view_info& _view;
|
|
::shared_ptr<cql3::selection::selection> _selection;
|
|
std::vector<bytes> _pk;
|
|
|
|
bool _matches_view_filter = true;
|
|
public:
|
|
view_filter_checking_visitor(data_dictionary::database db, const schema& base, const view_info& view)
|
|
: _db(std::move(db))
|
|
, _base(base)
|
|
, _view(view)
|
|
, _selection(cql3::selection::selection::for_columns(_base.shared_from_this(),
|
|
_base.regular_columns() | std::views::transform([] (const column_definition& cdef) { return &cdef; })
|
|
| std::ranges::to<std::vector<const column_definition*>>()
|
|
)
|
|
)
|
|
{}
|
|
|
|
void accept_new_partition(const partition_key& key, uint64_t row_count) {
|
|
_pk = key.explode();
|
|
}
|
|
void accept_new_partition(uint64_t row_count) {
|
|
throw std::logic_error("view_filter_checking_visitor expects an explicit partition key");
|
|
}
|
|
|
|
void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) {
|
|
_matches_view_filter = _matches_view_filter && check_if_matches(key, static_row, row);
|
|
}
|
|
|
|
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
|
throw std::logic_error("view_filter_checking_visitor expects an explicit clustering key");
|
|
}
|
|
void accept_partition_end(const query::result_row_view& static_row) {}
|
|
|
|
bool check_if_matches(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) const {
|
|
std::vector<bytes> ck = key.explode();
|
|
return std::ranges::all_of(
|
|
_view.select_statement(_db).get_restrictions()->get_non_pk_restriction() | std::views::values,
|
|
[&] (auto&& r) {
|
|
// FIXME: move outside all_of(). However, crashes.
|
|
auto static_and_regular_columns = cql3::expr::get_non_pk_values(*_selection, static_row, &row);
|
|
// FIXME: pass dummy_options as nullptr
|
|
auto dummy_options = cql3::query_options({});
|
|
return cql3::expr::is_satisfied_by(
|
|
r,
|
|
cql3::expr::evaluation_inputs{
|
|
.partition_key = _pk,
|
|
.clustering_key = ck,
|
|
.static_and_regular_columns = static_and_regular_columns,
|
|
.selection = _selection.get(),
|
|
.options = &dummy_options,
|
|
});
|
|
}
|
|
);
|
|
}
|
|
|
|
bool matches_view_filter() const {
|
|
return _matches_view_filter;
|
|
}
|
|
};
|
|
|
|
class data_query_result_builder {
|
|
public:
|
|
using result_type = query::result;
|
|
|
|
private:
|
|
query::result::builder _res_builder;
|
|
query_result_builder _builder;
|
|
|
|
public:
|
|
data_query_result_builder(const schema& s, const query::partition_slice& slice)
|
|
: _res_builder(slice, query::result_options::only_result(), query::result_memory_accounter{query::result_memory_limiter::unlimited_result_size}, query::max_tombstones)
|
|
, _builder(s, _res_builder) { }
|
|
|
|
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
|
|
void consume(tombstone t) { _builder.consume(t); }
|
|
stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); }
|
|
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); }
|
|
stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); }
|
|
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
|
|
result_type consume_end_of_stream() {
|
|
_builder.consume_end_of_stream();
|
|
return _res_builder.build();
|
|
}
|
|
};
|
|
|
|
bool matches_view_filter(data_dictionary::database db, const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
|
// TODO: Filtering is only supported in materialized views which don't support
|
|
// static rows yet. Skip the whole function if it is a static row update.
|
|
if (update.is_static_row()) {
|
|
return true;
|
|
}
|
|
|
|
auto slice = make_partition_slice(base);
|
|
|
|
data_query_result_builder builder(base, slice);
|
|
builder.consume_new_partition(dht::decorate_key(base, key));
|
|
builder.consume(clustering_row(base, update.as_clustering_row(base)), row_tombstone{}, update.is_live(base, tombstone(), now));
|
|
builder.consume_end_of_partition();
|
|
auto result = builder.consume_end_of_stream();
|
|
view_filter_checking_visitor visitor(db, base, view);
|
|
query::result_view::consume(result, slice, visitor);
|
|
|
|
return clustering_prefix_matches(db, base, view, key, *update.key())
|
|
&& visitor.matches_view_filter();
|
|
}
|
|
|
|
view_updates::view_updates(view_ptr v, schema_ptr base)
|
|
: _view(std::move(v))
|
|
, _view_info(*_view->view_info())
|
|
, _base(std::move(base))
|
|
, _base_info(_view_info.base_info())
|
|
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view))
|
|
{
|
|
for (auto&& view_col : _view->primary_key_columns()) {
|
|
if (view_col.is_computed()) {
|
|
continue;
|
|
}
|
|
const bytes& view_col_name = view_col.name();
|
|
auto* base_col = _base->get_column_definition(view_col_name);
|
|
if (base_col && base_col->is_regular()) {
|
|
_base_regular_columns_in_view_pk.push_back(base_col->id);
|
|
} else if (base_col && base_col->is_static()) {
|
|
_base_static_columns_in_view_pk.push_back(base_col->id);
|
|
} else if (!base_col) {
|
|
on_internal_error(vlogger, format("Column {} in view {}.{} was not found in the base table {}.{}",
|
|
view_col_name, _view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name()));
|
|
}
|
|
}
|
|
}
|
|
|
|
future<> view_updates::move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) {
|
|
mutations.reserve(mutations.size() + _updates.size());
|
|
for (auto it = _updates.begin(); it != _updates.end(); it = _updates.erase(it)) {
|
|
auto&& m = std::move(*it);
|
|
auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second));
|
|
mutations.emplace_back(frozen_mutation_and_schema{freeze(mut), _view});
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
_op_count = 0;
|
|
}
|
|
|
|
mutation_partition& view_updates::partition_for(partition_key&& key) {
|
|
auto it = _updates.find(key);
|
|
if (it != _updates.end()) {
|
|
return it->second;
|
|
}
|
|
return _updates.emplace(std::move(key), mutation_partition(*_view)).first->second;
|
|
}
|
|
|
|
size_t view_updates::op_count() const {
|
|
return _op_count;
|
|
}
|
|
|
|
namespace {
|
|
// The following struct is identical to view_key_with_action, except the key
|
|
// is stored as a managed_bytes_view instead of bytes.
|
|
struct view_managed_key_view_and_action {
|
|
managed_bytes_view _key_view;
|
|
view_key_and_action::action _action;
|
|
view_managed_key_view_and_action(managed_bytes_view key_view, view_key_and_action::action action)
|
|
: _key_view(key_view)
|
|
, _action(action)
|
|
{}
|
|
view_managed_key_view_and_action(managed_bytes_view key_view)
|
|
: _key_view(key_view)
|
|
{}
|
|
view_managed_key_view_and_action(view_key_and_action&& bwa, std::deque<bytes>& linearized_values)
|
|
: view_managed_key_view_and_action(managed_bytes_view(linearized_values.emplace_back(std::move(bwa._key_bytes))), bwa._action)
|
|
{}
|
|
static managed_bytes_view get_key_view(const view_managed_key_view_and_action& bvwa) {
|
|
return bvwa._key_view;
|
|
}
|
|
};
|
|
|
|
// value_getter is used to extract values for specific columns during view update.
|
|
struct value_getter {
|
|
// linearized_values hold bytes for values of computed columns, for which we later store references in the form of managed_bytes_view.
|
|
// deque doesn't invalidate references at emplace_back.
|
|
std::deque<bytes> linearized_values;
|
|
|
|
// Index of column being currently processed.
|
|
size_t column_position = 0;
|
|
// Discovered index of collection computed column.
|
|
std::optional<size_t> collection_column_position;
|
|
private:
|
|
// Schemas of base table and view.
|
|
const schema& _base;
|
|
|
|
const partition_key& _base_key;
|
|
const clustering_or_static_row& _update;
|
|
const std::optional<clustering_or_static_row>& _existing;
|
|
|
|
public:
|
|
value_getter(const schema& base, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing)
|
|
: _base(base)
|
|
, _base_key(base_key)
|
|
, _update(update)
|
|
, _existing(existing)
|
|
{
|
|
}
|
|
|
|
using vector_type = utils::small_vector<view_managed_key_view_and_action, 1>;
|
|
vector_type operator()(const column_definition& cdef) {
|
|
column_position++;
|
|
|
|
// Note that in the following lines, if the key column exists in the
|
|
// base table, then it will be used and the requested computed column
|
|
// will be outright ignored.
|
|
// I'm not sure why we chose this surprising logic, but it turns out
|
|
// to be useful in Alternator when combining an LSI (which puts its
|
|
// key attribute a real base column) and GSI (which uses a computed
|
|
// column) - and this logic means the GSI will read the real column
|
|
// stored by the LSI, which turns out to be the right thing to do
|
|
// (see the test test_gsi.py::test_gsi_and_lsi_same_key).
|
|
auto* base_col = _base.get_column_definition(cdef.name());
|
|
if (!base_col) {
|
|
return handle_computed_column(cdef);
|
|
}
|
|
switch (base_col->kind) {
|
|
case column_kind::partition_key:
|
|
return {_base_key.get_component(_base, base_col->position())};
|
|
case column_kind::clustering_key:
|
|
if (_update.is_static_row()) {
|
|
on_internal_error(vlogger, "Tried to get view row value for a static row update in a view with partition key having clustering columns from original table");
|
|
}
|
|
return {_update.key()->get_component(_base, base_col->position())};
|
|
default:
|
|
if (base_col->kind != _update.column_kind()) {
|
|
on_internal_error(vlogger, format("Tried to get a {} column {} from a {} row update, which is impossible",
|
|
to_sstring(base_col->kind), base_col->name_as_text(), _update.is_clustering_row() ? "clustering" : "static"));
|
|
}
|
|
auto& c = _update.cells().cell_at(base_col->id);
|
|
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
|
|
return {managed_bytes_view{value_view}};
|
|
}
|
|
}
|
|
|
|
private:
|
|
vector_type handle_computed_column(const column_definition& cdef) {
|
|
if (!cdef.is_computed()) {
|
|
throw std::logic_error{format(
|
|
"Detected legacy non-computed token column {} in view for table {}.{}",
|
|
cdef.name_as_text(), _base.ks_name(), _base.cf_name())};
|
|
}
|
|
|
|
auto& computation = cdef.get_computation();
|
|
if (auto* collection_computation = dynamic_cast<const collection_column_computation*>(&computation)) {
|
|
return handle_collection_column_computation(collection_computation);
|
|
}
|
|
|
|
// TODO: we already calculated this computation in updatable_view_key_cols,
|
|
// so perhaps we should pass it here and not re-compute it. But this will
|
|
// mean computed columns will only work for view key columns (currently
|
|
// we assume that anyway)
|
|
if (auto* c = dynamic_cast<const regular_column_transformation*>(&computation)) {
|
|
regular_column_transformation::result after =
|
|
c->compute_value(_base, _base_key, _update);
|
|
if (after.has_value()) {
|
|
return {managed_bytes_view(linearized_values.emplace_back(after.get_value()))};
|
|
}
|
|
// We only get to this function when we know the _update row
|
|
// exists and call it to read its key columns, so we don't expect
|
|
// to see a missing value for any of those columns
|
|
on_internal_error(vlogger, fmt::format("unexpected call to handle_computed_column {} missing in update", cdef.name_as_text()));
|
|
}
|
|
|
|
auto computed_value = computation.compute_value(_base, _base_key);
|
|
return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))};
|
|
}
|
|
|
|
vector_type handle_collection_column_computation(const collection_column_computation* collection_computation) {
|
|
vector_type ret;
|
|
if (collection_column_position.has_value()) {
|
|
on_internal_error(vlogger, format("Multiple columns in view (either pk or ck) are collection computed columns. Current is {}, the previous one found was {}", column_position - 1, *collection_column_position));
|
|
}
|
|
collection_column_position = column_position - 1;
|
|
|
|
for (auto& bwa : collection_computation->compute_values_with_action(_base, _base_key, _update, _existing)) {
|
|
ret.push_back({std::move(bwa), linearized_values});
|
|
}
|
|
return ret;
|
|
}
|
|
};
|
|
}
|
|
|
|
|
|
std::vector<view_updates::view_row_entry>
|
|
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, row_tombstone row_delete_tomb) {
|
|
value_getter getter(*_base, base_key, update, existing);
|
|
auto get_value = std::views::transform(std::ref(getter));
|
|
|
|
|
|
std::vector<value_getter::vector_type> pk_elems, ck_elems;
|
|
std::ranges::copy(_view->partition_key_columns() | get_value, std::back_inserter(pk_elems));
|
|
// If no collection column was found, each of the actions will contain no_action,
|
|
// in particular, it does not harm to use column 0.
|
|
const bool had_multiple_values_in_pk = bool(getter.collection_column_position);
|
|
const size_t action_column = getter.collection_column_position.value_or(0);
|
|
// Allow for at most one collection computed column in pk and in ck.
|
|
getter.collection_column_position.reset();
|
|
std::ranges::copy(_view->clustering_key_columns() | get_value, std::back_inserter(ck_elems));
|
|
const bool had_multiple_values_in_ck = bool(getter.collection_column_position);
|
|
|
|
|
|
std::vector<view_updates::view_row_entry> ret;
|
|
auto compute_row = [&]<typename Range>(Range&& pk, Range&& ck) {
|
|
partition_key pkey = partition_key::from_range(std::views::transform(pk, view_managed_key_view_and_action::get_key_view));
|
|
clustering_key ckey = clustering_key::from_range(std::views::transform(ck, view_managed_key_view_and_action::get_key_view));
|
|
auto action = (action_column < pk.size() ? pk[action_column] : ck[action_column - pk.size()])._action;
|
|
mutation_partition& partition = partition_for(std::move(pkey));
|
|
|
|
// Skip adding the row if we already wrote a partition tombstone for this partition, and the update
|
|
// is deleting the row with an equal row tombstone. This means the entire partition is deleted
|
|
// so we don't need to generate updates for individual rows.
|
|
if (partition.partition_tombstone() && partition.partition_tombstone() == row_delete_tomb.tomb()) {
|
|
return;
|
|
}
|
|
ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action});
|
|
};
|
|
|
|
if (had_multiple_values_in_pk) {
|
|
// cartesian_product expects std::vector<std::vector<>>, while we have std::vector<small_vector>.
|
|
std::vector<std::vector<view_managed_key_view_and_action>> pk_elems_, ck_elems_;
|
|
auto std_vector_from_small_vector = std::views::transform([](const auto& vector) {
|
|
return std::vector<view_managed_key_view_and_action>{vector.begin(), vector.end()};
|
|
});
|
|
std::ranges::copy(pk_elems | std_vector_from_small_vector, std::back_inserter(pk_elems_));
|
|
std::ranges::copy(ck_elems | std_vector_from_small_vector, std::back_inserter(ck_elems_));
|
|
|
|
auto cartesian_product_pk = cartesian_product(pk_elems_),
|
|
cartesian_product_ck = cartesian_product(ck_elems_);
|
|
auto ck_it = cartesian_product_ck.begin();
|
|
|
|
if (had_multiple_values_in_ck) {
|
|
// The computed collection column in clustering key was associated with the computed collection column from the partition key.
|
|
// This is a case for indexes over collection values.
|
|
|
|
auto throw_length_error = [&] {
|
|
size_t pk_size = cartesian_product_size(pk_elems_),
|
|
ck_size = cartesian_product_size(ck_elems_);
|
|
on_internal_error(vlogger, format("Computed sizes of possible partition keys and clustering keys don't match: {} != {}", pk_size, ck_size));
|
|
};
|
|
for (std::vector<view_managed_key_view_and_action>& pk : cartesian_product_pk) {
|
|
if (ck_it == cartesian_product_ck.end()) {
|
|
throw_length_error();
|
|
}
|
|
compute_row(pk, *ck_it);
|
|
++ck_it;
|
|
}
|
|
if (ck_it != cartesian_product_ck.end()) {
|
|
throw_length_error();
|
|
}
|
|
} else {
|
|
for (std::vector<view_managed_key_view_and_action>& pk : cartesian_product_pk) {
|
|
for (std::vector<view_managed_key_view_and_action>& ck : cartesian_product_ck) {
|
|
compute_row(pk, ck);
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Here it's the old regular index over regular values. Each vector has just one element.
|
|
auto get_front = std::views::transform([](const auto& v) { return v.front(); });
|
|
compute_row(pk_elems | get_front, ck_elems | get_front);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static const column_definition* view_column(const schema& base, const schema& view, column_kind kind, column_id base_id) {
|
|
// FIXME: Map base column_ids to view_column_ids, which can be something like
|
|
// a boost::small_vector where the position is the base column_id, and the
|
|
// value is either empty or the view's column_id.
|
|
return view.get_column_definition(base.column_at(kind, base_id).name());
|
|
}
|
|
|
|
// Utility function for taking an existing cell, and creating a copy with an
|
|
// empty value instead of the original value, but with the original liveness
|
|
// information (expiration and deletion time) unchanged.
|
|
static atomic_cell make_empty(const atomic_cell_view& ac) {
|
|
if (ac.is_live_and_has_ttl()) {
|
|
return atomic_cell::make_live(*empty_type, ac.timestamp(), bytes_view{}, ac.expiry(), ac.ttl());
|
|
} else if (ac.is_live()) {
|
|
return atomic_cell::make_live(*empty_type, ac.timestamp(), bytes_view{});
|
|
} else {
|
|
return atomic_cell::make_dead(ac.timestamp(), ac.deletion_time());
|
|
}
|
|
}
|
|
|
|
// Utility function for taking an existing collection which has both keys and
|
|
// values (i.e., either a list or map, but not a set), and creating a copy of
|
|
// this collection with all the values replaced by empty values.
|
|
// The make_empty() function above is used to ensure that liveness information
|
|
// is copied unchanged.
|
|
static collection_mutation make_empty(
|
|
const collection_mutation_view& cm,
|
|
const abstract_type& type) {
|
|
collection_mutation_description n;
|
|
cm.with_deserialized(type, [&] (collection_mutation_view_description m_view) {
|
|
n.tomb = m_view.tomb;
|
|
for (auto&& c : m_view.cells) {
|
|
n.cells.emplace_back(c.first, make_empty(c.second));
|
|
}
|
|
});
|
|
return n.serialize(type);
|
|
}
|
|
|
|
// In some cases, we need to copy to a view table even columns which have not
|
|
// been SELECTed. For these columns we only need to save liveness information
|
|
// (timestamp, deletion, ttl), but not the value. We call these columns
|
|
// "virtual columns", and the reason why we need them is explained in
|
|
// issue #3362. The following function, maybe_make_virtual() takes a full
|
|
// value c (taken from the base table) for the given column col, and if that
|
|
// column is a virtual column it modifies c to remove the unwanted value.
|
|
// The function create_virtual_column(), below, creates the virtual column in
|
|
// the view schema, that maybe_make_virtual() will fill.
|
|
static void maybe_make_virtual(atomic_cell_or_collection& c, const column_definition* col) {
|
|
if (!col->is_view_virtual()) {
|
|
// This is a regular selected column. Leave c untouched.
|
|
return;
|
|
}
|
|
if (col->type->is_atomic()) {
|
|
// A virtual cell for an atomic value or frozen collection. Its
|
|
// value is empty (of type empty_type).
|
|
if (col->type != empty_type) {
|
|
throw std::logic_error("Virtual cell has wrong type");
|
|
}
|
|
c = make_empty(c.as_atomic_cell(*col));
|
|
} else if (col->type->is_collection()) {
|
|
auto ctype = static_pointer_cast<const collection_type_impl>(col->type);
|
|
if (ctype->is_list()) {
|
|
// A list has timeuuids as keys, and values (the list's items).
|
|
// We just need to build a list with the same keys (and liveness
|
|
// information), but empty values.
|
|
auto ltype = static_cast<const list_type_impl*>(col->type.get());
|
|
if (ltype->get_elements_type() != empty_type) {
|
|
throw std::logic_error("Virtual cell has wrong list type");
|
|
}
|
|
c = make_empty(c.as_collection_mutation(), *ctype);
|
|
} else if (ctype->is_map()) {
|
|
// A map has keys and values. We just need to build a map with
|
|
// the same keys (and liveness information), but empty values.
|
|
auto mtype = static_cast<const map_type_impl*>(col->type.get());
|
|
if (mtype->get_values_type() != empty_type) {
|
|
throw std::logic_error("Virtual cell has wrong map type");
|
|
}
|
|
c = make_empty(c.as_collection_mutation(), *ctype);
|
|
} else if (ctype->is_set()) {
|
|
// A set has just keys (and liveness information). We need
|
|
// all of it as a virtual column, unfortunately, so we
|
|
// leave c unmodified.
|
|
} else {
|
|
// A collection can't be anything but a list, map or set...
|
|
throw std::logic_error("Virtual cell has unexpected collection type");
|
|
}
|
|
} else if (col->type->is_user_type()) {
|
|
// We leave c unmodified. See the comment in create_virtual_column regarding user types.
|
|
} else {
|
|
throw std::logic_error("Virtual cell is neither atomic nor collection, nor user type");
|
|
}
|
|
|
|
}
|
|
|
|
void create_virtual_column(schema_builder& builder, const bytes& name, const data_type& type) {
|
|
if (type->is_atomic()) {
|
|
builder.with_column(name, empty_type, column_kind::regular_column, column_view_virtual::yes);
|
|
return;
|
|
}
|
|
// A multi-cell collection or user type (a frozen collection
|
|
// or user type is a single cell and handled in the is_atomic() case above).
|
|
// The virtual version can't be just one cell, it has to be
|
|
// itself a collection of cells.
|
|
if (type->is_collection()) {
|
|
auto ctype = static_pointer_cast<const collection_type_impl>(type);
|
|
if (ctype->is_list()) {
|
|
// A list has timeuuids as keys, and values (the list's items).
|
|
// We just need these timeuuids, i.e., a list of empty items.
|
|
builder.with_column(name, list_type_impl::get_instance(empty_type, true),
|
|
column_kind::regular_column, column_view_virtual::yes);
|
|
} else if (ctype->is_map()) {
|
|
// A map has keys and values. We don't need these values,
|
|
// and can use empty values instead.
|
|
auto mtype = static_pointer_cast<const map_type_impl>(type);
|
|
builder.with_column(name, map_type_impl::get_instance(mtype->get_keys_type(), empty_type, true),
|
|
column_kind::regular_column, column_view_virtual::yes);
|
|
} else if (ctype->is_set()) {
|
|
// A set's cell has nothing beyond the keys, so the
|
|
// virtual version of a set is, unfortunately, a complete
|
|
// copy of the set.
|
|
builder.with_column(name, type, column_kind::regular_column, column_view_virtual::yes);
|
|
} else {
|
|
// A collection can't be anything but a list, map or set...
|
|
abort();
|
|
}
|
|
} else if (type->is_user_type()) {
|
|
// FIXME (kbraun): we currently use the original type itself for the virtual version.
|
|
// Instead we could try to:
|
|
// 1. use a modified UDT with all value types replaced with empty_type,
|
|
// which would require creating and storing a completely new type in the DB
|
|
// just for the purpose of virtual columns,
|
|
// 2. or use a map, which would require the make_empty function above
|
|
// to receive both the original type (UDT in this case) and virtual type (map in this case)
|
|
// to perform conversion correctly.
|
|
builder.with_column(name, type, column_kind::regular_column, column_view_virtual::yes);
|
|
} else {
|
|
throw exceptions::invalid_request_exception(
|
|
format("Unsupported unselected multi-cell non-collection, non-UDT column {} for Materialized View", name));
|
|
}
|
|
}
|
|
|
|
static void add_cells_to_view(const schema& base, const schema& view, column_kind kind, row base_cells, row& view_cells) {
|
|
base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) {
|
|
auto* view_col = view_column(base, view, kind, id);
|
|
if (view_col && !view_col->is_primary_key()) {
|
|
maybe_make_virtual(c, view_col);
|
|
view_cells.append_cell(view_col->id, std::move(c));
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Creates a view entry corresponding to the provided base row.
|
|
* This method checks that the base row does match the view filter before applying anything.
|
|
*/
|
|
void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now, row_marker update_marker) {
|
|
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
|
return;
|
|
}
|
|
|
|
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
|
const auto kind = update.column_kind();
|
|
for (const auto& [r, action]: view_rows) {
|
|
if (auto rm = std::get_if<row_marker>(&action)) {
|
|
r->apply(*rm);
|
|
} else {
|
|
r->apply(update_marker);
|
|
}
|
|
r->apply(update.tomb());
|
|
add_cells_to_view(*_base, *_view, kind, row(*_base, kind, update.cells()), r->cells());
|
|
}
|
|
_op_count += view_rows.size();
|
|
}
|
|
|
|
/**
|
|
* Deletes the view entry corresponding to the provided base row.
|
|
* This method checks that the base row does match the view filter before bothering.
|
|
*/
|
|
void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) {
|
|
// Before deleting an old entry, make sure it was matching the view filter
|
|
// (otherwise there is nothing to delete)
|
|
if (matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
|
do_delete_old_entry(base_key, existing, update, now, deletion_ts);
|
|
}
|
|
}
|
|
|
|
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) {
|
|
auto view_rows = get_view_rows(base_key, existing, std::nullopt, update.tomb());
|
|
const auto kind = existing.column_kind();
|
|
for (const auto& [r, action] : view_rows) {
|
|
const auto& col_ids = existing.is_clustering_row()
|
|
? _base_regular_columns_in_view_pk
|
|
: _base_static_columns_in_view_pk;
|
|
if (!col_ids.empty() || _view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
|
// The view key could have been modified because it contains or
|
|
// depends on a non-primary-key. The fact that this function was
|
|
// called instead of update_entry() means the caller knows it
|
|
// wants to delete the old row (with the given deletion_ts) and
|
|
// will create a different one. So let's honor this.
|
|
r->apply(shadowable_tombstone(deletion_ts, now));
|
|
} else {
|
|
// "update" caused the base row to have been deleted, and !col_id
|
|
// means view row is the same - so it needs to be deleted as well
|
|
// using the same deletion timestamps for the individual cells.
|
|
r->apply(update.marker());
|
|
auto diff = update.cells().difference(*_base, kind, existing.cells());
|
|
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
|
|
}
|
|
r->apply(update.tomb());
|
|
}
|
|
_op_count += view_rows.size();
|
|
}
|
|
|
|
/*
|
|
* Atomic cells have equal liveness if they're either both dead, or both non-expiring,
|
|
* or have exactly the same expiration. Comparing liveness is useful for view-virtual
|
|
* cells, as generating updates from them is not needed if their livenesses match.
|
|
*/
|
|
static bool atomic_cells_liveness_equal(atomic_cell_view left, atomic_cell_view right) {
|
|
if (left.is_live() != right.is_live()) {
|
|
return false;
|
|
}
|
|
if (left.is_live()) {
|
|
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
|
|
return false;
|
|
}
|
|
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool view_updates::can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const {
|
|
const row& existing_row = existing.cells();
|
|
const row& updated_row = update.cells();
|
|
|
|
return std::ranges::all_of(_base->regular_columns(), [this, &updated_row, &existing_row] (const column_definition& cdef) {
|
|
const auto view_it = _view->columns_by_name().find(cdef.name());
|
|
const bool column_is_selected = view_it != _view->columns_by_name().end();
|
|
|
|
// If the view has a regular column (i.e. a column that's NOT part of the base table's PK)
|
|
// as part of its PK, there are NO virtual columns corresponding to the unselected columns in the view.
|
|
// Because of that, we don't generate view updates when the value in an unselected column is created
|
|
// or changes.
|
|
if (!column_is_selected) {
|
|
return true;
|
|
}
|
|
|
|
// We cannot skip if the value was created or deleted
|
|
const auto* existing_cell = existing_row.find_cell(cdef.id);
|
|
const auto* updated_cell = updated_row.find_cell(cdef.id);
|
|
if (existing_cell == nullptr || updated_cell == nullptr) {
|
|
return existing_cell == updated_cell;
|
|
}
|
|
|
|
if (!cdef.is_atomic()) {
|
|
return existing_cell->as_collection_mutation().data == updated_cell->as_collection_mutation().data;
|
|
}
|
|
|
|
atomic_cell_view existing_cell_view = existing_cell->as_atomic_cell(cdef);
|
|
atomic_cell_view updated_cell_view = updated_cell->as_atomic_cell(cdef);
|
|
|
|
// We cannot skip when a selected column is changed
|
|
if (view_it->second->is_view_virtual()) {
|
|
return atomic_cells_liveness_equal(existing_cell_view, updated_cell_view);
|
|
}
|
|
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Creates the updates to apply to the existing view entry given the base table row before
|
|
* and after the update, assuming that the update hasn't changed to which view entry the
|
|
* row corresponds (that is, we know the columns composing the view PK haven't changed).
|
|
*
|
|
* This method checks that the base row (before and after) matches the view filter before
|
|
* applying anything.
|
|
*/
|
|
void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now, row_marker update_marker) {
|
|
// While we know update and existing correspond to the same view entry,
|
|
// they may not match the view filter.
|
|
if (!matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
|
create_entry(db, base_key, update, now, update_marker);
|
|
return;
|
|
}
|
|
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
|
do_delete_old_entry(base_key, existing, update, now, update_marker.timestamp());
|
|
return;
|
|
}
|
|
|
|
if (can_skip_view_updates(update, existing)) {
|
|
return;
|
|
}
|
|
|
|
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
|
|
|
const auto kind = update.column_kind();
|
|
for (const auto& [r, action] : view_rows) {
|
|
if (auto rm = std::get_if<row_marker>(&action)) {
|
|
r->apply(*rm);
|
|
} else {
|
|
r->apply(update_marker);
|
|
}
|
|
r->apply(update.tomb());
|
|
|
|
auto diff = update.cells().difference(*_base, kind, existing.cells());
|
|
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
|
|
}
|
|
_op_count += view_rows.size();
|
|
}
|
|
|
|
// Note: despite the general-sounding name of this function, it is used
|
|
// just for the case of collection indexing.
|
|
void view_updates::update_entry_for_computed_column(
|
|
const partition_key& base_key,
|
|
const clustering_or_static_row& update,
|
|
const std::optional<clustering_or_static_row>& existing,
|
|
gc_clock::time_point now) {
|
|
auto view_rows = get_view_rows(base_key, update, existing, {});
|
|
for (const auto& [r, action] : view_rows) {
|
|
struct visitor {
|
|
deletable_row* row;
|
|
gc_clock::time_point now;
|
|
void operator()(view_key_and_action::no_action) {}
|
|
void operator()(view_key_and_action::shadowable_tombstone_tag t) {
|
|
row->apply(t.into_shadowable_tombstone(now));
|
|
}
|
|
void operator()(row_marker rm) {
|
|
row->apply(rm);
|
|
}
|
|
};
|
|
std::visit(visitor{r, now}, action);
|
|
}
|
|
}
|
|
|
|
// view_updates::generate_update() is the main function for taking an update
|
|
// to a base table row - consisting of existing and updated versions of row -
|
|
// and creating from it zero or more updates to a given materialized view.
|
|
// These view updates may consist of updating an existing view row, deleting
|
|
// an old view row, and/or creating a new view row.
|
|
// There are several distinct cases depending on how many of the view's key
|
|
// columns are "new key columns", i.e., were regular key columns in the base
|
|
// or are a computed column based on a regular column (these computed columns
|
|
// are used by, for example, Alternator's GSI):
|
|
//
|
|
// Zero new key columns:
|
|
// The view rows key is composed only from base key columns, and those can't
|
|
// be changed in an update, so the view row remains alive as long as the
|
|
// base row is alive. The row marker for the view needs to be set to the
|
|
// same row marker in the base - to keep an empty view row alive for as long
|
|
// as an empty base row exists.
|
|
// Note that in this case, if there are *unselected* base columns, we may
|
|
// need to keep an empty view row alive even without a row marker because
|
|
// the base row (which has additional columns) is still alive. For that we
|
|
// have the "virtual columns" feature: In the zero new key columns case, we
|
|
// put unselected columns in the view as empty columns, to keep the view
|
|
// row alive.
|
|
//
|
|
// One new key column:
|
|
// In this case, there is a regular base column that is part of the view
|
|
// key. This regular column can be added or deleted in an update, or its
|
|
// expiration be set, and those can cause the view row - including its row
|
|
// marker - to need to appear or disappear as well. So the liveness of cell
|
|
// of this one column determines the liveness of the view row and the row
|
|
// marker that we set for it.
|
|
//
|
|
// Two or more new key columns:
|
|
// This case is explicitly NOT supported in CQL - one cannot create a view
|
|
// with more than one base-regular columns in its key. In general picking
|
|
// one liveness (timestamp and expiration) is not possible if there are
|
|
// multiple regular base columns in the view key, asthose can have different
|
|
// liveness.
|
|
// However, we do allow this case for Alternator - we need to allow the case
|
|
// of two (but not more) because the DynamoDB API allows creating a GSI
|
|
// whose two key columns (hash and range key) were regular columns. We can
|
|
// support this case in Alternator because it doesn't use expiration (the
|
|
// "TTL" it does support is different), and doesn't support user-defined
|
|
// timestamps. But, the two columns can still have different timestamps -
|
|
// this happens if an update modifies just one of them. In this case the
|
|
// timestamp of the view update (and that of the row marker) is the later
|
|
// of these two updated columns.
|
|
void view_updates::generate_update(
|
|
data_dictionary::database db,
|
|
const partition_key& base_key,
|
|
const clustering_or_static_row& update,
|
|
const std::optional<clustering_or_static_row>& existing,
|
|
gc_clock::time_point now) {
|
|
// FIXME: The following if() is old code which may be related to COMPACT
|
|
// STORAGE. If this is a real case, refer to a test that demonstrates it.
|
|
// If it's not a real case, remove this if().
|
|
if (update.is_clustering_row()) {
|
|
if (!update.key()->is_full(*_base)) {
|
|
return;
|
|
}
|
|
}
|
|
// If the view key depends on any regular column in the base, the update
|
|
// may change the view key and may require deleting an old view row and
|
|
// inserting a new row. The other case, which we'll handle here first,
|
|
// is easier and require just modifying one view row.
|
|
if (!_base_info.has_base_non_pk_columns_in_view_pk &&
|
|
!_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
|
if (update.is_static_row()) {
|
|
// TODO: support static rows in views with pk only including columns from base pk
|
|
return;
|
|
}
|
|
// The view key is necessarily the same pre and post update.
|
|
if (existing && existing->is_live(*_base)) {
|
|
if (update.is_live(*_base)) {
|
|
update_entry(db, base_key, update, *existing, now, update.marker());
|
|
} else {
|
|
delete_old_entry(db, base_key, *existing, update, now, api::missing_timestamp);
|
|
}
|
|
} else if (update.is_live(*_base)) {
|
|
create_entry(db, base_key, update, now, update.marker());
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Find the view key columns that may be changed by an update.
|
|
// This case is interesting because a change to the view key means that
|
|
// we may need to delete an old view row and/or create a new view row.
|
|
// The columns we look for are view key columns that are neither base key
|
|
// columns nor computed columns based just on key columns. In other words,
|
|
// we look here for columns which were regular columns or static columns
|
|
// in the base table, or computed columns based on regular columns.
|
|
struct updatable_view_key_col {
|
|
column_id view_col_id;
|
|
regular_column_transformation::result before;
|
|
regular_column_transformation::result after;
|
|
};
|
|
std::vector<updatable_view_key_col> updatable_view_key_cols;
|
|
for (const column_definition& view_col : _view->primary_key_columns()) {
|
|
if (view_col.is_computed()) {
|
|
const column_computation& computation = view_col.get_computation();
|
|
if (computation.depends_on_non_primary_key_column()) {
|
|
// Column is a computed column that does not depend just on
|
|
// the base key, so it may change in the update.
|
|
if (auto* c = dynamic_cast<const regular_column_transformation*>(&computation)) {
|
|
updatable_view_key_cols.emplace_back(view_col.id,
|
|
existing ? c->compute_value(*_base, base_key, *existing) : regular_column_transformation::result(),
|
|
c->compute_value(*_base, base_key, update));
|
|
} else {
|
|
// The only other column_computation we have which has
|
|
// depends_on_non_primary_key_column is
|
|
// collection_column_computation, and we have a special
|
|
// function to handle that case:
|
|
return update_entry_for_computed_column(base_key, update, existing, now);
|
|
}
|
|
}
|
|
} else {
|
|
const column_definition* base_col = _base->get_column_definition(view_col.name());
|
|
if (!base_col) {
|
|
on_internal_error(vlogger, fmt::format("Column {} in view {}.{} was not found in the base table {}.{}",
|
|
view_col.name(), _view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name()));
|
|
}
|
|
// If the view key column was also a base primary key column, then
|
|
// it can't possibly change in this update. But the column was not
|
|
// not a primary key column - i.e., a regular column or static
|
|
// column, the update might have changed it and we need to list it
|
|
// on updatable_view_key_cols.
|
|
// We check base_col->kind == update.column_kind() instead of just
|
|
// !base_col->is_primary_key() because when update is a static row
|
|
// we know it can't possibly update a regular column (and vice
|
|
// versa).
|
|
if (base_col->kind == update.column_kind()) {
|
|
// This is view key, so we know it is atomic
|
|
std::optional<atomic_cell_view> after;
|
|
auto afterp = update.cells().find_cell(base_col->id);
|
|
if (afterp) {
|
|
after = afterp->as_atomic_cell(*base_col);
|
|
}
|
|
std::optional<atomic_cell_view> before;
|
|
if (existing) {
|
|
auto beforep = existing->cells().find_cell(base_col->id);
|
|
if (beforep) {
|
|
before = beforep->as_atomic_cell(*base_col);
|
|
}
|
|
}
|
|
updatable_view_key_cols.emplace_back(view_col.id,
|
|
before ? regular_column_transformation::result(*before) : regular_column_transformation::result(),
|
|
after ? regular_column_transformation::result(*after) : regular_column_transformation::result());
|
|
}
|
|
}
|
|
}
|
|
// If we reached here, the view has a non-primary-key column from the base
|
|
// table as its primary key. That means it's either a regular or static
|
|
// column. If we are currently processing an update which does not
|
|
// correspond to the column's kind, updatable_view_key_cols will be empty
|
|
// and we can just stop here.
|
|
if (updatable_view_key_cols.empty()) {
|
|
return;
|
|
}
|
|
|
|
// Use updatable_view_key_cols - the before and after values of the
|
|
// view key columns that may have changed - to determine if the update
|
|
// changes an existing view row, deletes an old row or creates a new row.
|
|
bool has_old_row = true;
|
|
bool has_new_row = true;
|
|
bool same_row = true; // undefined if either has_old_row or has_new_row are false
|
|
for (const auto& u : updatable_view_key_cols) {
|
|
if (u.before.has_value()) {
|
|
if (u.after.has_value()) {
|
|
if (compare_unsigned(u.before.get_value(), u.after.get_value()) != 0) {
|
|
same_row = false;
|
|
}
|
|
} else {
|
|
has_new_row = false;
|
|
}
|
|
} else {
|
|
has_old_row = false;
|
|
if (!u.after.has_value()) {
|
|
has_new_row = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
// If has_new_row, calculate a row marker for this view row - i.e., a
|
|
// timestamp and ttl - based on those of the updatable view key column
|
|
// (or, in an Alternator-only extension, more than one).
|
|
row_marker new_row_rm; // only set if has_new_row
|
|
if (has_new_row) {
|
|
// Note:
|
|
// 1. By reaching here we know that updatable_view_key_cols has at
|
|
// least one member (in CQL, it's always one, in Alternator it
|
|
// may be two).
|
|
// 2. Because has_new_row, we know all elements in that array have
|
|
// after.has_value() true, so we can use after.get_ts() et al.
|
|
api::timestamp_type new_row_ts = updatable_view_key_cols[0].after.get_ts();
|
|
// This is the Alternator-only support for *two* regular base columns
|
|
// that become view key columns. The timestamp we use is the *maximum*
|
|
// of the two key columns, as explained in pull-request #17172.
|
|
if (updatable_view_key_cols.size() > 1) {
|
|
auto second_ts = updatable_view_key_cols[1].after.get_ts();
|
|
new_row_ts = std::max(new_row_ts, second_ts);
|
|
// Alternator isn't supposed to have more than two updatable view key columns!
|
|
if (updatable_view_key_cols.size() != 2) [[unlikely]] {
|
|
utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size()));
|
|
}
|
|
}
|
|
// We assume that either updatable_view_key_cols has just one column
|
|
// (the only situation allowed in CQL) or if there is more then one
|
|
// they have the same expiry information (in Alternator, there is
|
|
// never a CQL TTL set).
|
|
new_row_rm = row_marker(new_row_ts, updatable_view_key_cols[0].after.get_ttl(), updatable_view_key_cols[0].after.get_expiry());
|
|
}
|
|
|
|
if (has_old_row) {
|
|
// As explained in #19977, when there is one updatable_view_key_cols
|
|
// (the only case allowed in CQL) the deletion timestamp is before's
|
|
// timestamp. As explained in #17119, if there are two of them (only
|
|
// possible in Alternator), we take the maximum.
|
|
// Note:
|
|
// 1. By reaching here we know that updatable_view_key_cols has at
|
|
// least one member (in CQL, it's always one, in Alternator it
|
|
// may be two).
|
|
// 2. Because has_old_row, we know all elements in that array have
|
|
// before.has_value() true, so we can use before.get_ts().
|
|
auto old_row_ts = updatable_view_key_cols[0].before.get_ts();
|
|
if (updatable_view_key_cols.size() > 1) {
|
|
// This is the Alternator-only support for two regular base
|
|
// columns that become view key columns. See explanation in
|
|
// view_updates::compute_row_marker().
|
|
auto second_ts = updatable_view_key_cols[1].before.get_ts();
|
|
old_row_ts = std::max(old_row_ts, second_ts);
|
|
// Alternator isn't supposed to have more than two updatable view key columns!
|
|
if (updatable_view_key_cols.size() != 2) [[unlikely]] {
|
|
utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size()));
|
|
}
|
|
}
|
|
if (has_new_row) {
|
|
if (same_row) {
|
|
update_entry(db, base_key, update, *existing, now, new_row_rm);
|
|
} else {
|
|
// The following code doesn't work if the old and new view row
|
|
// have the same key, because if they do we can get both data
|
|
// and tombstone for the same timestamp and the tombstone
|
|
// wins. This is why we need the "same_row" case above - it's
|
|
// not just a performance optimization.
|
|
delete_old_entry(db, base_key, *existing, update, now, old_row_ts);
|
|
create_entry(db, base_key, update, now, new_row_rm);
|
|
}
|
|
} else {
|
|
delete_old_entry(db, base_key, *existing, update, now, old_row_ts);
|
|
}
|
|
} else if (has_new_row) {
|
|
create_entry(db, base_key, update, now, new_row_rm);
|
|
}
|
|
|
|
}
|
|
|
|
bool view_updates::is_partition_key_permutation_of_base_partition_key() const {
|
|
return _view_info.is_partition_key_permutation_of_base_partition_key();
|
|
}
|
|
|
|
std::optional<partition_key> view_updates::construct_view_partition_key_from_base(const partition_key& base_pk)
|
|
{
|
|
// We check that the view partition key is a permutation of the
|
|
// base partition key. If so, we can construct the corresponding
|
|
// view partition key from the base key and apply an optimized
|
|
// partition level update. Otherwise, we return std::nullopt.
|
|
|
|
if (!is_partition_key_permutation_of_base_partition_key()) {
|
|
return std::nullopt;
|
|
}
|
|
|
|
auto base_exploded_pk = base_pk.explode();
|
|
std::vector<bytes> view_exploded_pk(_view->partition_key_size());
|
|
|
|
// Construct the view partition key by finding each component
|
|
// in the base partition key.
|
|
for (const column_definition& view_cdef : _view->partition_key_columns()) {
|
|
const column_definition* base_cdef = _base->get_column_definition(view_cdef.name());
|
|
if (base_cdef && base_cdef->is_partition_key()) {
|
|
view_exploded_pk[view_cdef.id] = base_exploded_pk[base_cdef->id];
|
|
} else {
|
|
// This shouldn't happen because we already checked that all
|
|
// the view partition key columns appear in the base partition key.
|
|
on_internal_error(vlogger, format("Unexpected failure to construct view partition update for view {}.{} of {}.{}, ",
|
|
_view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name()));
|
|
}
|
|
}
|
|
|
|
partition_key view_pk = partition_key::from_exploded(view_exploded_pk);
|
|
return view_pk;
|
|
}
|
|
|
|
bool view_updates::generate_partition_tombstone_update(
|
|
data_dictionary::database db,
|
|
const partition_key& base_key,
|
|
tombstone partition_tomb) {
|
|
|
|
// Try to construct the view partition key from the base partition key.
|
|
// This will succeed if the view partition key columns are a permutation
|
|
// of the base partition key columns. If it fails, we skip the optimization.
|
|
auto view_key_opt = construct_view_partition_key_from_base(base_key);
|
|
if (!view_key_opt) {
|
|
return false;
|
|
}
|
|
|
|
// Apply the partition tombstone on the view partition
|
|
mutation_partition& mp = partition_for(std::move(*view_key_opt));
|
|
mp.apply(partition_tomb);
|
|
|
|
_op_count++;
|
|
return true;
|
|
}
|
|
|
|
future<> view_update_builder::close() noexcept {
|
|
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
|
|
}
|
|
|
|
future<stop_iteration> view_update_builder::advance_all() {
|
|
auto existings_f = _existings ? (*_existings)() : make_ready_future<mutation_fragment_v2_opt>();
|
|
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
|
_update = std::move(std::get<0>(fragments).get());
|
|
_existing = std::move(std::get<1>(fragments).get());
|
|
return stop_iteration::no;
|
|
});
|
|
}
|
|
|
|
future<stop_iteration> view_update_builder::advance_updates() {
|
|
return _updates().then([this] (auto&& update) mutable {
|
|
_update = std::move(update);
|
|
return stop_iteration::no;
|
|
});
|
|
}
|
|
|
|
future<stop_iteration> view_update_builder::advance_existings() {
|
|
if (!_existings) {
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
return (*_existings)().then([this] (auto&& existing) mutable {
|
|
_existing = std::move(existing);
|
|
return stop_iteration::no;
|
|
});
|
|
}
|
|
|
|
future<stop_iteration> view_update_builder::stop() const {
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
}
|
|
|
|
future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_update_builder::build_some() {
|
|
(void)co_await advance_all();
|
|
if (!_update && !_existing) {
|
|
// Tell the caller there is no more data to build.
|
|
co_return std::nullopt;
|
|
}
|
|
bool do_advance_updates = false;
|
|
bool do_advance_existings = false;
|
|
bool is_partition_tombstone_applied_on_all_views = false;
|
|
if (_update && _update->is_partition_start()) {
|
|
_key = std::move(std::move(_update)->as_partition_start().key().key());
|
|
_update_partition_tombstone = _update->as_partition_start().partition_tombstone();
|
|
do_advance_updates = true;
|
|
|
|
if (_update_partition_tombstone) {
|
|
// For views that have the same partition key as base, generate an update of partition tombstone to delete
|
|
// the entire partition in one operation, instead of generating an update for each row.
|
|
is_partition_tombstone_applied_on_all_views = true;
|
|
for (auto&& v : _view_updates) {
|
|
bool is_applied = v.is_partition_key_permutation_of_base_partition_key() && v.generate_partition_tombstone_update(_db, _key, _update_partition_tombstone);
|
|
is_partition_tombstone_applied_on_all_views &= is_applied;
|
|
}
|
|
}
|
|
}
|
|
if (_existing && _existing->is_partition_start()) {
|
|
_existing_partition_tombstone = _existing->as_partition_start().partition_tombstone();
|
|
do_advance_existings = true;
|
|
}
|
|
if (do_advance_updates) {
|
|
co_await (do_advance_existings ? advance_all() : advance_updates());
|
|
} else if (do_advance_existings) {
|
|
co_await advance_existings();
|
|
}
|
|
if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) {
|
|
co_await seastar::sleep(std::chrono::milliseconds(10));
|
|
}
|
|
|
|
// If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over
|
|
// all the rows trying to generate row updates, because the partition tombstones already cover everything.
|
|
if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) {
|
|
_skip_row_updates = true;
|
|
}
|
|
|
|
while (!_skip_row_updates && co_await on_results() == stop_iteration::no) {};
|
|
|
|
utils::chunked_vector<frozen_mutation_and_schema> mutations;
|
|
for (auto& update : _view_updates) {
|
|
co_await update.move_to(mutations);
|
|
}
|
|
co_return mutations;
|
|
}
|
|
|
|
void view_update_builder::generate_update(clustering_row&& update, std::optional<clustering_row>&& existing) {
|
|
if (update.empty()) {
|
|
// An empty update row (no cells and no tombstone) is rare, but it is
|
|
// possible (see #15228): A mutation can modify a column that was
|
|
// later dropped, and upgrade()ing the mutation's schema in
|
|
// table::do_push_view_replica_updates() left an empty row.
|
|
return;
|
|
}
|
|
|
|
auto dk = dht::decorate_key(*_schema, _key);
|
|
const auto gc_state = _base.get_tombstone_gc_state();
|
|
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
|
|
|
|
// We allow existing to be disengaged, which we treat the same as an empty row.
|
|
if (existing) {
|
|
existing->marker().compact_and_expire(existing->tomb().tomb(), _now, always_gc, gc_before);
|
|
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, existing->tomb(), _now, always_gc, gc_before, existing->marker());
|
|
update.apply(*_schema, *existing);
|
|
}
|
|
|
|
update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before);
|
|
update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker());
|
|
|
|
const auto update_row = clustering_or_static_row(std::move(update));
|
|
const auto existing_row = existing
|
|
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
|
: std::optional<clustering_or_static_row>();
|
|
for (auto&& v : _view_updates) {
|
|
v.generate_update(_db, _key, update_row, existing_row, _now);
|
|
}
|
|
}
|
|
|
|
void view_update_builder::generate_update(static_row&& update, const tombstone& update_tomb,
|
|
std::optional<static_row>&& existing, const tombstone& existing_tomb) {
|
|
if (!update_tomb && update.empty()) {
|
|
throw std::logic_error("A materialized view update cannot be empty");
|
|
}
|
|
|
|
auto dk = dht::decorate_key(*_schema, _key);
|
|
const auto gc_state = _base.get_tombstone_gc_state();
|
|
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
|
|
|
|
// We allow existing to be disengaged, which we treat the same as an empty row.
|
|
if (existing) {
|
|
existing->cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(existing_tomb), _now, always_gc, gc_before);
|
|
update.apply(*_schema, static_row(*_schema, *existing));
|
|
}
|
|
|
|
update.cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(update_tomb), _now, always_gc, gc_before);
|
|
|
|
const auto update_row = clustering_or_static_row(std::move(update));
|
|
const auto existing_row = existing
|
|
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
|
: std::optional<clustering_or_static_row>();
|
|
for (auto&& v : _view_updates) {
|
|
v.generate_update(_db, _key, update_row, existing_row, _now);
|
|
}
|
|
}
|
|
|
|
future<stop_iteration> view_update_builder::on_results() {
|
|
constexpr size_t max_rows_for_view_updates = 100;
|
|
auto should_stop_updates = [this] () -> bool {
|
|
size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) {
|
|
return acc + vu.op_count();
|
|
});
|
|
return rows_for_view_updates >= max_rows_for_view_updates;
|
|
};
|
|
if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) {
|
|
auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
|
|
if (cmp < 0) {
|
|
// We have an update where there was nothing before
|
|
if (_update->is_range_tombstone_change()) {
|
|
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
|
|
} else if (_update->is_clustering_row()) {
|
|
auto update = std::move(*_update).as_clustering_row();
|
|
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
|
auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
|
auto existing = tombstone
|
|
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
|
: std::nullopt;
|
|
generate_update(std::move(update), std::move(existing));
|
|
} else if (_update->is_static_row()) {
|
|
auto update = std::move(*_update).as_static_row();
|
|
auto tombstone = _existing_partition_tombstone;
|
|
auto existing = tombstone
|
|
? std::optional<static_row>(std::in_place)
|
|
: std::nullopt;
|
|
generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
|
}
|
|
return should_stop_updates() ? stop() : advance_updates();
|
|
}
|
|
if (cmp > 0) {
|
|
// We have something existing but no update (which will happen either because it's a range tombstone marker in
|
|
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
|
|
// Due to how the read command for existing rows is constructed, it is also possible that there is a static
|
|
// row is included, even though we didn't modify it.
|
|
if (_existing->is_range_tombstone_change()) {
|
|
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
|
} else if (_existing->is_clustering_row()) {
|
|
auto existing = std::move(*_existing).as_clustering_row();
|
|
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
|
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
|
// The way we build the read command used for existing rows, we should always have a non-empty
|
|
// tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the
|
|
// read method ever changes.
|
|
if (tombstone) {
|
|
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
|
generate_update(std::move(update), { std::move(existing) });
|
|
}
|
|
} else if (_existing->is_static_row()) {
|
|
auto existing = std::move(*_existing).as_static_row();
|
|
auto tombstone = _update_partition_tombstone;
|
|
// The static row might be unintentionally included when fetching existing clustering rows,
|
|
// even if the static row was not updated. We can detect it. A static row can be affected either by:
|
|
//
|
|
// 1. A static row in the update mutation
|
|
// 2. A partition tombstone in the update mutation
|
|
//
|
|
// If neither of those is present, this means that the static row is included accidentally.
|
|
// If we are here, this means that (1) is not present. The `if` that follows checks for (2).
|
|
if (tombstone) {
|
|
auto update = static_row();
|
|
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
|
}
|
|
}
|
|
return should_stop_updates() ? stop () : advance_existings();
|
|
}
|
|
// We're updating a row that had pre-existing data
|
|
if (_update->is_range_tombstone_change()) {
|
|
SCYLLA_ASSERT(_existing->is_range_tombstone_change());
|
|
_existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone();
|
|
_update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone();
|
|
} else if (_update->is_clustering_row()) {
|
|
SCYLLA_ASSERT(_existing->is_clustering_row());
|
|
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
|
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
|
});
|
|
_existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
|
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
|
});
|
|
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
|
|
} else if (_update->is_static_row()) {
|
|
if (!_existing->is_static_row()) {
|
|
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
|
|
mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing)));
|
|
}
|
|
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone);
|
|
|
|
}
|
|
return should_stop_updates() ? stop() : advance_all();
|
|
}
|
|
|
|
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
|
if (tombstone && _existing && !_existing->is_end_of_partition()) {
|
|
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
|
|
if (_existing->is_clustering_row()) {
|
|
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
|
|
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
|
generate_update(std::move(update), { std::move(existing) });
|
|
} else if (_existing->is_static_row()) {
|
|
auto existing = static_row(*_schema, _existing->as_static_row());
|
|
auto update = static_row();
|
|
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
|
}
|
|
return should_stop_updates() ? stop() : advance_existings();
|
|
}
|
|
|
|
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
|
|
if (_update && !_update->is_end_of_partition()) {
|
|
if (_update->is_clustering_row()) {
|
|
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
|
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
|
});
|
|
auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
|
auto existing = existing_tombstone
|
|
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
|
: std::nullopt;
|
|
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
|
} else if (_update->is_static_row()) {
|
|
auto existing_tombstone = _existing_partition_tombstone;
|
|
auto existing = existing_tombstone
|
|
? std::optional<static_row>(std::in_place)
|
|
: std::nullopt;
|
|
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
|
}
|
|
return should_stop_updates() ? stop() : advance_updates();
|
|
}
|
|
|
|
return stop();
|
|
}
|
|
|
|
view_update_builder make_view_update_builder(
|
|
data_dictionary::database db,
|
|
const replica::table& base_table,
|
|
const schema_ptr& base,
|
|
std::vector<view_ptr>&& views_to_update,
|
|
mutation_reader&& updates,
|
|
mutation_reader_opt&& existings,
|
|
gc_clock::time_point now) {
|
|
auto vs = views_to_update | std::views::transform([&] (view_ptr v) {
|
|
return view_updates(std::move(v), base);
|
|
}) | std::ranges::to<std::vector<view_updates>>();
|
|
return view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
|
|
}
|
|
|
|
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_dictionary::database db,
|
|
const schema& base,
|
|
const dht::decorated_key& key,
|
|
const mutation_partition& mp,
|
|
const std::vector<view_ptr>& views) {
|
|
// WARNING: interval<clustering_key_prefix_view> is unsafe - refer to scylladb#22817 and scylladb#21604
|
|
utils::chunked_vector<interval<clustering_key_prefix_view>> row_ranges;
|
|
utils::chunked_vector<interval<clustering_key_prefix_view>> view_row_ranges;
|
|
clustering_key_prefix_view::tri_compare cmp(base);
|
|
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
|
|
for (auto&& v : views) {
|
|
// FIXME: #2371
|
|
if (v->view_info()->select_statement(db).get_restrictions()->has_unrestricted_clustering_columns()) {
|
|
view_row_ranges.push_back(interval<clustering_key_prefix_view>::make_open_ended_both_sides());
|
|
break;
|
|
}
|
|
for (auto&& r : v->view_info()->partition_slice(db).default_row_ranges()) {
|
|
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
}
|
|
}
|
|
if (mp.partition_tombstone()) {
|
|
std::swap(row_ranges, view_row_ranges);
|
|
} else {
|
|
// FIXME: Optimize, as most often than not clustering keys will not be restricted.
|
|
for (auto&& rt : mp.row_tombstones()) {
|
|
interval<clustering_key_prefix_view> rtr(
|
|
bound_view::to_interval_bound<interval>(rt.start_bound()),
|
|
bound_view::to_interval_bound<interval>(rt.end_bound()));
|
|
for (auto&& vr : view_row_ranges) {
|
|
// WARNING: interval<clustering_key_prefix_view>::intersection can return incorrect results - refer to scylladb#8157 and scylladb#21604
|
|
auto overlap = rtr.intersection(vr, cmp);
|
|
if (overlap) {
|
|
row_ranges.push_back(std::move(overlap).value());
|
|
}
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
}
|
|
}
|
|
|
|
for (auto&& row : mp.clustered_rows()) {
|
|
if (update_requires_read_before_write(db, base, views, key, row)) {
|
|
row_ranges.emplace_back(row.key());
|
|
}
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
|
|
// Note that the views could have restrictions on regular columns,
|
|
// but even if that's the case we shouldn't apply those when we read,
|
|
// because even if an existing row doesn't match the view filter, the
|
|
// update can change that in which case we'll need to know the existing
|
|
// content, in case the view includes a column that is not included in
|
|
// this mutation.
|
|
|
|
query::clustering_row_ranges result_ranges;
|
|
// FIXME: scylladb#22817 - interval<clustering_key_prefix_view>::deoverlap can return incorrect results
|
|
auto deoverlapped_ranges = interval<clustering_key_prefix_view>::deoverlap(std::move(row_ranges), cmp);
|
|
result_ranges.reserve(deoverlapped_ranges.size());
|
|
for (auto&& r : deoverlapped_ranges) {
|
|
result_ranges.emplace_back(std::move(r).transform([] (auto&& ckv) { return clustering_key_prefix(ckv); }));
|
|
co_await coroutine::maybe_yield();
|
|
}
|
|
co_return result_ranges;
|
|
}
|
|
|
|
bool needs_static_row(const mutation_partition& mp, const std::vector<view_ptr>& views) {
|
|
// TODO: We could also check whether any of the views need static rows
|
|
// and return false if none of them do
|
|
return mp.partition_tombstone() || !mp.static_row().empty();
|
|
}
|
|
|
|
bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const locator::effective_replication_map_ptr& ermp, dht::token token) {
|
|
// Based on the computation in get_view_natural_endpoint, this is used
|
|
// to detect beforehand the case that we're a "normal" replica which is
|
|
// paired with a view replica and sends view updates to.
|
|
// A base replica can be paired with a view replica if it's a "normal" non-pending replica that is
|
|
// returned by get_replicas() or it could be a pending replica that is also a read replica
|
|
// and returned by get_replicas_for_reading().
|
|
// For a pending replica that is not ready for reading, for example, this will return false.
|
|
// Also, for the case of intra-node migration, we check that this shard is ready for reads.
|
|
|
|
const auto me = ermp->get_token_metadata_ptr()->get_topology().my_host_id();
|
|
const auto base_replicas = ermp->get_replicas(token);
|
|
const auto read_replicas = ermp->get_replicas_for_reading(token);
|
|
const auto shards = ermp->shards_ready_for_reads(*base, token);
|
|
|
|
return (std::ranges::contains(base_replicas, me) || std::ranges::contains(read_replicas, me))
|
|
&& std::ranges::contains(shards, this_shard_id());
|
|
}
|
|
|
|
static endpoints_to_update get_view_natural_endpoint_vnodes(
|
|
locator::host_id me,
|
|
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
|
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
|
locator::endpoint_dc_rack my_location,
|
|
const bool network_topology,
|
|
replica::cf_stats& cf_stats) {
|
|
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
|
node_vector base_endpoints, view_endpoints;
|
|
auto& my_datacenter = my_location.dc;
|
|
|
|
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
|
if (!network_topology || node.get().dc() == my_datacenter) {
|
|
nodes.emplace_back(node);
|
|
}
|
|
};
|
|
|
|
for (auto&& base_node : base_nodes) {
|
|
process_candidate(base_endpoints, base_node);
|
|
}
|
|
|
|
for (auto&& view_node : view_nodes) {
|
|
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
|
// If this base replica is also one of the view replicas, we use
|
|
// ourselves as the view replica.
|
|
// We don't return an extra endpoint, as it's only needed when
|
|
// using tablets (so !use_legacy_self_pairing)
|
|
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
|
return {.natural_endpoint = me};
|
|
}
|
|
|
|
// We have to remove any endpoint which is shared between the base
|
|
// and the view, as it will select itself and throw off the counts
|
|
// otherwise.
|
|
if (it != base_endpoints.end()) {
|
|
base_endpoints.erase(it);
|
|
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
|
|
view_endpoints.push_back(view_node);
|
|
}
|
|
}
|
|
|
|
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
|
if (base_it == base_endpoints.end()) {
|
|
// This node is not a base replica of this key, so we return empty
|
|
// FIXME: This case shouldn't happen, and if it happens, a view update
|
|
// would be lost.
|
|
++cf_stats.total_view_updates_on_wrong_node;
|
|
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
|
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
|
return {};
|
|
}
|
|
size_t idx = base_it - base_endpoints.begin();
|
|
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
|
|
}
|
|
|
|
static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
|
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
|
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
|
replica::cf_stats& cf_stats) {
|
|
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
|
|
for (auto&& base_node : base_nodes) {
|
|
if (base_dc_racks.contains(base_node.get().dc_rack())) {
|
|
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
|
++cf_stats.total_view_updates_failed_pairing;
|
|
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
|
|
base_node.get().dc(), base_node.get().rack());
|
|
return std::nullopt;
|
|
}
|
|
base_dc_racks.insert(base_node.get().dc_rack());
|
|
}
|
|
|
|
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
|
|
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
|
|
for (auto&& view_node : view_nodes) {
|
|
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
|
|
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
|
++cf_stats.total_view_updates_failed_pairing;
|
|
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
|
|
view_node.get().dc(), view_node.get().rack());
|
|
return std::nullopt;
|
|
}
|
|
// Track unpaired replicas in both sets
|
|
if (base_dc_racks.contains(view_node.get().dc_rack())) {
|
|
paired_view_dc_racks.insert(view_node.get().dc_rack());
|
|
} else {
|
|
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
|
|
}
|
|
}
|
|
|
|
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
|
// There are view replicas that can't be paired with any base replica
|
|
// This can happen as a result of an RF change when the view replica finishes streaming
|
|
// before the base replica.
|
|
// Because of this, a view replica might not get paired with any base replica, so we need
|
|
// to send an additional update to it.
|
|
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
|
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
|
|
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
|
|
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
|
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
|
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
|
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
|
unpaired_view_dc_rack_replicas | std::views::values);
|
|
}
|
|
return extra_replica;
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
// Calculate the node ("natural endpoint") to which this node should send
|
|
// a view update.
|
|
//
|
|
// A materialized view table is in the same keyspace as its base table,
|
|
// and in particular both have the same replication factor. Therefore it
|
|
// is possible, for a particular base partition and related view partition
|
|
// to "pair" between the base replicas and view replicas holding those
|
|
// partitions. The first (in ring order) base replica is paired with the
|
|
// first view replica, the second with the second, and so on. The purpose
|
|
// of this function is to find, assuming that this node is one of the base
|
|
// replicas for a given partition, the paired view replica.
|
|
//
|
|
// When using vnodes, we have an optimization called "self-pairing" - if a single
|
|
// node is both a base replica and a view replica for a write, the pairing is
|
|
// modified so that this node sends the update to itself and this node is removed
|
|
// from the lists of nodes paired by index. This self-pairing optimization can
|
|
// cause the pairing to change after view ranges are moved between nodes.
|
|
//
|
|
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
|
|
// we pair only nodes in the same datacenter.
|
|
//
|
|
// If the table uses tablets, then pairing is rack-aware. In this case, in each
|
|
// rack where we have a base replica there is also one replica of each view tablet.
|
|
// Therefore, the base replicas are naturally paired with the view replicas that
|
|
// are in the same rack.
|
|
//
|
|
// If the assumption that the given base token belongs to this replica
|
|
// does not hold, we return an empty optional.
|
|
//
|
|
// Aside from the paired replica, we may return another replica that we should
|
|
// send the update to - the extra replica is then returned as the second replica
|
|
// in the pair. The second replica is returned when it can't get paired with
|
|
// any base replica. This can happen when the base table is undergoing
|
|
// an increase in the replication factor. The view replica may finish the
|
|
// RF change before the base replica, and since we only pair replicas that
|
|
// finished the change, the view replica may not have a base replica to pair with.
|
|
// To make sure it gets the update regardless, each base replica which detects such
|
|
// a scenario will also send the update to the new view replica.
|
|
// If this scenario does not occur, we return an empty optional as the
|
|
// second replica.
|
|
// We don't need to return the second replica if we have more base replicas than
|
|
// view replicas - in that case the corresponding view replica will get the
|
|
// update anyway, because either the streaming didn't start yet, so the update
|
|
// will get streamed later, or it's still pending and it will get the update
|
|
// because we also send updates from all base replicas to the pending view replicas.
|
|
endpoints_to_update get_view_natural_endpoint(
|
|
locator::host_id me,
|
|
const locator::effective_replication_map_ptr& base_erm,
|
|
const locator::effective_replication_map_ptr& view_erm,
|
|
const bool network_topology,
|
|
const dht::token& base_token,
|
|
const dht::token& view_token,
|
|
bool use_tablets,
|
|
replica::cf_stats& cf_stats) {
|
|
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
|
|
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
|
|
auto& my_location = topology.get_location(me);
|
|
|
|
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
|
|
if (auto* np = topology.find_node(ep)) {
|
|
return *np;
|
|
}
|
|
throw std::runtime_error(format("get_view_natural_endpoint: {} replica {} not found in topology", is_view ? "view" : "base", ep));
|
|
};
|
|
|
|
// We need to use get_replicas() for pairing to be stable in case base or view tablet
|
|
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
|
|
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
|
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
|
|
return resolve(topology, ep, false);
|
|
}) | std::ranges::to<node_vector>();
|
|
auto view_nodes = view_erm->get_replicas(view_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
|
|
return resolve(view_topology, ep, true);
|
|
}) | std::ranges::to<node_vector>();
|
|
|
|
// if we're a pending base replica and we're ready for reading then we should generate view updates same as
|
|
// the base replica that we are about to replace in the base-view pairing.
|
|
if (!std::ranges::contains(base_nodes, me, std::mem_fn(&locator::node::host_id))) {
|
|
// if we got here it's probably because we're a pending base replica that is ready for reads.
|
|
auto base_reading_replicas = std::unordered_set(std::from_range, base_erm->get_replicas_for_reading(base_token));
|
|
if (base_reading_replicas.contains(me)) {
|
|
// find a normal base replica that is not a reading replica - it's a leaving base replica that we replace.
|
|
auto it = std::ranges::find_if(base_nodes, [&] (const locator::node& n) {
|
|
return !base_reading_replicas.contains(n.host_id());
|
|
});
|
|
if (it != base_nodes.end()) {
|
|
// call get_view_natural_endpoint recursively with the leaving base replica as 'me' to get the same
|
|
// view pairing as the leaving base replica.
|
|
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
|
|
auto leaving_base = it->get().host_id();
|
|
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, network_topology, base_token,
|
|
view_token, use_tablets, cf_stats);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!use_tablets) {
|
|
return get_view_natural_endpoint_vnodes(
|
|
me,
|
|
base_nodes,
|
|
view_nodes,
|
|
my_location,
|
|
network_topology,
|
|
cf_stats);
|
|
}
|
|
|
|
std::optional<locator::host_id> paired_replica;
|
|
for (auto&& view_node : view_nodes) {
|
|
if (view_node.get().dc_rack() == my_location) {
|
|
paired_replica = view_node.get().host_id();
|
|
break;
|
|
}
|
|
}
|
|
if (paired_replica && base_nodes.size() == view_nodes.size()) {
|
|
// We don't need to find any extra replicas, so we can return early
|
|
return {.natural_endpoint = paired_replica};
|
|
}
|
|
if (!paired_replica) {
|
|
// We couldn't find any view replica in our rack
|
|
++cf_stats.total_view_updates_failed_pairing;
|
|
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
|
|
me,
|
|
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
|
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
|
}
|
|
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
|
|
return {.natural_endpoint = paired_replica,
|
|
.endpoint_with_no_pairing = no_pairing_replica};
|
|
}
|
|
|
|
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
|
locator::host_id target, host_id_vector_topology_change pending_endpoints,
|
|
frozen_mutation_and_schema mut, const dht::token& base_token, const dht::token& view_token,
|
|
service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) {
|
|
// The "delay_before_remote_view_update" injection point can be
|
|
// used to add a short delay (currently 0.5 seconds) before a base
|
|
// replica sends its update to the remote view replica.
|
|
co_await utils::get_local_injector().inject("delay_before_remote_view_update", 500ms);
|
|
tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
|
|
mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token);
|
|
co_await proxy.send_to_endpoint(
|
|
std::move(mut),
|
|
std::move(ermp),
|
|
target,
|
|
std::move(pending_endpoints),
|
|
db::write_type::VIEW,
|
|
std::move(tr_state),
|
|
allow_hints,
|
|
service::is_cancellable::yes);
|
|
while (utils::get_local_injector().enter("never_finish_remote_view_updates")) {
|
|
co_await seastar::sleep(100ms);
|
|
}
|
|
}
|
|
|
|
static bool should_update_synchronously(const schema& s) {
|
|
auto tag_opt = db::find_tag(s, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY);
|
|
if (!tag_opt.has_value()) {
|
|
return false;
|
|
}
|
|
|
|
return *tag_opt == "true";
|
|
}
|
|
|
|
size_t memory_usage_of(const frozen_mutation_and_schema& mut) {
|
|
// Overhead of sending a view mutation, in terms of data structures used by the storage_proxy, as well as possible background tasks
|
|
// allocated for a remote view update.
|
|
constexpr size_t base_overhead_bytes = 2288;
|
|
return base_overhead_bytes + mut.fm.representation().size();
|
|
}
|
|
|
|
// Take the view mutations generated by generate_view_updates(), which pertain
|
|
// to a modification of a single base partition, and apply them to the
|
|
// appropriate paired replicas. This is done asynchronously - we do not wait
|
|
// for the writes to complete.
|
|
future<> view_update_generator::mutate_MV(
|
|
schema_ptr base,
|
|
dht::token base_token,
|
|
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
|
|
db::view::stats& stats,
|
|
replica::cf_stats& cf_stats,
|
|
tracing::trace_state_ptr tr_state,
|
|
db::timeout_semaphore_units pending_view_update_memory_units,
|
|
service::allow_hints allow_hints,
|
|
wait_for_all_updates wait_for_all)
|
|
{
|
|
auto& ks = _db.find_keyspace(base->ks_name());
|
|
const bool uses_tablets = ks.uses_tablets();
|
|
const bool uses_nts = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy()) != nullptr;
|
|
// The object pointed by `ks` may disappear after preeemption. It should not be touched again after this comment.
|
|
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
|
|
auto get_erm = [&] (table_id id) {
|
|
auto it = erms.find(id);
|
|
if (it == erms.end()) {
|
|
it = erms.emplace(id, _db.find_column_family(id).get_effective_replication_map()).first;
|
|
}
|
|
return it->second;
|
|
};
|
|
auto base_ermp = get_erm(base->id());
|
|
for (const auto& mut : view_updates) {
|
|
(void)get_erm(mut.s->id());
|
|
}
|
|
auto me = base_ermp->get_topology().my_host_id();
|
|
static constexpr size_t max_concurrent_updates = 128;
|
|
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
|
co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> {
|
|
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
|
auto view_ermp = erms.at(mut.s->id());
|
|
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, uses_nts, base_token, view_token,
|
|
uses_tablets, cf_stats);
|
|
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
|
|
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
|
|
if (no_pairing_endpoint) {
|
|
// The no_pairing_endpoint can appear during a replication factor increase when the base tablet migration finished after
|
|
// the corresponding (for current token) view tablet migration. In this case, the view replica is no longer
|
|
// in migration (so it's not present in the pending endpoints list), but the base replica didn't start the migration yet
|
|
// or is still pending. We don't pair pending replicas, so no base replica will pair with this view replica and we need
|
|
// to send the update to it.
|
|
// FIXME: Here we may unnecessarily send the update to the no_pairing_endpoint from many base replicas,
|
|
// similarly to https://github.com/scylladb/scylladb/issues/7711
|
|
remote_endpoints.push_back(std::move(*no_pairing_endpoint));
|
|
}
|
|
|
|
const bool update_synchronously = should_update_synchronously(*mut.s);
|
|
if (update_synchronously) {
|
|
tracing::trace(tr_state, "Forcing {}.{} view update to be synchronous (synchronous_updates property was set)",
|
|
mut.s->ks_name(), mut.s->cf_name()
|
|
);
|
|
}
|
|
// If a view is marked with the synchronous_updates property, we should wait for all.
|
|
const bool apply_update_synchronously = wait_for_all || update_synchronously;
|
|
|
|
// First, find the local endpoint and ensure that if it exists,
|
|
// it will be the target endpoint. That way, all endpoints in the
|
|
// remote_endpoints list are guaranteed to be remote.
|
|
auto my_address = view_ermp->get_topology().my_host_id();
|
|
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), my_address);
|
|
if (remote_it != remote_endpoints.end()) {
|
|
if (!target_endpoint) {
|
|
target_endpoint = *remote_it;
|
|
remote_endpoints.erase(remote_it);
|
|
} else {
|
|
// Remove the duplicated entry
|
|
if (*target_endpoint == *remote_it) {
|
|
remote_endpoints.erase(remote_it);
|
|
} else {
|
|
auto target_remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
|
|
if (target_remote_it != remote_endpoints.end()) {
|
|
target_endpoint = *remote_it;
|
|
remote_endpoints.erase(remote_it);
|
|
} else {
|
|
std::swap(*target_endpoint, *remote_it);
|
|
}
|
|
}
|
|
}
|
|
} else if (target_endpoint) {
|
|
// It's still possible that a target endpoint is duplicated in the remote endpoints list,
|
|
// so let's get rid of the duplicate if it exists
|
|
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
|
|
if (remote_it != remote_endpoints.end()) {
|
|
remote_endpoints.erase(remote_it);
|
|
}
|
|
}
|
|
|
|
future<> local_view_update = make_ready_future<>();
|
|
if (target_endpoint && *target_endpoint == my_address) {
|
|
++stats.view_updates_pushed_local;
|
|
++cf_stats.total_view_updates_pushed_local;
|
|
++stats.writes;
|
|
auto mut_ptr = remote_endpoints.empty() ? std::make_unique<frozen_mutation>(std::move(mut.fm)) : std::make_unique<frozen_mutation>(mut.fm);
|
|
tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
|
|
mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
|
|
// For local view updates, we limit the number of concurrent view updates on each shard and for each service level
|
|
// to database::max_concurrent_local_view_updates. Local view updates are cpu-bound, so the cpu won't idle even
|
|
// if the concurrency is low and executing too many view updates concurrently can unnecessarily increase latency and memory usage.
|
|
auto count_units = co_await seastar::get_units(_db.get_view_update_concurrency_sem(), 1);
|
|
local_view_update = _proxy.local().mutate_mv_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped(
|
|
[s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr),
|
|
count_units = std::move(count_units), memory_units, this] (future<>&& f) mutable {
|
|
--stats.writes;
|
|
memory_units = nullptr;
|
|
_proxy.local().update_view_update_backlog();
|
|
if (f.failed()) {
|
|
++stats.view_updates_failed_local;
|
|
++cf_stats.total_view_updates_failed_local;
|
|
auto ep = f.get_exception();
|
|
tracing::trace(tr_state, "Failed to apply local view update for {}", my_address);
|
|
vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}",
|
|
my_address, s->ks_name(), s->cf_name(), base_token, view_token, ep);
|
|
return make_exception_future<>(std::move(ep));
|
|
}
|
|
tracing::trace(tr_state, "Successfully applied local view update for {}", my_address);
|
|
return make_ready_future<>();
|
|
});
|
|
// We just applied a local update to the target endpoint, so it should now be removed
|
|
// from the possible targets
|
|
target_endpoint.reset();
|
|
}
|
|
|
|
// If target endpoint is not engaged, but there are remote endpoints,
|
|
// one of the remote endpoints should become a primary target
|
|
if (!target_endpoint && !remote_endpoints.empty()) {
|
|
target_endpoint = std::move(remote_endpoints.back());
|
|
remote_endpoints.pop_back();
|
|
}
|
|
|
|
// If target_endpoint is engaged by this point, then either the update
|
|
// is not local, or the local update was already applied but we still
|
|
// have pending endpoints to send to.
|
|
if (target_endpoint) {
|
|
size_t updates_pushed_remote = remote_endpoints.size() + 1;
|
|
stats.view_updates_pushed_remote += updates_pushed_remote;
|
|
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
|
|
schema_ptr s = mut.s;
|
|
future<> remote_view_update = apply_to_remote_endpoints(_proxy.local(), std::move(view_ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
|
|
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
|
|
memory_units, apply_update_synchronously, this] (future<>&& f) mutable {
|
|
memory_units = nullptr;
|
|
_proxy.local().update_view_update_backlog();
|
|
if (f.failed()) {
|
|
stats.view_updates_failed_remote += updates_pushed_remote;
|
|
cf_stats.total_view_updates_failed_remote += updates_pushed_remote;
|
|
auto ep = f.get_exception();
|
|
tracing::trace(tr_state, "Failed to apply view update for {} and {} remote endpoints",
|
|
*target_endpoint, updates_pushed_remote);
|
|
|
|
// Printing an error on every failed view mutation would cause log spam, so a rate limit is needed.
|
|
static thread_local logger::rate_limit view_update_error_rate_limit(std::chrono::seconds(4));
|
|
vlogger.log(log_level::warn, view_update_error_rate_limit,
|
|
"Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}",
|
|
*target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep);
|
|
return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
|
|
}
|
|
tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints",
|
|
*target_endpoint, updates_pushed_remote);
|
|
return make_ready_future<>();
|
|
});
|
|
if (apply_update_synchronously) {
|
|
co_return co_await when_all_succeed(
|
|
std::move(local_view_update), std::move(remote_view_update)).discard_result();
|
|
} else {
|
|
// The update is sent to background in order to preserve availability,
|
|
// its parallelism is limited by view_update_concurrency_semaphore
|
|
(void)remote_view_update;
|
|
}
|
|
}
|
|
co_return co_await std::move(local_view_update);
|
|
});
|
|
}
|
|
|
|
view_builder::view_builder(replica::database& db, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks, service::migration_notifier& mn, view_update_generator& vug, service::raft_group0_client& group0_client, cql3::query_processor& qp)
|
|
: _db(db)
|
|
, _sys_ks(sys_ks)
|
|
, _sys_dist_ks(sys_dist_ks)
|
|
, _group0_client(group0_client)
|
|
, _qp(qp)
|
|
, _mnotifier(mn)
|
|
, _vug(vug)
|
|
, _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout, {}))
|
|
, _upgrade_phaser("view_builder::upgrade_phaser")
|
|
{
|
|
setup_metrics();
|
|
}
|
|
|
|
void view_builder::setup_metrics() {
|
|
namespace sm = seastar::metrics;
|
|
|
|
_metrics.add_group("view_builder", {
|
|
sm::make_gauge("pending_bookkeeping_ops",
|
|
sm::description("Number of tasks waiting to perform bookkeeping operations"),
|
|
[this] { return _sem.waiters(); }),
|
|
|
|
sm::make_counter("steps_performed",
|
|
sm::description("Number of performed build steps."),
|
|
_stats.steps_performed),
|
|
|
|
sm::make_counter("steps_failed",
|
|
sm::description("Number of failed build steps."),
|
|
_stats.steps_failed),
|
|
|
|
sm::make_gauge("builds_in_progress",
|
|
sm::description("Number of currently active view builds."),
|
|
[this] { return _base_to_build_step.size(); })(basic_level)
|
|
});
|
|
}
|
|
|
|
future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
|
|
auto step_fiber = make_ready_future<>();
|
|
try {
|
|
view_builder_init_state vbi;
|
|
auto fail = defer([&barrier] mutable { barrier.abort(); });
|
|
// Semaphore usage invariants:
|
|
// - One unit of _sem serializes all per-shard bookkeeping that mutates view-builder state
|
|
// (_base_to_build_step, _built_views, build_status, reader resets).
|
|
// - The unit is held for the whole operation, including the async chain, until the state
|
|
// is stable for the next operation on that shard.
|
|
// - Cross-shard operations acquire _sem on shard 0 for the duration of the broadcast.
|
|
// Other shards acquire their own _sem only around their local handling; shard 0 skips
|
|
// the local acquire because it already holds the unit from the dispatcher.
|
|
// Guard the whole startup routine with a semaphore so that it's not intercepted by
|
|
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
|
|
auto units = co_await get_units(_sem, view_builder_semaphore_units);
|
|
// Wait for schema agreement even if we're a seed node.
|
|
co_await mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as);
|
|
|
|
auto built = co_await _sys_ks.load_built_views();
|
|
auto in_progress = co_await _sys_ks.load_view_build_progress();
|
|
setup_shard_build_step(vbi, std::move(built), std::move(in_progress));
|
|
// All shards need to arrive at the same decisions on whether or not to
|
|
// restart a view build at some common token (reshard), and which token
|
|
// to restart at. So we need to wait until all shards have read the view
|
|
// build statuses before they can all proceed to make the (same) decision.
|
|
// If we don't synchronize here, a fast shard may make a decision, start
|
|
// building and finish a build step - before the slowest shard even read
|
|
// the view build information.
|
|
fail.cancel();
|
|
co_await barrier.arrive_and_wait();
|
|
|
|
_mnotifier.register_listener(this);
|
|
co_await calculate_shard_build_step(vbi);
|
|
_current_step = _base_to_build_step.begin();
|
|
|
|
// If preparation above fails, run_in_background() is not invoked, just
|
|
// the start_in_background() emits a warning into logs and resolves
|
|
step_fiber = run_in_background();
|
|
} catch (...) {
|
|
auto ex = std::current_exception();
|
|
auto ll = log_level::error;
|
|
try {
|
|
std::rethrow_exception(ex);
|
|
} catch (const seastar::sleep_aborted& e) {
|
|
ll = log_level::debug;
|
|
} catch (const seastar::abort_requested_exception& e) {
|
|
ll = log_level::debug;
|
|
} catch (const utils::barrier_aborted_exception& e) {
|
|
ll = log_level::debug;
|
|
}
|
|
vlogger.log(ll, "start aborted: {}", ex);
|
|
}
|
|
|
|
co_await std::move(step_fiber);
|
|
}
|
|
|
|
future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
|
|
_step_fiber = start_in_background(mm, std::move(barrier));
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
future<> view_builder::drain() {
|
|
if (_as.abort_requested()) {
|
|
co_return;
|
|
}
|
|
vlogger.info("Draining view builder");
|
|
_as.request_abort();
|
|
co_await _mnotifier.unregister_listener(this);
|
|
co_await _ops_gate.close();
|
|
co_await _vug.drain();
|
|
co_await _sem.wait();
|
|
_sem.broken();
|
|
_build_step.broken();
|
|
co_await std::move(_step_fiber);
|
|
co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair<const table_id, build_step>& p) {
|
|
return p.second.reader.close();
|
|
});
|
|
}
|
|
|
|
future<> view_builder::stop() {
|
|
vlogger.info("Stopping view builder");
|
|
return drain();
|
|
}
|
|
|
|
view_builder::build_step& view_builder::get_or_create_build_step(table_id base_id) {
|
|
auto it = _base_to_build_step.find(base_id);
|
|
if (it == _base_to_build_step.end()) {
|
|
auto base = _db.find_column_family(base_id).shared_from_this();
|
|
auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())});
|
|
// Iterators could have been invalidated if there was rehashing, so just reset the cursor.
|
|
_current_step = p.first;
|
|
it = p.first;
|
|
}
|
|
return it->second;
|
|
}
|
|
|
|
future<> view_builder::initialize_reader_at_current_token(build_step& step) {
|
|
return step.reader.close().then([this, &step] {
|
|
step.pslice = make_partition_slice(*step.base->schema());
|
|
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
|
|
step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader(
|
|
step.base->schema(),
|
|
_permit,
|
|
step.prange,
|
|
step.pslice,
|
|
nullptr,
|
|
streamed_mutation::forwarding::no,
|
|
mutation_reader::forwarding::no);
|
|
});
|
|
}
|
|
|
|
void view_builder::load_view_status(view_builder::view_build_init_status status, std::unordered_set<table_id>& loaded_views) {
|
|
if (!status.first_token || !status.next_token) {
|
|
// No progress was made on this view, so we'll treat it as new.
|
|
return;
|
|
}
|
|
vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token);
|
|
loaded_views.insert(status.view->id());
|
|
if (*status.first_token == *status.next_token) {
|
|
// Completed, so nothing to do for this shard. Consider the view
|
|
// as loaded and not as a new view.
|
|
_built_views.emplace(status.view->id());
|
|
return;
|
|
}
|
|
get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(
|
|
view_build_status { status.view, *status.first_token, status.next_token });
|
|
}
|
|
|
|
void view_builder::reshard(
|
|
std::vector<std::vector<view_builder::view_build_init_status>> view_build_status_per_shard,
|
|
std::unordered_set<table_id>& loaded_views) {
|
|
// We must reshard. We aim for a simple algorithm, a step above not starting from scratch.
|
|
// Shards build entries at different paces, so both first and last tokens will differ. We
|
|
// want to be conservative when selecting the range that has been built. To do that, we
|
|
// select the intersection of all the previous shard's ranges for each view.
|
|
struct view_ptr_hash {
|
|
std::size_t operator()(const view_ptr& v) const noexcept {
|
|
return std::hash<table_id>()(v->id());
|
|
}
|
|
};
|
|
struct view_ptr_equals {
|
|
bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept {
|
|
return v1->id() == v2->id();
|
|
}
|
|
};
|
|
std::unordered_map<view_ptr, std::optional<interval<dht::token>>, view_ptr_hash, view_ptr_equals> my_status;
|
|
for (auto& shard_status : view_build_status_per_shard) {
|
|
for (auto& [view, first_token, next_token] : shard_status ) {
|
|
// We start from an open-ended range, which we'll try to restrict.
|
|
auto& my_range = my_status.emplace(
|
|
std::move(view),
|
|
interval<dht::token>::make_open_ended_both_sides()).first->second;
|
|
if (!first_token || !next_token || !my_range) {
|
|
// A previous shard made no progress, so for this view we'll start over.
|
|
my_range = std::nullopt;
|
|
continue;
|
|
}
|
|
if (*first_token == *next_token) {
|
|
// Completed, so don't consider this shard's progress. We know that if the view
|
|
// is marked as in-progress, then at least one shard will have a non-full range.
|
|
continue;
|
|
}
|
|
wrapping_interval<dht::token> other_range(*first_token, *next_token);
|
|
if (other_range.is_wrap_around(dht::token_comparator())) {
|
|
// The intersection of a wrapping range with a non-wrapping range may yield more
|
|
// multiple non-contiguous ranges. To avoid the complexity of dealing with more
|
|
// than one range, we'll just take one of the intersections.
|
|
auto [bottom_range, top_range] = other_range.unwrap();
|
|
if (auto bottom_int = my_range->intersection(interval(std::move(bottom_range)), dht::token_comparator())) {
|
|
my_range = std::move(bottom_int);
|
|
} else {
|
|
my_range = my_range->intersection(interval(std::move(top_range)), dht::token_comparator());
|
|
}
|
|
} else {
|
|
my_range = my_range->intersection(interval(std::move(other_range)), dht::token_comparator());
|
|
}
|
|
}
|
|
}
|
|
view_builder::base_to_build_step_type build_step;
|
|
for (auto& [view, opt_range] : my_status) {
|
|
if (!opt_range) {
|
|
continue; // Treat it as a new table.
|
|
}
|
|
auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token();
|
|
auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token();
|
|
auto s = view_build_init_status{std::move(view), std::move(start_bound), std::move(end_bound)};
|
|
load_view_status(std::move(s), loaded_views);
|
|
}
|
|
}
|
|
|
|
void view_builder::setup_shard_build_step(
|
|
view_builder_init_state& vbi,
|
|
std::vector<system_keyspace_view_name> built,
|
|
std::vector<system_keyspace_view_build_progress> in_progress) {
|
|
// Shard 0 makes cleanup changes to the system tables, but none that could conflict
|
|
// with the other shards; everyone is thus able to proceed independently.
|
|
auto maybe_fetch_view = [&, this] (system_keyspace_view_name& name) {
|
|
try {
|
|
auto s = _db.find_schema(name.first, name.second);
|
|
if (s->is_view()) {
|
|
auto view = view_ptr(std::move(s));
|
|
// This is a safety check in case this node missed a create MV statement
|
|
// but got a drop table for the base, and another node didn't get the
|
|
// drop notification and sent us the view schema.
|
|
if (_db.column_family_exists(view->view_info()->base_id())) {
|
|
return view;
|
|
}
|
|
}
|
|
// The view was dropped and a table was re-created with the same name,
|
|
// but the write to the view-related system tables didn't make it.
|
|
} catch (const replica::no_such_column_family&) {
|
|
// Fall-through
|
|
}
|
|
if (this_shard_id() == 0) {
|
|
vbi.bookkeeping_ops.push_back(remove_view_build_status(name.first, name.second));
|
|
vbi.bookkeeping_ops.push_back(_sys_ks.remove_built_view(name.first, name.second));
|
|
vbi.bookkeeping_ops.push_back(
|
|
_sys_ks.remove_view_build_progress_across_all_shards(
|
|
std::move(name.first),
|
|
std::move(name.second)));
|
|
}
|
|
return view_ptr(nullptr);
|
|
};
|
|
|
|
vbi.built_views = built
|
|
| std::views::transform(maybe_fetch_view)
|
|
| std::views::filter([] (const view_ptr& v) { return bool(v); })
|
|
| std::views::transform([] (const view_ptr& v) { return v->id(); })
|
|
| std::ranges::to<std::unordered_set<table_id>>();
|
|
|
|
for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) {
|
|
if (auto view = maybe_fetch_view(view_name)) {
|
|
if (vbi.built_views.contains(view->id())) {
|
|
if (this_shard_id() == 0) {
|
|
auto f = mark_view_build_success(std::move(view_name.first), std::move(view_name.second)).then([this, view = std::move(view)] {
|
|
return _sys_ks.remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
|
|
});
|
|
vbi.bookkeeping_ops.push_back(std::move(f));
|
|
}
|
|
continue;
|
|
}
|
|
vbi.status_per_shard.resize(std::max(vbi.status_per_shard.size(), size_t(cpu_id + 1)));
|
|
vbi.status_per_shard[cpu_id].emplace_back(view_build_init_status{
|
|
std::move(view),
|
|
std::move(first_token),
|
|
std::move(next_token_opt)});
|
|
}
|
|
}
|
|
}
|
|
|
|
future<> view_builder::calculate_shard_build_step(view_builder_init_state& vbi) {
|
|
std::unordered_set<table_id> loaded_views;
|
|
if (vbi.status_per_shard.size() != smp::count) {
|
|
reshard(std::move(vbi.status_per_shard), loaded_views);
|
|
} else if (!vbi.status_per_shard.empty()) {
|
|
for (auto& status : vbi.status_per_shard[this_shard_id()]) {
|
|
load_view_status(std::move(status), loaded_views);
|
|
}
|
|
}
|
|
|
|
for (auto& [_, build_step] : _base_to_build_step) {
|
|
std::ranges::sort(build_step.build_status, std::ranges::less(), [] (const view_build_status& s) {
|
|
return *s.next_token;
|
|
});
|
|
if (!build_step.build_status.empty()) {
|
|
build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()};
|
|
}
|
|
}
|
|
|
|
auto all_views = _db.get_views();
|
|
auto doesnt_use_tablets = [&] (const view_ptr& v) {
|
|
return !_db.find_keyspace(v->ks_name()).uses_tablets();
|
|
};
|
|
auto is_new = [&] (const view_ptr& v) {
|
|
// This is a safety check in case this node missed a create MV statement
|
|
// but got a drop table for the base, and another node didn't get the
|
|
// drop notification and sent us the view schema.
|
|
return _db.column_family_exists(v->view_info()->base_id()) && !loaded_views.contains(v->id())
|
|
&& !vbi.built_views.contains(v->id());
|
|
};
|
|
for (auto&& view : all_views | std::views::filter(doesnt_use_tablets) | std::views::filter(is_new)) {
|
|
vbi.bookkeeping_ops.push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id())));
|
|
}
|
|
|
|
return parallel_for_each(_base_to_build_step, [this] (auto& p) {
|
|
return initialize_reader_at_current_token(p.second);
|
|
}).then([&vbi] {
|
|
return seastar::when_all_succeed(vbi.bookkeeping_ops.begin(), vbi.bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
|
|
vlogger.warn("Failed to update materialized view bookkeeping while synchronizing view builds on all shards ({}), continuing anyway.", ep);
|
|
});
|
|
});
|
|
}
|
|
|
|
service::query_state& view_builder_query_state() {
|
|
using namespace std::chrono_literals;
|
|
const auto t = 10s;
|
|
static timeout_config tc{ t, t, t, t, t, t, t };
|
|
static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
|
|
static thread_local service::query_state qs(cs, empty_service_permit());
|
|
return qs;
|
|
};
|
|
|
|
static future<> announce_with_raft(
|
|
cql3::query_processor& qp,
|
|
::service::raft_group0_client& group0_client,
|
|
seastar::abort_source& as,
|
|
noncopyable_function<future<mutation>(api::timestamp_type)> mutation_gen,
|
|
std::string_view description) {
|
|
SCYLLA_ASSERT(this_shard_id() == 0);
|
|
|
|
while (true) {
|
|
as.check();
|
|
|
|
auto guard = co_await group0_client.start_operation(as);
|
|
auto timestamp = guard.write_timestamp();
|
|
|
|
auto mut = co_await mutation_gen(guard.write_timestamp());
|
|
utils::chunked_vector<canonical_mutation> cmuts;
|
|
cmuts.emplace_back(std::move(mut));
|
|
|
|
auto group0_cmd = group0_client.prepare_command(
|
|
::service::write_mutations{
|
|
.mutations{std::move(cmuts)},
|
|
},
|
|
guard,
|
|
description
|
|
);
|
|
|
|
try {
|
|
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as, ::service::raft_timeout{});
|
|
} catch (::service::group0_concurrent_modification&) {
|
|
// retry
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_name) {
|
|
co_await write_view_build_status(
|
|
[this, ks_name, view_name] () -> future<> {
|
|
auto host_id = _db.get_token_metadata().get_my_id();
|
|
co_await announce_with_raft(_qp, _group0_client, _as, [this, ks_name = std::move(ks_name), view_name = std::move(view_name), host_id] (auto ts) {
|
|
return _sys_ks.make_view_build_status_mutation(ts, {ks_name, view_name}, host_id, build_status::STARTED);
|
|
}, "view builder: mark view build STARTED");
|
|
},
|
|
[this, ks_name, view_name] () -> future<> {
|
|
co_await _sys_dist_ks.start_view_build(std::move(ks_name), std::move(view_name));
|
|
}
|
|
);
|
|
}
|
|
|
|
future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_name) {
|
|
co_await write_view_build_status(
|
|
[this, ks_name, view_name] () -> future<> {
|
|
auto host_id = _db.get_token_metadata().get_my_id();
|
|
co_await announce_with_raft(_qp, _group0_client, _as, [this, ks_name = std::move(ks_name), view_name = std::move(view_name), host_id] (auto ts) {
|
|
return _sys_ks.make_view_build_status_update_mutation(ts, {ks_name, view_name}, host_id, build_status::SUCCESS);
|
|
}, "view builder: mark view build SUCCESS");
|
|
},
|
|
[this, ks_name, view_name] () -> future<> {
|
|
co_await _sys_dist_ks.finish_view_build(std::move(ks_name), std::move(view_name));
|
|
}
|
|
);
|
|
}
|
|
|
|
future<> view_builder::remove_view_build_status(sstring ks_name, sstring view_name) {
|
|
co_await write_view_build_status(
|
|
[this, ks_name, view_name] () -> future<> {
|
|
co_await announce_with_raft(_qp, _group0_client, _as, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (auto ts) {
|
|
return _sys_ks.make_remove_view_build_status_mutation(ts, {ks_name, view_name});
|
|
}, "view builder: delete view build status");
|
|
},
|
|
[this, ks_name, view_name] () -> future<> {
|
|
co_await _sys_dist_ks.remove_view(std::move(ks_name), std::move(view_name));
|
|
}
|
|
);
|
|
}
|
|
|
|
static future<std::unordered_map<locator::host_id, sstring>>
|
|
view_status_common(cql3::query_processor& qp, sstring ks_name, sstring cf_name, sstring view_ks_name, sstring view_name, db::consistency_level cl) {
|
|
return qp.execute_internal(
|
|
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", ks_name, cf_name),
|
|
cl,
|
|
view_builder_query_state(),
|
|
{ std::move(view_ks_name), std::move(view_name) },
|
|
cql3::query_processor::cache_internal::no).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
|
return *cql_result
|
|
| std::views::transform([] (const cql3::untyped_result_set::row& row) {
|
|
auto host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
|
|
auto status = row.get_as<sstring>("status");
|
|
return std::pair(std::move(host_id), std::move(status));
|
|
})
|
|
| std::ranges::to<std::unordered_map<locator::host_id, sstring>>();
|
|
});
|
|
}
|
|
|
|
future<std::unordered_map<locator::host_id, sstring>> view_builder::view_status(sstring ks_name, sstring view_name) const {
|
|
if (_view_build_status_on == view_build_status_location::group0) {
|
|
co_return co_await view_status_common(_qp, db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2,
|
|
std::move(ks_name), std::move(view_name), db::consistency_level::LOCAL_ONE);
|
|
} else {
|
|
co_return co_await view_status_common(_qp, db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::VIEW_BUILD_STATUS,
|
|
std::move(ks_name), std::move(view_name), db::consistency_level::ONE);
|
|
}
|
|
}
|
|
|
|
future<std::unordered_map<sstring, sstring>>
|
|
view_builder::view_build_statuses(sstring keyspace, sstring view_name, const gms::gossiper& gossiper) const {
|
|
std::unordered_map<locator::host_id, sstring> status = co_await view_status(std::move(keyspace), std::move(view_name));
|
|
std::unordered_map<sstring, sstring> status_map;
|
|
const auto& topo = _db.get_token_metadata().get_topology();
|
|
topo.for_each_node([&] (const locator::node& node) {
|
|
auto it = status.find(node.host_id());
|
|
auto s = it != status.end() ? std::move(it->second) : "UNKNOWN";
|
|
status_map.emplace(fmt::to_string(gossiper.get_address_map().get(node.host_id())), std::move(s));
|
|
});
|
|
co_return status_map;
|
|
}
|
|
|
|
future<> view_builder::add_new_view(view_ptr view, build_step& step) {
|
|
vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token());
|
|
if (this_shard_id() == 0) {
|
|
co_await mark_view_build_started(view->ks_name(), view->cf_name());
|
|
}
|
|
|
|
if (this_shard_id() == smp::count - 1) {
|
|
inject_failure("add_new_view_fail_last_shard");
|
|
}
|
|
|
|
co_await _sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), step.current_token());
|
|
step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
|
|
}
|
|
|
|
static bool should_ignore_tablet_keyspace(const replica::database& db, const sstring& ks_name) {
|
|
return db.features().view_building_coordinator && db.has_keyspace(ks_name) && db.find_keyspace(ks_name).uses_tablets();
|
|
}
|
|
|
|
future<view_builder::view_builder_units> view_builder::get_or_adopt_view_builder_lock(view_builder_units_opt units) {
|
|
co_return units ? std::move(*units) : co_await get_units(_sem, view_builder_semaphore_units);
|
|
}
|
|
|
|
future<> view_builder::dispatch_create_view(sstring ks_name, sstring view_name) {
|
|
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
|
co_return;
|
|
}
|
|
|
|
auto units = co_await get_or_adopt_view_builder_lock(std::nullopt);
|
|
co_await handle_seed_view_build_progress(ks_name, view_name);
|
|
|
|
co_await coroutine::all(
|
|
[this, ks_name, view_name, units = std::move(units)] mutable -> future<> {
|
|
co_await handle_create_view_local(ks_name, view_name, std::move(units)); },
|
|
[this, ks_name, view_name] mutable -> future<> {
|
|
co_await container().invoke_on_others([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable -> future<> {
|
|
return vb.handle_create_view_local(ks_name, view_name, std::nullopt); }); });
|
|
}
|
|
|
|
future<> view_builder::handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name) {
|
|
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
|
auto& step = get_or_create_build_step(view->view_info()->base_id());
|
|
return _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
|
|
}
|
|
|
|
future<> view_builder::handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units) {
|
|
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::move(units));
|
|
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
|
auto& step = get_or_create_build_step(view->view_info()->base_id());
|
|
try {
|
|
co_await coroutine::all(
|
|
[&step] -> future<> {
|
|
co_await step.base->await_pending_writes(); },
|
|
[&step] -> future<> {
|
|
co_await step.base->await_pending_streams(); });
|
|
co_await flush_base(step.base, _as);
|
|
|
|
// This resets the build step to the current token. It may result in views currently
|
|
// being built to receive duplicate updates, but it simplifies things as we don't have
|
|
// to keep around a list of new views to build the next time the reader crosses a token
|
|
// threshold.
|
|
co_await initialize_reader_at_current_token(step);
|
|
co_await add_new_view(view, step);
|
|
} catch (abort_requested_exception&) {
|
|
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
|
|
} catch (raft::request_aborted&) {
|
|
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
|
|
} catch (...) {
|
|
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
|
|
}
|
|
|
|
_build_step.signal();
|
|
}
|
|
|
|
void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
|
|
if (this_shard_id() != 0) {
|
|
return;
|
|
}
|
|
|
|
// Do it in the background, serialized and broadcast from shard 0.
|
|
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
|
return dispatch_create_view(std::move(ks_name), std::move(view_name));
|
|
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
|
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
|
|
}));
|
|
}
|
|
|
|
future<> view_builder::dispatch_update_view(sstring ks_name, sstring view_name) {
|
|
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
|
co_return;
|
|
}
|
|
|
|
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::nullopt);
|
|
|
|
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
|
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
|
|
if (step_it == _base_to_build_step.end()) {
|
|
co_return; // In case all the views for this CF have finished building already.
|
|
}
|
|
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
|
|
return bs.view->id() == view->id();
|
|
});
|
|
if (status_it != step_it->second.build_status.end()) {
|
|
status_it->view = std::move(view);
|
|
}
|
|
}
|
|
|
|
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
|
|
// Do it in the background, serialized.
|
|
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
|
return dispatch_update_view(std::move(ks_name), std::move(view_name));
|
|
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
|
try {
|
|
std::rethrow_exception(ep);
|
|
} catch (const seastar::gate_closed_exception&) {
|
|
vlogger.warn("Ignoring gate_closed_exception during view update {}.{}", ks_name, view_name);
|
|
} catch (const seastar::broken_named_semaphore&) {
|
|
vlogger.warn("Ignoring broken_named_semaphore during view update {}.{}", ks_name, view_name);
|
|
} catch (const replica::no_such_column_family&) {
|
|
vlogger.warn("Ignoring no_such_column_family during view update {}.{}", ks_name, view_name);
|
|
}
|
|
}));
|
|
}
|
|
|
|
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
|
|
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
|
co_return;
|
|
}
|
|
|
|
auto units = co_await get_or_adopt_view_builder_lock(std::nullopt);
|
|
|
|
co_await coroutine::all(
|
|
[this, ks_name, view_name, units = std::move(units)] mutable -> future<> {
|
|
co_await handle_drop_view_local(ks_name, view_name, std::move(units)); },
|
|
[this, ks_name, view_name] mutable -> future<> {
|
|
co_await container().invoke_on_others([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable -> future<> {
|
|
return vb.handle_drop_view_local(ks_name, view_name, std::nullopt); });});
|
|
co_await handle_drop_view_global_cleanup(ks_name, view_name);
|
|
}
|
|
|
|
future<> view_builder::handle_drop_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units) {
|
|
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::move(units));
|
|
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
|
|
|
|
for (auto& [_, step] : _base_to_build_step) {
|
|
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
|
|
continue;
|
|
}
|
|
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
|
|
if (it->view->cf_name() == view_name) {
|
|
_built_views.erase(it->view->id());
|
|
step.build_status.erase(it);
|
|
co_return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
future<> view_builder::handle_drop_view_global_cleanup(const sstring& ks_name, const sstring& view_name) {
|
|
if (this_shard_id() != 0) {
|
|
co_return;
|
|
}
|
|
vlogger.info0("Starting view global cleanup {}.{}", ks_name, view_name);
|
|
|
|
try {
|
|
co_await coroutine::all(
|
|
[this, &ks_name, &view_name] -> future<> {
|
|
co_await _sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name); },
|
|
[this, &ks_name, &view_name] -> future<> {
|
|
co_await _sys_ks.remove_built_view(ks_name, view_name); },
|
|
[this, &ks_name, &view_name] -> future<> {
|
|
co_await remove_view_build_status(ks_name, view_name); });
|
|
} catch (...) {
|
|
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, std::current_exception());
|
|
}
|
|
}
|
|
|
|
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
|
|
if (this_shard_id() != 0) {
|
|
return;
|
|
}
|
|
|
|
// Do it in the background, serialized and broadcast from shard 0.
|
|
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
|
return dispatch_drop_view(std::move(ks_name), std::move(view_name));
|
|
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
|
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
|
|
}));
|
|
}
|
|
|
|
future<> view_builder::run_in_background() {
|
|
return seastar::async([this] {
|
|
exponential_backoff_retry r(1s, 1min);
|
|
while (!_as.abort_requested()) {
|
|
try {
|
|
_build_step.wait([this] { return !_base_to_build_step.empty(); }).get();
|
|
} catch (const seastar::broken_condition_variable&) {
|
|
return;
|
|
}
|
|
auto units = get_units(_sem, view_builder_semaphore_units).get();
|
|
++_stats.steps_performed;
|
|
try {
|
|
execute(_current_step->second, exponential_backoff_retry(1s, 1min));
|
|
r.reset();
|
|
} catch (const abort_requested_exception&) {
|
|
return;
|
|
} catch (...) {
|
|
++_current_step->second.base->cf_stats()->view_building_paused;
|
|
++_stats.steps_failed;
|
|
auto base = _current_step->second.base->schema();
|
|
vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
|
|
r.retry(_as).get();
|
|
initialize_reader_at_current_token(_current_step->second).get();
|
|
}
|
|
if (_current_step->second.build_status.empty()) {
|
|
auto base = _current_step->second.base->schema();
|
|
auto reader = std::move(_current_step->second.reader);
|
|
_current_step = _base_to_build_step.erase(_current_step);
|
|
reader.close().get();
|
|
} else {
|
|
++_current_step;
|
|
}
|
|
if (_current_step == _base_to_build_step.end()) {
|
|
_current_step = _base_to_build_step.begin();
|
|
}
|
|
}
|
|
}).handle_exception([] (std::exception_ptr ex) {
|
|
vlogger.warn("Unexcepted error executing build step: {}. Ignored.", ex);
|
|
});
|
|
}
|
|
|
|
future<> view_builder::generate_mutations_on_node_left(replica::database& db, db::system_keyspace& sys_ks, api::timestamp_type timestamp, locator::host_id host_id, utils::chunked_vector<canonical_mutation>& muts) {
|
|
// When a node is removed, we delete all its rows from the view_build_status table together with
|
|
// the topology update operation.
|
|
|
|
if (!db.features().view_build_status_on_group0) {
|
|
// We didn't upgrade to the v2 table yet. nothing to delete, and other nodes
|
|
// may not know about the v2 table.
|
|
co_return;
|
|
}
|
|
|
|
auto& qp = sys_ks.query_processor();
|
|
muts.reserve(muts.size() + db.get_views().size());
|
|
// We expect the table to have a row for each existing view, so generate delete mutations for all views.
|
|
for (auto& view : db.get_views()) {
|
|
if (should_ignore_tablet_keyspace(db, view->ks_name())) {
|
|
continue;
|
|
}
|
|
auto mut = co_await sys_ks.make_remove_view_build_status_on_host_mutation(timestamp, {view->ks_name(), view->cf_name()}, host_id);
|
|
muts.emplace_back(std::move(mut));
|
|
}
|
|
}
|
|
|
|
future<> view_builder::migrate_to_v1_5(locator::token_metadata_ptr tmptr, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as, service::group0_guard guard) {
|
|
// Update the view builder version to v1_5
|
|
auto version_mut = co_await sys_ks.make_view_builder_version_mutation(guard.write_timestamp(), db::system_keyspace::view_builder_version_t::v1_5);
|
|
|
|
// write the version as topology_change so that we can apply
|
|
// the change to the view_builder service in topology_state_load
|
|
service::topology_change change {
|
|
.mutations{canonical_mutation(std::move(version_mut))},
|
|
};
|
|
|
|
auto group0_cmd = group0_client.prepare_command(std::move(change), guard, "migrate view_build_status to v1_5");
|
|
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
|
}
|
|
|
|
future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as, service::group0_guard guard) {
|
|
inject_failure("view_builder_migrate_to_v2");
|
|
|
|
auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::VIEW_BUILD_STATUS);
|
|
|
|
// `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL
|
|
// To support migration on cluster with 1 or 2 nodes, set appropriate CL
|
|
auto nodes_count = tmptr->get_normal_token_owners().size();
|
|
auto cl = db::consistency_level::ALL;
|
|
if (nodes_count == 1) {
|
|
cl = db::consistency_level::ONE;
|
|
} else if (nodes_count == 2) {
|
|
cl = db::consistency_level::TWO;
|
|
}
|
|
|
|
auto rows = co_await qp.execute_internal(
|
|
format("SELECT keyspace_name, view_name, host_id, status, WRITETIME(status) AS ts FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::VIEW_BUILD_STATUS),
|
|
cl,
|
|
view_builder_query_state(),
|
|
{},
|
|
cql3::query_processor::cache_internal::no);
|
|
|
|
auto col_names = schema->all_columns() | std::views::transform([] (const auto& col) {return col.name_as_cql_string(); }) | std::ranges::to<std::vector<sstring>>();
|
|
auto col_names_str = fmt::to_string(fmt::join(col_names, ", "));
|
|
sstring val_binders_str = "?";
|
|
for (size_t i = 1; i < col_names.size(); ++i) {
|
|
val_binders_str += ", ?";
|
|
}
|
|
|
|
utils::chunked_vector<mutation> migration_muts;
|
|
migration_muts.reserve(rows->size() + 1);
|
|
|
|
// Insert all valid rows into the new table.
|
|
// Note the tables have the same schema.
|
|
for (const auto& row: *rows) {
|
|
// Skip adding the row if it doesn't belong to a known node.
|
|
// In the v1 table we may have left over rows that belong to nodes that were removed
|
|
// and we didn't clean them, so do that now.
|
|
auto host_id = row.get_as<utils::UUID>("host_id");
|
|
if (!tmptr->get_topology().find_node(locator::host_id(host_id))) {
|
|
vlogger.warn("Dropping a row from view_build_status: host {} does not exist", host_id);
|
|
continue;
|
|
}
|
|
|
|
// Skip adding left over rows that don't belong to known views.
|
|
auto ks_name = row.get_as<sstring>("keyspace_name");
|
|
auto view_name = row.get_as<sstring>("view_name");
|
|
if (!sys_ks.local_db().has_schema(ks_name, view_name)) {
|
|
vlogger.warn("Dropping a row from view_build_status: view {}.{} does not exist", ks_name, view_name);
|
|
continue;
|
|
}
|
|
|
|
std::vector<data_value_or_unset> values;
|
|
for (const auto& col: schema->all_columns()) {
|
|
if (row.has(col.name_as_text())) {
|
|
values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
|
} else {
|
|
values.push_back(unset_value{});
|
|
}
|
|
}
|
|
|
|
// keep the row timestamp so it won't overwrite newer writes
|
|
auto row_ts = row.get_as<api::timestamp_type>("ts");
|
|
|
|
auto muts = co_await qp.get_mutations_internal(
|
|
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
|
|
db::system_keyspace::NAME,
|
|
db::system_keyspace::VIEW_BUILD_STATUS_V2,
|
|
col_names_str,
|
|
val_binders_str),
|
|
view_builder_query_state(),
|
|
row_ts,
|
|
std::move(values));
|
|
if (muts.size() != 1) {
|
|
on_internal_error(vlogger, format("expecting single insert mutation, got {}", muts.size()));
|
|
}
|
|
migration_muts.push_back(std::move(muts[0]));
|
|
}
|
|
|
|
// Update the view builder version to v2
|
|
auto version_mut = co_await sys_ks.make_view_builder_version_mutation(guard.write_timestamp(), db::system_keyspace::view_builder_version_t::v2);
|
|
migration_muts.push_back(std::move(version_mut));
|
|
|
|
// write the version as topology_change so that we can apply
|
|
// the change to the view_builder service in topology_state_load
|
|
service::topology_change change {
|
|
.mutations{migration_muts.begin(), migration_muts.end()},
|
|
};
|
|
|
|
auto group0_cmd = group0_client.prepare_command(std::move(change), guard, "migrate view_build_status to v2");
|
|
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
|
}
|
|
|
|
future<> view_builder::upgrade_to_v1_5() {
|
|
if (_view_build_status_on == view_build_status_location::sys_dist_ks) {
|
|
// drain all write operations to the old table and start writing to both tables.
|
|
// note that we wait here only for operations that access the dist table and not group0, otherwise
|
|
// we get a deadlock.
|
|
_view_build_status_on = view_build_status_location::both;
|
|
co_await _upgrade_phaser.advance_and_await();
|
|
}
|
|
}
|
|
|
|
future<> view_builder::upgrade_to_v2() {
|
|
if (_view_build_status_on == view_build_status_location::group0) {
|
|
co_return;
|
|
}
|
|
|
|
_view_build_status_on = view_build_status_location::group0;
|
|
|
|
if (_init_virtual_table_on_upgrade) {
|
|
init_virtual_table();
|
|
}
|
|
}
|
|
|
|
void view_builder::init_virtual_table() {
|
|
if (_view_build_status_on != view_build_status_location::group0) {
|
|
// we didn't upgrade to v2 yet. defer the operation
|
|
_init_virtual_table_on_upgrade = true;
|
|
return;
|
|
}
|
|
|
|
// We set the old system_distributed.view_build_status table to read virtually
|
|
// from system.view_build_status_v2 in order to make the transition transparent for
|
|
// readers of the table and maintain compatibility.
|
|
|
|
auto ms_v2 = mutation_source([this] (schema_ptr s,
|
|
reader_permit permit,
|
|
const dht::partition_range& pr,
|
|
const query::partition_slice& ps,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding,
|
|
mutation_reader::forwarding fwd_mr) {
|
|
|
|
// The v1 distributed table and the v2 system local table have different shard mapping.
|
|
// The v2 table uses the null sharder, everything is on shard zero, while the v1 table
|
|
// has the normal sharder.
|
|
// So to simulate a read in v1 table in some shard, we need to read all the rows
|
|
// belonging to this shard from v2 table in shard zero.
|
|
// In order to read the table in a different shard we use the multishard reader. It is
|
|
// set to read from all the shards, but actually it only has rows to read in shard zero.
|
|
// Then we also need to filter only the keys belonging to this shard according to v1 sharder.
|
|
|
|
auto& table_v2 = _db.find_column_family(db::system_keyspace::view_build_status_v2());
|
|
|
|
mutation_reader ms_reader = make_multishard_combining_reader(
|
|
seastar::make_shared<streaming_reader_lifecycle_policy>(_db.container(), table_v2.schema()->id(), gc_clock::now()),
|
|
table_v2.schema(),
|
|
table_v2.get_effective_replication_map(),
|
|
std::move(permit),
|
|
pr, ps, trace_state, fwd_mr);
|
|
|
|
auto& sharder_v1 = s->get_sharder();
|
|
auto filter_fn = [&sharder_v1, shard_id = this_shard_id()] (const dht::decorated_key& dk) {
|
|
return sharder_v1.shard_for_reads(dk.token()) == shard_id;
|
|
};
|
|
|
|
return make_filtering_reader(std::move(ms_reader), std::move(filter_fn));
|
|
});
|
|
|
|
auto& table_v1 = _db.find_column_family(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::VIEW_BUILD_STATUS);
|
|
table_v1.set_virtual_reader(std::move(ms_v2));
|
|
|
|
// ignore writes to the table
|
|
table_v1.set_virtual_writer([&] (const frozen_mutation&) -> future<> {
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
// Called in the context of a seastar::thread.
|
|
class view_builder::consumer : public view_consumer {
|
|
public:
|
|
struct built_views {
|
|
build_step& step;
|
|
std::vector<view_build_status> views;
|
|
|
|
built_views(build_step& step)
|
|
: step(step) {
|
|
}
|
|
|
|
built_views(built_views&& other)
|
|
: step(other.step)
|
|
, views(std::move(other.views)) {
|
|
}
|
|
|
|
~built_views() {
|
|
for (auto&& status : views) {
|
|
// Use step.current_token(), which may have wrapped around and become < first_token.
|
|
step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()});
|
|
}
|
|
}
|
|
|
|
void release() {
|
|
views.clear();
|
|
}
|
|
};
|
|
|
|
private:
|
|
view_builder& _builder;
|
|
build_step& _step;
|
|
built_views _built_views;
|
|
|
|
protected:
|
|
virtual void check_for_built_views() override {
|
|
inject_failure("view_builder_check_for_built_views");
|
|
for (auto it = _step.build_status.begin(); it != _step.build_status.end();) {
|
|
// A view starts being built at token t1. Due to resharding, that may not necessarily be a
|
|
// shard-owned token. We finish building the view when the next_token to build is just before
|
|
// (or at) the first token, but the shard-owned current token is after (or at) the first token.
|
|
// In the system tables, we set first_token = next_token to signal the completion of the build
|
|
// process in case of a restart.
|
|
if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) {
|
|
_built_views.views.push_back(std::move(*it));
|
|
it = _step.build_status.erase(it);
|
|
} else {
|
|
++it;
|
|
}
|
|
}
|
|
}
|
|
virtual void load_views_to_build() override {
|
|
inject_failure("view_builder_load_views");
|
|
for (auto&& vs : _step.build_status) {
|
|
if (_step.current_token() >= vs.next_token) {
|
|
if (partition_key_matches(_builder.get_db().as_data_dictionary(), *_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
|
|
_views_to_build.push_back(vs.view);
|
|
}
|
|
if (vs.next_token || _step.current_token() != vs.first_token) {
|
|
vs.next_token = _step.current_key.token();
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
virtual bool should_stop_consuming_end_of_partition() override {
|
|
return _step.build_status.empty();
|
|
}
|
|
|
|
virtual dht::decorated_key& get_current_key() override {
|
|
return _step.current_key;
|
|
}
|
|
virtual void set_current_key(dht::decorated_key key) override {
|
|
_step.current_key = std::move(key);
|
|
}
|
|
|
|
virtual lw_shared_ptr<replica::table> base() override {
|
|
return _step.base;
|
|
}
|
|
virtual mutation_reader& reader() override {
|
|
return _step.reader;
|
|
}
|
|
virtual reader_permit& permit() override {
|
|
return _builder._permit;
|
|
}
|
|
|
|
public:
|
|
consumer(view_builder& builder, shared_ptr<view_update_generator> gen, build_step& step, gc_clock::time_point now)
|
|
: view_consumer(std::move(gen), now, builder._as)
|
|
, _builder(builder)
|
|
, _step(step)
|
|
, _built_views{step} {
|
|
if (!step.current_key.key().is_empty(*_step.reader.schema())) {
|
|
load_views_to_build();
|
|
}
|
|
}
|
|
|
|
stop_iteration consume_new_partition(const dht::decorated_key& dk) {
|
|
inject_failure("view_builder_consume_new_partition");
|
|
if (dk.key().is_empty()) {
|
|
on_internal_error(vlogger, format("Trying to consume empty partition key {}", dk));
|
|
}
|
|
set_current_key(std::move(dk));
|
|
check_for_built_views();
|
|
_views_to_build.clear();
|
|
load_views_to_build();
|
|
return stop_iteration(_views_to_build.empty());
|
|
}
|
|
|
|
stop_iteration consume(tombstone) {
|
|
inject_failure("view_builder_consume_tombstone");
|
|
return stop_iteration::no;
|
|
}
|
|
|
|
stop_iteration consume(static_row&& sr, tombstone, bool) {
|
|
inject_failure("view_builder_consume_static_row");
|
|
if (_views_to_build.empty() || _builder._as.abort_requested()) {
|
|
return stop_iteration::yes;
|
|
}
|
|
|
|
add_fragment(std::move(sr));
|
|
return stop_iteration::no;
|
|
}
|
|
|
|
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live) {
|
|
inject_failure("view_builder_consume_clustering_row");
|
|
if (!is_live) {
|
|
return stop_iteration::no;
|
|
}
|
|
if (_views_to_build.empty() || _builder._as.abort_requested()) {
|
|
return stop_iteration::yes;
|
|
}
|
|
|
|
add_fragment(std::move(cr));
|
|
return stop_iteration::no;
|
|
}
|
|
|
|
void add_fragment(auto&& fragment) {
|
|
_fragments_memory_usage += fragment.memory_usage(*reader().schema());
|
|
_fragments.emplace_back(*reader().schema(), permit(), std::move(fragment));
|
|
if (_fragments_memory_usage > batch_memory_max) {
|
|
// Although we have not yet completed the batch of base rows that
|
|
// compact_for_query<> planned for us (view_builder::batchsize),
|
|
// we've still collected enough rows to reach sizeable memory use,
|
|
// so let's flush these rows now.
|
|
flush_fragments();
|
|
}
|
|
}
|
|
|
|
stop_iteration consume(range_tombstone_change&&) {
|
|
inject_failure("view_builder_consume_range_tombstone");
|
|
return stop_iteration::no;
|
|
}
|
|
|
|
void flush_fragments() {
|
|
inject_failure("view_builder_flush_fragments");
|
|
_builder._as.check();
|
|
if (!_fragments.empty()) {
|
|
_fragments.emplace_front(*reader().schema(), permit(), partition_start(get_current_key(), tombstone()));
|
|
auto base_schema = base()->schema();
|
|
auto fragments_reader = make_mutation_reader_from_fragments(reader().schema(), permit(), std::move(_fragments));
|
|
auto close_reader = defer([&fragments_reader] { fragments_reader.close().get(); });
|
|
fragments_reader.upgrade_schema(base_schema);
|
|
_gen->populate_views(
|
|
*base(),
|
|
_views_to_build,
|
|
get_current_key().token(),
|
|
std::move(fragments_reader),
|
|
_now).get();
|
|
close_reader.cancel();
|
|
_fragments.clear();
|
|
_fragments_memory_usage = 0;
|
|
}
|
|
}
|
|
|
|
stop_iteration consume_end_of_partition() {
|
|
inject_failure("view_builder_consume_end_of_partition");
|
|
utils::get_local_injector().inject("view_builder_consume_end_of_partition_delay", utils::wait_for_message(std::chrono::seconds(60))).get();
|
|
flush_fragments();
|
|
return stop_iteration(should_stop_consuming_end_of_partition());
|
|
}
|
|
|
|
// Must be called in a seastar thread.
|
|
built_views consume_end_of_stream() {
|
|
inject_failure("view_builder_consume_end_of_stream");
|
|
if (vlogger.is_enabled(log_level::debug)) {
|
|
auto view_names = _views_to_build | std::views::transform([](auto v) {
|
|
return v->cf_name();
|
|
}) | std::ranges::to<std::vector<sstring>>();
|
|
vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(),
|
|
_step.base->schema()->cf_name(), _step.current_token(), view_names);
|
|
}
|
|
if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
|
|
// before going back to the minimum token, advance current_key to the end
|
|
// and check for built views in that range.
|
|
_step.current_key = { _step.prange.end().value_or(dht::ring_position::max()).value().token(), partition_key::make_empty()};
|
|
check_for_built_views();
|
|
|
|
_step.current_key = {dht::minimum_token(), partition_key::make_empty()};
|
|
for (auto&& vs : _step.build_status) {
|
|
vs.next_token = dht::minimum_token();
|
|
}
|
|
_builder.initialize_reader_at_current_token(_step).get();
|
|
check_for_built_views();
|
|
}
|
|
return std::move(_built_views);
|
|
}
|
|
};
|
|
|
|
// Called in the context of a seastar::thread.
|
|
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
|
inject_failure("dont_start_build_step");
|
|
gc_clock::time_point now = gc_clock::now();
|
|
auto compaction_state = make_lw_shared<compact_for_query_state>(
|
|
*step.reader.schema(),
|
|
now,
|
|
step.pslice,
|
|
batch_size,
|
|
query::max_partitions,
|
|
tombstone_gc_state::no_gc());
|
|
auto consumer = compact_for_query<view_builder::consumer>(compaction_state, view_builder::consumer{*this, _vug.shared_from_this(), step, now});
|
|
auto built = step.reader.consume_in_thread(std::move(consumer));
|
|
if (auto ds = std::move(*compaction_state).detach_state()) {
|
|
if (ds->current_tombstone) {
|
|
step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(*ds->current_tombstone)));
|
|
}
|
|
step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(ds->partition_start)));
|
|
}
|
|
|
|
_as.check();
|
|
|
|
std::vector<future<>> bookkeeping_ops;
|
|
bookkeeping_ops.reserve(built.views.size() + step.build_status.size());
|
|
for (auto& [view, first_token, _] : built.views) {
|
|
bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token));
|
|
}
|
|
built.release();
|
|
for (auto& [view, _, next_token] : step.build_status) {
|
|
if (next_token) {
|
|
bookkeeping_ops.push_back(
|
|
_sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
|
|
}
|
|
}
|
|
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
|
|
vlogger.warn("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
|
|
}).get();
|
|
utils::get_local_injector().inject("delay_finishing_build_step", utils::wait_for_message(60s)).get();
|
|
}
|
|
|
|
future<> view_builder::mark_as_built(view_ptr view) {
|
|
return seastar::when_all_succeed(
|
|
_sys_ks.mark_view_as_built(view->ks_name(), view->cf_name()),
|
|
mark_view_build_success(view->ks_name(), view->cf_name())).discard_result();
|
|
}
|
|
|
|
future<> view_builder::mark_existing_views_as_built() {
|
|
SCYLLA_ASSERT(this_shard_id() == 0);
|
|
auto views = _db.get_views();
|
|
co_await coroutine::parallel_for_each(views | std::views::filter([this] (view_ptr& v) {
|
|
return !should_ignore_tablet_keyspace(_db, v->ks_name());
|
|
}), [this] (view_ptr& view) {
|
|
return mark_as_built(view);
|
|
});
|
|
}
|
|
|
|
future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) {
|
|
_built_views.emplace(view->id());
|
|
vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name());
|
|
return container().map_reduce0(
|
|
[view_id = view->id()] (view_builder& builder) {
|
|
return builder._built_views.contains(view_id);
|
|
},
|
|
true,
|
|
[] (bool result, bool shard_complete) {
|
|
return result && shard_complete;
|
|
}).then([this, view, next_token = std::move(next_token)] (bool built) {
|
|
if (built) {
|
|
inject_failure("view_builder_mark_view_as_built");
|
|
return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
|
|
if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) {
|
|
return make_ready_future<>();
|
|
}
|
|
auto view = builder._db.find_schema(view_id);
|
|
vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name());
|
|
return builder.mark_as_built(view_ptr(view)).then([&builder, view] {
|
|
// The view is built, so shard 0 can remove the entry in the build progress system table on
|
|
// behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries.
|
|
return builder._sys_ks.remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
|
|
}).then([&builder, view] {
|
|
auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name()));
|
|
if (it != builder._build_notifiers.end()) {
|
|
it->second.set_value();
|
|
}
|
|
});
|
|
});
|
|
}
|
|
return _sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
|
|
});
|
|
}
|
|
|
|
future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name) {
|
|
return container().invoke_on(0, [ks_name, view_name] (view_builder& builder) {
|
|
auto v = std::pair(std::move(ks_name), std::move(view_name));
|
|
return builder._build_notifiers[std::move(v)].get_shared_future();
|
|
});
|
|
}
|
|
|
|
void node_update_backlog::add(update_backlog backlog) {
|
|
_backlogs[this_shard_id()].backlog.store(backlog, std::memory_order_relaxed);
|
|
_backlogs[this_shard_id()].need_publishing = need_publishing::yes;
|
|
}
|
|
|
|
update_backlog node_update_backlog::fetch() {
|
|
auto now = clock::now();
|
|
if (now >= _last_update.load(std::memory_order_relaxed) + _interval) {
|
|
_last_update.store(now, std::memory_order_relaxed);
|
|
auto new_max = std::ranges::fold_left(
|
|
_backlogs,
|
|
update_backlog::no_backlog(),
|
|
[] (const update_backlog& lhs, const per_shard_backlog& rhs) {
|
|
return std::max(lhs, rhs.load());
|
|
});
|
|
_max.store(new_max, std::memory_order_relaxed);
|
|
return new_max;
|
|
}
|
|
// If we perform a shard-aware write, we can read the backlog of the current shard,
|
|
// which was just updated.
|
|
// We still need to compare it to the max, aggregated from all shards, which might
|
|
// still be higher despite being most likely slightly outdated.
|
|
return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed));
|
|
}
|
|
|
|
future<std::optional<update_backlog>> node_update_backlog::fetch_if_changed() {
|
|
_last_update.store(clock::now(), std::memory_order_relaxed);
|
|
auto [np, max] = co_await map_reduce(std::views::iota(0u, smp::count),
|
|
[this] (shard_id shard) {
|
|
return smp::submit_to(shard, [this, shard] {
|
|
// Even if the shard's backlog didn't change, we still need to take it into account when calculating the new max.
|
|
return std::make_pair(std::exchange(_backlogs[shard].need_publishing, need_publishing::no), fetch_shard(shard));
|
|
});
|
|
},
|
|
std::make_pair(need_publishing::no, db::view::update_backlog::no_backlog()),
|
|
[] (std::pair<need_publishing, db::view::update_backlog> a, std::pair<need_publishing, db::view::update_backlog> b) {
|
|
return std::make_pair(a.first || b.first, std::max(a.second, b.second));
|
|
});
|
|
_max.store(max, std::memory_order_relaxed);
|
|
co_return np ? std::make_optional(max) : std::nullopt;
|
|
}
|
|
|
|
update_backlog node_update_backlog::fetch_shard(unsigned shard) {
|
|
return _backlogs[shard].backlog.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
future<bool> view_builder::check_view_build_ongoing(const locator::token_metadata& tm, const sstring& ks_name, const sstring& cf_name) {
|
|
using view_statuses_type = std::unordered_map<locator::host_id, sstring>;
|
|
return view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
|
|
return std::ranges::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
|
|
// Only consider status of known hosts.
|
|
return view_status.second == "STARTED" && tm.get_topology().find_node(view_status.first);
|
|
});
|
|
});
|
|
}
|
|
|
|
future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table) {
|
|
return _vug.register_staging_sstable(std::move(sst), std::move(table));
|
|
}
|
|
|
|
future<sstable_destination_decision> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) {
|
|
if (is_internal_keyspace(t.schema()->ks_name())) {
|
|
co_return sstable_destination_decision::normal_directory;
|
|
}
|
|
|
|
if (vb.get_db().find_keyspace(t.schema()->ks_name()).uses_tablets()) {
|
|
// views are managed by view building coordinator
|
|
if (t.views().empty()) {
|
|
co_return sstable_destination_decision::normal_directory;
|
|
}
|
|
|
|
auto build_status_map = co_await vb.get_sys_ks().get_view_build_status_map();
|
|
auto views_names = t.views()
|
|
| std::views::transform([] (const view_ptr& v) { return std::make_pair(v->ks_name(), v->cf_name()); })
|
|
| std::ranges::to<std::set>();
|
|
|
|
bool any_started_building = false;
|
|
bool any_started = false;
|
|
bool all_success = true;
|
|
for (auto& [view_name, statuses]: build_status_map) {
|
|
if (!views_names.contains(view_name)) {
|
|
continue;
|
|
}
|
|
|
|
any_started_building = true;
|
|
any_started = any_started || std::ranges::any_of(statuses | std::views::values, [] (const build_status& status) {
|
|
return status == build_status::STARTED;
|
|
});
|
|
all_success = all_success && std::ranges::all_of(statuses | std::views::values, [] (const build_status& status) {
|
|
return status == build_status::SUCCESS;
|
|
});
|
|
}
|
|
|
|
if (!any_started_building) {
|
|
// If all of the views didn't start building yet (and none of them is finished building)
|
|
// the sstable can go to normal directory and no view update will be lost
|
|
co_return sstable_destination_decision::normal_directory;
|
|
} else if (any_started) {
|
|
// If any of the views started building and didn't finished yet, we need to use view building
|
|
// coordinator to schedule staging sstables processing to control if replica is not in a pending state.
|
|
co_return sstable_destination_decision::staging_managed_by_vbc;
|
|
} else if (all_success) {
|
|
// If all of the views were built, the sstable can be registered to view update generator directly (in case of `stream_reason::repair`).
|
|
co_return reason == streaming::stream_reason::repair ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory;
|
|
}
|
|
// This should be unreachable, reaching this point would mean that,
|
|
// any of the views started building, none of the status is `STARTED` and not all statuses are `SUCCESS`.
|
|
// Since there are only 2 statuses: STARTED and SUCCESS, this is unreachable.
|
|
on_internal_error(vlogger, fmt::format("check_needs_view_update_path() reached unreachable branch. any_started_building: {} | any_started: {} | all_success: {}", any_started_building, any_started, all_success));
|
|
} else {
|
|
// views are managed by view builder
|
|
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
|
|
co_return sstable_destination_decision::staging_directly_to_generator;
|
|
}
|
|
|
|
auto any_view_build_ongoing = co_await map_reduce(t.views(),
|
|
[&] (const view_ptr& view) { return vb.check_view_build_ongoing(*tmptr, view->ks_name(), view->cf_name()); },
|
|
false,
|
|
std::logical_or<bool>());
|
|
co_return any_view_build_ongoing ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory;
|
|
}
|
|
}
|
|
|
|
void view_updating_consumer::do_flush_buffer() {
|
|
_staging_reader_handle.pause();
|
|
|
|
if (_buffer.front().partition().empty()) {
|
|
// If we flushed mid-partition we can have an empty mutation if we
|
|
// flushed right before getting the end-of-partition fragment.
|
|
_buffer.pop_front();
|
|
}
|
|
|
|
while (!_buffer.empty()) {
|
|
try {
|
|
auto lock_holder = _view_update_pusher(std::move(_buffer.front())).get();
|
|
} catch (...) {
|
|
vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
|
}
|
|
_buffer.pop_front();
|
|
}
|
|
|
|
_buffer_size = 0;
|
|
}
|
|
|
|
void view_updating_consumer::flush_builder() {
|
|
_buffer.emplace_back(_mut_builder->flush());
|
|
}
|
|
|
|
void view_updating_consumer::end_builder() {
|
|
_mut_builder->consume_end_of_partition();
|
|
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
|
|
_buffer.emplace_back(std::move(*mut_opt));
|
|
}
|
|
_mut_builder.reset();
|
|
}
|
|
|
|
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
|
|
if (_buffer_size >= _buffer_size_hard_limit) {
|
|
flush_builder();
|
|
do_flush_buffer();
|
|
}
|
|
}
|
|
|
|
view_updating_consumer::view_updating_consumer(view_update_generator& gen, schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
|
|
evictable_reader_handle& staging_reader_handle)
|
|
: view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle,
|
|
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables), gen = gen.shared_from_this()] (mutation m) mutable {
|
|
auto s = m.schema();
|
|
return table->stream_view_replica_updates(gen, std::move(s), std::move(m), db::no_timeout, excluded_sstables);
|
|
})
|
|
{ }
|
|
|
|
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
|
|
: _proxy(proxy)
|
|
, _state(state)
|
|
, _timeout_duration(timeout_duration)
|
|
, _view(view)
|
|
, _view_table(_proxy.get_db().local().find_column_family(view))
|
|
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
|
|
, _view_pk()
|
|
, _concurrency_semaphore(concurrency)
|
|
, _ex(ex)
|
|
{}
|
|
|
|
|
|
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
|
|
try {
|
|
_gate.close().get();
|
|
} catch (...) {
|
|
// Closing the gate should never throw, but if it does anyway, capture the exception.
|
|
_ex = std::current_exception();
|
|
}
|
|
}
|
|
|
|
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
|
|
SCYLLA_ASSERT(thread::running_in_thread());
|
|
_view_pk = key;
|
|
}
|
|
|
|
// Assumes running in seastar::thread
|
|
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
|
|
auto units = get_units(_concurrency_semaphore, 1).get();
|
|
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
|
|
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
|
|
if (f.failed()) {
|
|
_ex = f.get_exception();
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
|
|
auto view_exploded_pk = pk.explode();
|
|
auto view_exploded_ck = ck.explode();
|
|
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
|
|
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
|
|
std::flat_map<const column_definition*, bytes> view_key_cols_not_in_base_key;
|
|
for (const column_definition& view_cdef : _view->all_columns()) {
|
|
const column_definition* base_cdef = _base_schema->get_column_definition(view_cdef.name());
|
|
if (base_cdef) {
|
|
std::vector<bytes>& view_exploded_key = view_cdef.is_partition_key() ? view_exploded_pk : view_exploded_ck;
|
|
if (base_cdef->is_partition_key()) {
|
|
base_exploded_pk[base_cdef->id] = view_exploded_key[view_cdef.id];
|
|
} else if (base_cdef->is_clustering_key()) {
|
|
base_exploded_ck[base_cdef->id] = view_exploded_key[view_cdef.id];
|
|
} else if (!base_cdef->is_computed() && view_cdef.is_primary_key()) {
|
|
view_key_cols_not_in_base_key[base_cdef] = view_exploded_key[view_cdef.id];
|
|
}
|
|
}
|
|
}
|
|
partition_key base_pk = partition_key::from_exploded(base_exploded_pk);
|
|
clustering_key base_ck = clustering_key::from_exploded(base_exploded_ck);
|
|
|
|
dht::partition_range_vector partition_ranges({dht::partition_range::make_singular(dht::decorate_key(*_base_schema, base_pk))});
|
|
auto selection = cql3::selection::selection::for_columns(_base_schema,
|
|
view_key_cols_not_in_base_key.empty() ? std::vector<const column_definition*>({&_base_schema->partition_key_columns().front()}) : view_key_cols_not_in_base_key.keys());
|
|
|
|
std::vector<query::clustering_range> bounds{query::clustering_range::make_singular(base_ck)};
|
|
utils::small_vector<column_id, 8> view_key_col_ids;
|
|
for (const column_definition* col_def : view_key_cols_not_in_base_key.keys()) {
|
|
view_key_col_ids.push_back(col_def->id);
|
|
}
|
|
query::partition_slice partition_slice(std::move(bounds), {}, std::move(view_key_col_ids), selection->get_query_options());
|
|
auto command = ::make_lw_shared<query::read_command>(_base_schema->id(), _base_schema->version(), partition_slice,
|
|
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
|
|
auto timeout = db::timeout_clock::now() + _timeout_duration;
|
|
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
|
|
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
|
|
query::result& result = *base_qr.query_result;
|
|
auto delete_ghost_row = [&]() -> future<> {
|
|
mutation m(_view, pk);
|
|
auto& row = m.partition().clustered_row(*_view, ck);
|
|
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
|
|
timeout = db::timeout_clock::now() + _timeout_duration;
|
|
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
|
|
};
|
|
if (result.row_count().value_or(0) == 0) {
|
|
co_await delete_ghost_row();
|
|
} else if (!view_key_cols_not_in_base_key.empty()) {
|
|
if (result.row_count().value_or(0) != 1) {
|
|
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
|
|
}
|
|
auto results = query::result_set::from_raw_result(_base_schema, partition_slice, result);
|
|
auto& base_row = results.row(0);
|
|
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
|
|
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
|
|
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
|
|
co_await delete_ghost_row();
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
std::chrono::microseconds calculate_view_update_throttling_delay(db::view::update_backlog backlog,
|
|
db::timeout_clock::time_point timeout,
|
|
uint32_t view_flow_control_delay_limit_in_ms) {
|
|
auto adjust = [] (float x) { return x * x * x; };
|
|
auto budget = std::max(service::storage_proxy::clock_type::duration(0),
|
|
timeout - service::storage_proxy::clock_type::now());
|
|
std::chrono::microseconds ret(uint32_t(adjust(backlog.relative_size()) * view_flow_control_delay_limit_in_ms * 1000));
|
|
// "budget" has millisecond resolution and can potentially be long
|
|
// in the future so converting it to microseconds may overflow.
|
|
// So to compare buget and ret we need to convert both to the lower
|
|
// resolution.
|
|
if (std::chrono::duration_cast<service::storage_proxy::clock_type::duration>(ret) < budget) {
|
|
return ret;
|
|
} else {
|
|
// budget is small (< ret) so can be converted to microseconds
|
|
return std::chrono::duration_cast<std::chrono::microseconds>(budget);
|
|
}
|
|
}
|
|
|
|
build_status build_status_from_string(std::string_view str) {
|
|
if (str == "STARTED") {
|
|
return build_status::STARTED;
|
|
}
|
|
if (str == "SUCCESS") {
|
|
return build_status::SUCCESS;
|
|
}
|
|
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", str));
|
|
}
|
|
|
|
sstring build_status_to_sstring(build_status status) {
|
|
switch (status) {
|
|
case build_status::STARTED:
|
|
return "STARTED";
|
|
case build_status::SUCCESS:
|
|
return "SUCCESS";
|
|
}
|
|
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
|
|
}
|
|
|
|
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name, locator::token_metadata_ptr tmptr) {
|
|
const auto& rs = db.find_keyspace(keyspace_name).get_replication_strategy();
|
|
|
|
if (rs.uses_tablets() && !db.features().views_with_tablets) {
|
|
throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
|
|
"To be able to use them, make sure all nodes in the cluster are upgraded.");
|
|
}
|
|
|
|
try {
|
|
locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs);
|
|
} catch (const std::invalid_argument& e) {
|
|
throw std::logic_error(fmt::format(
|
|
"Materialized views and secondary indexes are not supported on the keyspace '{}': {}",
|
|
keyspace_name, e.what()));
|
|
}
|
|
}
|
|
|
|
} // namespace view
|
|
} // namespace db
|