schema_tables: Make merge_tables() compare by mutations

Schema version is calculated from mutations, so merge_schema should
also look at mutation changes to detect schema changes whenever
version changes.
This commit is contained in:
Tomasz Grabiec
2015-12-22 18:43:25 +01:00
parent 5707c5e7ca
commit d8ff9ee441
3 changed files with 134 additions and 84 deletions

View File

@@ -164,12 +164,13 @@ cql3::statements::create_index_statement::validate(distributed<service::storage_
throw exceptions::invalid_request_exception(
sprint(
"Cannot create secondary index on partition key column %s",
target->column->name()));
*target->column));
}
}
future<bool>
cql3::statements::create_index_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
throw std::runtime_error("Indexes are not supported yet");
auto schema = proxy.local().get_db().local().find_schema(keyspace(), column_family());
auto target = _raw_target->prepare(schema);

View File

@@ -54,6 +54,8 @@
#include "core/thread.hh"
#include "json.hh"
#include "log.hh"
#include "frozen_schema.hh"
#include "schema_registry.hh"
#include "db/marshal/type_parser.hh"
#include "db/config.hh"
@@ -61,7 +63,6 @@
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
#include "md5_hasher.hh"
#include "compaction_strategy.hh"
@@ -73,6 +74,36 @@ namespace schema_tables {
logging::logger logger("schema_tables");
struct qualified_name {
sstring keyspace_name;
sstring table_name;
qualified_name(sstring keyspace_name, sstring table_name)
: keyspace_name(std::move(keyspace_name))
, table_name(std::move(table_name))
{ }
qualified_name(const schema_ptr& s)
: keyspace_name(s->ks_name())
, table_name(s->cf_name())
{ }
bool operator<(const qualified_name& o) const {
return keyspace_name < o.keyspace_name
|| (keyspace_name == o.keyspace_name && table_name < o.table_name);
}
bool operator==(const qualified_name& o) const {
return keyspace_name == o.keyspace_name && table_name == o.table_name;
}
};
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table);
static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after);
std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, /* not present in 2.1.8: FUNCTIONS, AGGREGATES */ };
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
@@ -488,7 +519,7 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mu
}
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
{
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] () mutable {
return do_merge_schema(proxy, std::move(mutations), do_flush);
}).finally([] {
@@ -496,6 +527,35 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mu
});
}
// Returns names of live table definitions of given keyspace
future<std::vector<sstring>>
static read_table_names_of_keyspace(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name) {
auto s = columnfamilies();
auto pkey = dht::global_partitioner().decorate_key(*s, partition_key::from_singular(*s, keyspace_name));
return db::system_keyspace::query(proxy, COLUMNFAMILIES, pkey).then([] (auto&& rs) {
std::vector<sstring> result;
for (const query::result_set_row& row : rs->rows()) {
result.emplace_back(row.get_nonnull<sstring>("columnfamily_name"));
}
return result;
});
}
// Call inside a seastar thread
static
std::map<qualified_name, schema_mutations>
read_tables_for_keyspaces(distributed<service::storage_proxy>& proxy, const std::set<sstring>& keyspace_names)
{
std::map<qualified_name, schema_mutations> result;
for (auto&& keyspace_name : keyspace_names) {
for (auto&& table_name : read_table_names_of_keyspace(proxy, keyspace_name).get0()) {
auto qn = qualified_name(keyspace_name, table_name);
result.emplace(qn, read_table_mutations(proxy, qn).get0());
}
}
return result;
}
future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable {
@@ -510,7 +570,7 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
// current state of the schema
auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& old_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces).get0();
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces);
/*auto& old_types = */read_schema_for_keyspaces(proxy, USERTYPES, keyspaces).get0();
#if 0 // not in 2.1.8
/*auto& old_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
@@ -530,7 +590,7 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
// with new data applied
auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& new_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces).get0();
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces);
/*auto& new_types = */read_schema_for_keyspaces(proxy, USERTYPES, keyspaces).get0();
#if 0 // not in 2.1.8
/*auto& new_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
@@ -538,7 +598,7 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
#endif
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)).get0();
merge_tables(proxy, std::move(old_column_families), std::move(new_column_families));
#if 0
mergeTypes(oldTypes, newTypes);
mergeFunctions(oldFunctions, newFunctions);
@@ -622,79 +682,60 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
}
// see the comments for merge_keyspaces()
future<> merge_tables(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after)
static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after)
{
return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) {
auto& after = pair.first;
auto& before = pair.second;
auto changed_at = db_clock::now();
return proxy.local().get_db().invoke_on_all([changed_at, &proxy, &before, &after] (database& db) {
return seastar::async([changed_at, &proxy, &db, &before, &after] {
std::vector<schema_ptr> created;
std::vector<schema_ptr> altered;
std::vector<schema_ptr> dropped;
auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool {
return *x == *y;
});
for (auto&& key : diff.entries_only_on_left) {
auto&& rs = before[key];
for (const query::result_set_row& row : rs->rows()) {
auto ks_name = row.get_nonnull<sstring>("keyspace_name");
auto cf_name = row.get_nonnull<sstring>("columnfamily_name");
dropped.emplace_back(db.find_schema(ks_name, cf_name));
}
}
for (auto&& key : diff.entries_only_on_right) {
auto&& value = after[key];
auto&& tables = create_tables_from_tables_partition(proxy, value).get0();
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
}
for (auto&& key : diff.entries_differing) {
sstring keyspace_name = key;
auto changed_at = db_clock::now();
std::vector<global_schema_ptr> created;
std::vector<global_schema_ptr> altered;
std::vector<global_schema_ptr> dropped;
auto&& post = after[key];
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
auto after = create_tables_from_tables_partition(proxy, post).get0();
auto delta = difference(std::map<sstring, schema_ptr>{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool {
return *x == *y;
});
for (auto&& key : delta.entries_only_on_left) {
dropped.emplace_back(before[key]);
}
for (auto&& key : delta.entries_only_on_right) {
created.emplace_back(after[key]);
}
for (auto&& key : delta.entries_differing) {
altered.emplace_back(after[key]);
}
auto diff = difference(before, after);
for (auto&& key : diff.entries_only_on_left) {
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
dropped.emplace_back(s);
}
for (auto&& key : diff.entries_only_on_right) {
created.emplace_back(create_table_from_mutations(after.at(key)));
}
for (auto&& key : diff.entries_differing) {
altered.emplace_back(create_table_from_mutations(after.at(key)));
}
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, changed_at] (database& db) {
return seastar::async([&] {
for (auto&& gs : created) {
schema_ptr s = gs.get();
auto& ks = db.find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s);
db.add_column_family(s, cfg);
}
for (auto&& gs : altered) {
// FIXME: Send out notifications
schema_ptr s = gs.get();
db.update_column_family(s).get();
}
parallel_for_each(dropped.begin(), dropped.end(), [changed_at, &db](auto&& gs) {
schema_ptr s = gs.get();
return db.drop_column_family(changed_at, s->ks_name(), s->cf_name());
}).get();
// FIXME: clean this up by reorganizing the code
// Send CQL events only once, not once per shard.
if (engine().cpu_id() == 0) {
for (auto&& gs : created) {
schema_ptr s = gs.get();
service::migration_manager::notify_create_column_family(s).get0();
auto& ks = db.find_keyspace(s->ks_name());
ks.make_directory_for_column_family(s->cf_name(), s->id());
}
for (auto&& cfm : created) {
auto& ks = db.find_keyspace(cfm->ks_name());
auto cfg = ks.make_column_family_config(*cfm);
db.add_column_family(cfm, cfg);
for (auto&& gs : dropped) {
schema_ptr s = gs.get();
service::migration_manager::notify_drop_column_family(std::move(s)).get0();
}
parallel_for_each(altered.begin(), altered.end(), [&db] (auto&& cfm) {
// FIXME: Send out notifications
return db.update_column_family(cfm);
}).get();
parallel_for_each(dropped.begin(), dropped.end(), [changed_at, &db] (auto&& cfm) {
return db.drop_column_family(changed_at, cfm->ks_name(), cfm->cf_name());
}).get();
// FIXME: clean this up by reorganizing the code
// Send CQL events only once, not once per shard.
if (engine().cpu_id() == 0) {
for (auto&& cfm : created) {
service::migration_manager::notify_create_column_family(cfm).get0();
auto& ks = db.find_keyspace(cfm->ks_name());
ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}
for (auto&& cfm : dropped) {
service::migration_manager::notify_drop_column_family(cfm).get0();
}
}
});
}
});
});
}).get();
}
#if 0
@@ -1193,15 +1234,13 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
return mutations;
}
future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table)
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table)
{
return read_schema_partition_for_table(proxy, COLUMNFAMILIES, keyspace, table).then([&proxy, keyspace, table] (mutation cf_m) {
if (!cf_m.live_row_count()) {
throw std::runtime_error(sprint("%s:%s not found in the schema definitions keyspace.", keyspace, table));
}
return read_schema_partition_for_table(proxy, COLUMNS, keyspace, table).then([cf_m = std::move(cf_m)] (mutation col_m) {
schema_mutations tm{std::move(cf_m), std::move(col_m)};
return create_table_from_mutations(std::move(tm));
return read_schema_partition_for_table(proxy, COLUMNFAMILIES, table.keyspace_name, table.table_name)
.then([&proxy, table] (mutation cf_m) {
return read_schema_partition_for_table(proxy, COLUMNS, table.keyspace_name, table.table_name)
.then([cf_m = std::move(cf_m)] (mutation col_m) {
return schema_mutations{std::move(cf_m), std::move(col_m)};
});
#if 0
// FIXME:
@@ -1219,6 +1258,18 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
});
}
future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table)
{
return do_with(qualified_name(keyspace, table), [&proxy] (auto&& qn) {
return read_table_mutations(proxy, qn).then([qn] (schema_mutations sm) {
if (!sm.live()) {
throw std::runtime_error(sprint("%s:%s not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name));
}
return create_table_from_mutations(std::move(sm));
});
});
}
/**
* Deserialize tables from low-level schema representation, all of them belong to the same keyspace
*

View File

@@ -94,8 +94,6 @@ std::vector<mutation> make_drop_keyspace_mutations(lw_shared_ptr<keyspace_metada
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result_value_type& partition);
future<> merge_tables(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after);
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result_value_type& partition);
mutation make_create_keyspace_mutation(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);