mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge '[Backport 6.1] schema_tables: calculate_schema_digest: prevent stalls due to large m…' from ScyllaDB
…utations vector With a large number of table the schema mutations vector might get big enoug to cause reactor stalls when freed. For example, the following stall was hit on 2023.1.0~rc1-20230208.fe3cc281ec73 with 5000 tables: ``` (inlined by) ~vector at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/stl_vector.h:730 (inlined by) db::schema_tables::calculate_schema_digest(seastar::sharded<service::storage_proxy>&, enum_set<super_enum<db::schema_feature, (db::schema_feature)0, (db::schema_feature)1, (db::schema_feature)2, (db::schema_feature)3, (db::schema_feature)4, (db::schema_feature)5, (db::schema_feature)6, (db::schema_feature)7> >, seastar::noncopyable_function<bool (std::basic_string_view<char, std::char_traits<char> >)>) at ./db/schema_tables.cc:799 ``` This change returns a mutations generator from the `map` lambda coroutine so we can process them one at a time, destroy the mutations one at a time, and by that, reducing memory footprint and preventing reactor stalls. Fixes #18173 (cherry picked from commit95a5fba0ea) (cherry picked from commit52234214e5) Refs #18174 Closes scylladb/scylladb#20246 * github.com:scylladb/scylladb: schema_tables: calculate_schema_digest: filter the key earlier schema_tables: calculate_schema_digest: prevent stalls due to large mutations vector
This commit is contained in:
@@ -779,40 +779,35 @@ redact_columns_for_missing_features(mutation&& m, schema_features features) {
|
||||
*/
|
||||
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
|
||||
{
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
|
||||
using mutations_generator = coroutine::experimental::generator<mutation>;
|
||||
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> mutations_generator {
|
||||
auto& db = proxy.local().get_db();
|
||||
auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table);
|
||||
auto s = db.local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(::partition_key(p.mut().key()).get_component(*s, 0)));
|
||||
if (!accept_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
co_return mutations;
|
||||
};
|
||||
auto reduce = [features] (auto& hash, auto&& mutations) {
|
||||
for (const mutation& m : mutations) {
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
co_yield redact_columns_for_missing_features(std::move(mut), features);
|
||||
}
|
||||
};
|
||||
auto hash = md5_hasher();
|
||||
auto tables = all_table_names(features);
|
||||
{
|
||||
for (auto& table: tables) {
|
||||
auto mutations = co_await map(table);
|
||||
if (diff_logger.is_enabled(logging::log_level::trace)) {
|
||||
for (const mutation& m : mutations) {
|
||||
auto gen_mutations = map(table);
|
||||
while (auto mut_opt = co_await gen_mutations()) {
|
||||
auto& m = *mut_opt;
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
if (diff_logger.is_enabled(logging::log_level::trace)) {
|
||||
md5_hasher h;
|
||||
feed_hash_for_schema_digest(h, m, features);
|
||||
diff_logger.trace("Digest {} for {}, compacted={}", h.finalize(), m, compact_for_schema_digest(m));
|
||||
}
|
||||
}
|
||||
reduce(hash, mutations);
|
||||
}
|
||||
co_return utils::UUID_gen::get_name_UUID(hash.finalize());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user