From d8ff9ee441e4acb6f2480616cda5a9cedf6b367c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 22 Dec 2015 18:43:25 +0100 Subject: [PATCH] schema_tables: Make merge_tables() compare by mutations Schema version is calculated from mutations, so merge_schema should also look at mutation changes to detect schema changes whenever version changes. --- cql3/statements/create_index_statement.cc | 3 +- db/schema_tables.cc | 213 ++++++++++++++-------- db/schema_tables.hh | 2 - 3 files changed, 134 insertions(+), 84 deletions(-) diff --git a/cql3/statements/create_index_statement.cc b/cql3/statements/create_index_statement.cc index 23a799a167..d2f27104a8 100644 --- a/cql3/statements/create_index_statement.cc +++ b/cql3/statements/create_index_statement.cc @@ -164,12 +164,13 @@ cql3::statements::create_index_statement::validate(distributedcolumn->name())); + *target->column)); } } future cql3::statements::create_index_statement::announce_migration(distributed& proxy, bool is_local_only) { + throw std::runtime_error("Indexes are not supported yet"); auto schema = proxy.local().get_db().local().find_schema(keyspace(), column_family()); auto target = _raw_target->prepare(schema); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 5b3378c3e1..1bc77d7164 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -54,6 +54,8 @@ #include "core/thread.hh" #include "json.hh" #include "log.hh" +#include "frozen_schema.hh" +#include "schema_registry.hh" #include "db/marshal/type_parser.hh" #include "db/config.hh" @@ -61,7 +63,6 @@ #include #include -#include "md5_hasher.hh" #include "compaction_strategy.hh" @@ -73,6 +74,36 @@ namespace schema_tables { logging::logger logger("schema_tables"); +struct qualified_name { + sstring keyspace_name; + sstring table_name; + + qualified_name(sstring keyspace_name, sstring table_name) + : keyspace_name(std::move(keyspace_name)) + , table_name(std::move(table_name)) + { } + + qualified_name(const schema_ptr& s) + : keyspace_name(s->ks_name()) + , table_name(s->cf_name()) + { } + + bool operator<(const qualified_name& o) const { + return keyspace_name < o.keyspace_name + || (keyspace_name == o.keyspace_name && table_name < o.table_name); + } + + bool operator==(const qualified_name& o) const { + return keyspace_name == o.keyspace_name && table_name == o.table_name; + } +}; + +static future read_table_mutations(distributed& proxy, const qualified_name& table); + +static void merge_tables(distributed& proxy, + std::map&& before, + std::map&& after); + std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, /* not present in 2.1.8: FUNCTIONS, AGGREGATES */ }; using days = std::chrono::duration>; @@ -488,7 +519,7 @@ future<> merge_schema(distributed& proxy, std::vector merge_schema(distributed& proxy, std::vector mutations, bool do_flush) -{ +{ return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] () mutable { return do_merge_schema(proxy, std::move(mutations), do_flush); }).finally([] { @@ -496,6 +527,35 @@ future<> merge_schema(distributed& proxy, std::vector> +static read_table_names_of_keyspace(distributed& proxy, const sstring& keyspace_name) { + auto s = columnfamilies(); + auto pkey = dht::global_partitioner().decorate_key(*s, partition_key::from_singular(*s, keyspace_name)); + return db::system_keyspace::query(proxy, COLUMNFAMILIES, pkey).then([] (auto&& rs) { + std::vector result; + for (const query::result_set_row& row : rs->rows()) { + result.emplace_back(row.get_nonnull("columnfamily_name")); + } + return result; + }); +} + +// Call inside a seastar thread +static +std::map +read_tables_for_keyspaces(distributed& proxy, const std::set& keyspace_names) +{ + std::map result; + for (auto&& keyspace_name : keyspace_names) { + for (auto&& table_name : read_table_names_of_keyspace(proxy, keyspace_name).get0()) { + auto qn = qualified_name(keyspace_name, table_name); + result.emplace(qn, read_table_mutations(proxy, qn).get0()); + } + } + return result; +} + future<> do_merge_schema(distributed& proxy, std::vector mutations, bool do_flush) { return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable { @@ -510,7 +570,7 @@ future<> do_merge_schema(distributed& proxy, std::vector // current state of the schema auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0(); - auto&& old_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces).get0(); + auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces); /*auto& old_types = */read_schema_for_keyspaces(proxy, USERTYPES, keyspaces).get0(); #if 0 // not in 2.1.8 /*auto& old_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0(); @@ -530,7 +590,7 @@ future<> do_merge_schema(distributed& proxy, std::vector // with new data applied auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0(); - auto&& new_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces).get0(); + auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces); /*auto& new_types = */read_schema_for_keyspaces(proxy, USERTYPES, keyspaces).get0(); #if 0 // not in 2.1.8 /*auto& new_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0(); @@ -538,7 +598,7 @@ future<> do_merge_schema(distributed& proxy, std::vector #endif std::set keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0(); - merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)).get0(); + merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)); #if 0 mergeTypes(oldTypes, newTypes); mergeFunctions(oldFunctions, newFunctions); @@ -622,79 +682,60 @@ future> merge_keyspaces(distributed& p } // see the comments for merge_keyspaces() -future<> merge_tables(distributed& proxy, schema_result&& before, schema_result&& after) +static void merge_tables(distributed& proxy, + std::map&& before, + std::map&& after) { - return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) { - auto& after = pair.first; - auto& before = pair.second; - auto changed_at = db_clock::now(); - return proxy.local().get_db().invoke_on_all([changed_at, &proxy, &before, &after] (database& db) { - return seastar::async([changed_at, &proxy, &db, &before, &after] { - std::vector created; - std::vector altered; - std::vector dropped; - auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool { - return *x == *y; - }); - for (auto&& key : diff.entries_only_on_left) { - auto&& rs = before[key]; - for (const query::result_set_row& row : rs->rows()) { - auto ks_name = row.get_nonnull("keyspace_name"); - auto cf_name = row.get_nonnull("columnfamily_name"); - dropped.emplace_back(db.find_schema(ks_name, cf_name)); - } - } - for (auto&& key : diff.entries_only_on_right) { - auto&& value = after[key]; - auto&& tables = create_tables_from_tables_partition(proxy, value).get0(); - boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); - } - for (auto&& key : diff.entries_differing) { - sstring keyspace_name = key; + auto changed_at = db_clock::now(); + std::vector created; + std::vector altered; + std::vector dropped; - auto&& post = after[key]; - auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); - auto after = create_tables_from_tables_partition(proxy, post).get0(); - auto delta = difference(std::map{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool { - return *x == *y; - }); - for (auto&& key : delta.entries_only_on_left) { - dropped.emplace_back(before[key]); - } - for (auto&& key : delta.entries_only_on_right) { - created.emplace_back(after[key]); - } - for (auto&& key : delta.entries_differing) { - altered.emplace_back(after[key]); - } + auto diff = difference(before, after); + for (auto&& key : diff.entries_only_on_left) { + auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name); + dropped.emplace_back(s); + } + for (auto&& key : diff.entries_only_on_right) { + created.emplace_back(create_table_from_mutations(after.at(key))); + } + for (auto&& key : diff.entries_differing) { + altered.emplace_back(create_table_from_mutations(after.at(key))); + } + + proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, changed_at] (database& db) { + return seastar::async([&] { + for (auto&& gs : created) { + schema_ptr s = gs.get(); + auto& ks = db.find_keyspace(s->ks_name()); + auto cfg = ks.make_column_family_config(*s); + db.add_column_family(s, cfg); + } + for (auto&& gs : altered) { + // FIXME: Send out notifications + schema_ptr s = gs.get(); + db.update_column_family(s).get(); + } + parallel_for_each(dropped.begin(), dropped.end(), [changed_at, &db](auto&& gs) { + schema_ptr s = gs.get(); + return db.drop_column_family(changed_at, s->ks_name(), s->cf_name()); + }).get(); + // FIXME: clean this up by reorganizing the code + // Send CQL events only once, not once per shard. + if (engine().cpu_id() == 0) { + for (auto&& gs : created) { + schema_ptr s = gs.get(); + service::migration_manager::notify_create_column_family(s).get0(); + auto& ks = db.find_keyspace(s->ks_name()); + ks.make_directory_for_column_family(s->cf_name(), s->id()); } - for (auto&& cfm : created) { - auto& ks = db.find_keyspace(cfm->ks_name()); - auto cfg = ks.make_column_family_config(*cfm); - db.add_column_family(cfm, cfg); + for (auto&& gs : dropped) { + schema_ptr s = gs.get(); + service::migration_manager::notify_drop_column_family(std::move(s)).get0(); } - parallel_for_each(altered.begin(), altered.end(), [&db] (auto&& cfm) { - // FIXME: Send out notifications - return db.update_column_family(cfm); - }).get(); - parallel_for_each(dropped.begin(), dropped.end(), [changed_at, &db] (auto&& cfm) { - return db.drop_column_family(changed_at, cfm->ks_name(), cfm->cf_name()); - }).get(); - // FIXME: clean this up by reorganizing the code - // Send CQL events only once, not once per shard. - if (engine().cpu_id() == 0) { - for (auto&& cfm : created) { - service::migration_manager::notify_create_column_family(cfm).get0(); - auto& ks = db.find_keyspace(cfm->ks_name()); - ks.make_directory_for_column_family(cfm->cf_name(), cfm->id()); - } - for (auto&& cfm : dropped) { - service::migration_manager::notify_drop_column_family(cfm).get0(); - } - } - }); + } }); - }); + }).get(); } #if 0 @@ -1193,15 +1234,13 @@ std::vector make_drop_table_mutations(lw_shared_ptr return mutations; } -future create_table_from_name(distributed& proxy, const sstring& keyspace, const sstring& table) +static future read_table_mutations(distributed& proxy, const qualified_name& table) { - return read_schema_partition_for_table(proxy, COLUMNFAMILIES, keyspace, table).then([&proxy, keyspace, table] (mutation cf_m) { - if (!cf_m.live_row_count()) { - throw std::runtime_error(sprint("%s:%s not found in the schema definitions keyspace.", keyspace, table)); - } - return read_schema_partition_for_table(proxy, COLUMNS, keyspace, table).then([cf_m = std::move(cf_m)] (mutation col_m) { - schema_mutations tm{std::move(cf_m), std::move(col_m)}; - return create_table_from_mutations(std::move(tm)); + return read_schema_partition_for_table(proxy, COLUMNFAMILIES, table.keyspace_name, table.table_name) + .then([&proxy, table] (mutation cf_m) { + return read_schema_partition_for_table(proxy, COLUMNS, table.keyspace_name, table.table_name) + .then([cf_m = std::move(cf_m)] (mutation col_m) { + return schema_mutations{std::move(cf_m), std::move(col_m)}; }); #if 0 // FIXME: @@ -1219,6 +1258,18 @@ future create_table_from_name(distributed& p }); } +future create_table_from_name(distributed& proxy, const sstring& keyspace, const sstring& table) +{ + return do_with(qualified_name(keyspace, table), [&proxy] (auto&& qn) { + return read_table_mutations(proxy, qn).then([qn] (schema_mutations sm) { + if (!sm.live()) { + throw std::runtime_error(sprint("%s:%s not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name)); + } + return create_table_from_mutations(std::move(sm)); + }); + }); +} + /** * Deserialize tables from low-level schema representation, all of them belong to the same keyspace * diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 77413cbfe5..bccd5ac0c0 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -94,8 +94,6 @@ std::vector make_drop_keyspace_mutations(lw_shared_ptr create_keyspace_from_schema_partition(const schema_result_value_type& partition); -future<> merge_tables(distributed& proxy, schema_result&& before, schema_result&& after); - lw_shared_ptr create_keyspace_from_schema_partition(const schema_result_value_type& partition); mutation make_create_keyspace_mutation(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);