From d25bd89ee1d332e299dcdf3f7dc730c4e29e131e Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 12 May 2015 17:16:28 +0300 Subject: [PATCH] db/legacy_schema_tables: Convert table merging to C++ Signed-off-by: Pekka Enberg --- db/legacy_schema_tables.cc | 152 ++++++++++++++++++++----------------- db/legacy_schema_tables.hh | 8 ++ 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index 6531c30183..7e3fb48698 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -28,10 +28,14 @@ #include "system_keyspace.hh" #include "query-result-set.hh" +#include "schema_builder.hh" #include "map_difference.hh" #include "core/do_with.hh" +#include +#include + using namespace db::system_keyspace; /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ @@ -515,13 +519,13 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE auto new_aggregates = std::move(std::get(std::get<4>(new_results).get())); auto keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)); + auto table_merge_done = merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)); #if 0 - mergeTables(oldColumnFamilies, newColumnFamilies); mergeTypes(oldTypes, newTypes); mergeFunctions(oldFunctions, newFunctions); mergeAggregates(oldAggregates, newAggregates); #endif - return when_all(std::move(keyspaces_to_drop)).then([&proxy] (auto&& results) mutable { + return when_all(std::move(keyspaces_to_drop), std::move(table_merge_done)).then([&proxy] (auto&& results) mutable { auto keyspaces_to_drop = std::move(std::get>(std::get<0>(results).get())); return proxy.get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) { @@ -593,61 +597,71 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE }); } -#if 0 - // see the comments for mergeKeyspaces() - private static void mergeTables(Map before, Map after) + // see the comments for merge_keyspaces() + future<> merge_tables(service::storage_proxy& proxy, schema_result&& before, schema_result&& after) { - List created = new ArrayList<>(); - List altered = new ArrayList<>(); - List dropped = new ArrayList<>(); - - MapDifference diff = Maps.difference(before, after); - - for (Map.Entry entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values()); - - for (Map.Entry> entry : diff.entriesDiffering().entrySet()) - { - String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); - - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - { - MapDifference delta = - Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(), - createTablesFromTablesPartition(new Row(entry.getKey(), post))); - - dropped.addAll(delta.entriesOnlyOnLeft().values()); - created.addAll(delta.entriesOnlyOnRight().values()); - Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function, CFMetaData>() - { - public CFMetaData apply(MapDifference.ValueDifference pair) - { - return pair.rightValue(); + return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) { + auto& after = pair.first; + auto& before = pair.second; + return proxy.get_db().invoke_on_all([&before, &after] (database& db) { + std::vector created; + std::vector altered; + std::vector dropped; + auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool { + return *x == *y; + }); + for (auto&& key : diff.entries_only_on_right) { + auto&& value = after[key]; + if (!value->empty()) { + auto&& tables = create_tables_from_tables_partition(value); + boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); } - })); - } - else if (pre.hasColumns()) - { - dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values()); - } - else if (post.hasColumns()) - { - created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values()); - } - } + } + for (auto&& key : diff.entries_differing) { + sstring keyspace_name = key; - for (CFMetaData cfm : created) - Schema.instance.addTable(cfm); - for (CFMetaData cfm : altered) - Schema.instance.updateTable(cfm.ksName, cfm.cfName); - for (CFMetaData cfm : dropped) - Schema.instance.dropTable(cfm.ksName, cfm.cfName); + auto&& pre = before[key]; + auto&& post = after[key]; + + if (!pre->empty() && !post->empty()) { + auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); + auto after = create_tables_from_tables_partition(post); + auto delta = difference(std::map{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool { + return *x == *y; + }); + for (auto&& key : delta.entries_only_on_left) { + dropped.emplace_back(before[key]); + } + for (auto&& key : delta.entries_only_on_right) { + created.emplace_back(after[key]); + } + for (auto&& key : delta.entries_differing) { + altered.emplace_back(after[key]); + } + } else if (!pre->empty()) { + auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); + boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped)); + } else if (!post->empty()) { + auto tables = create_tables_from_tables_partition(post); + boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); + } + } + for (auto&& cfm : created) { + column_family::config cfg; // FIXME + db.add_column_family(std::move(column_family{cfm, cfg})); + } + for (auto&& cfm : altered) { + db.update_column_family(cfm->ks_name(), cfm->cf_name()); + } + for (auto&& cfm : dropped) { + db.drop_column_family(cfm->ks_name(), cfm->cf_name()); + } + return make_ready_future<>(); + }); + }); } +#if 0 // see the comments for mergeKeyspaces() private static void mergeTypes(Map before, Map after) { @@ -1121,27 +1135,24 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE return createTableFromTablePartition(partition); } +#endif /** * Deserialize tables from low-level schema representation, all of them belong to the same keyspace * * @return map containing name of the table and its metadata for faster lookup */ - private static Map createTablesFromTablesPartition(Row partition) + std::map create_tables_from_tables_partition(const schema_result::mapped_type& result) { - if (partition.cf == null) - return Collections.emptyMap(); - - String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); - Map tables = new HashMap<>(); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - { - CFMetaData cfm = createTableFromTableRow(row); - tables.put(cfm.cfName, cfm); + std::map tables{}; + for (auto&& row : result->rows()) { + auto&& cfm = create_table_from_table_row(row); + tables.emplace(cfm->cf_name(), std::move(cfm)); } return tables; } +#if 0 public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns) { String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); @@ -1159,17 +1170,21 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); return createTableFromTableRow(QueryProcessor.resultify(query, row).one()); } +#endif /** * Deserialize table metadata from low-level representation * * @return Metadata deserialized from schema */ - private static CFMetaData createTableFromTableRow(UntypedResultSet.Row result) + schema_ptr create_table_from_table_row(const query::result_set_row& row) { - String ksName = result.getString("keyspace_name"); - String cfName = result.getString("columnfamily_name"); - + auto ks_name = row.get_nonnull("keyspace_name"); + auto cf_name = row.get_nonnull("columnfamily_name"); + auto id = row.get_nonnull("cf_id"); + schema_builder builder{ks_name, cf_name, id}; +#if 0 + // FIXME: Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName); CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns); @@ -1183,10 +1198,11 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE { throw new RuntimeException(e); } - - return cfm; +#endif + return builder.build(); } +#if 0 public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result, UntypedResultSet serializedColumnDefinitions) { diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index fa30d1a5ff..396dd6670c 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -65,11 +65,19 @@ std::vector make_create_keyspace_mutations(lw_shared_ptr create_keyspace_from_schema_partition(const schema_result::value_type& partition); +future<> merge_tables(service::storage_proxy& proxy, schema_result&& before, schema_result&& after); + +lw_shared_ptr create_keyspace_from_schema_partition(const schema_result::value_type& partition); + mutation make_create_keyspace_mutation(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true); std::vector make_create_table_mutations(lw_shared_ptr keyspace, schema_ptr table, api::timestamp_type timestamp); +std::map create_tables_from_tables_partition(const schema_result::mapped_type& result); + void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations); +schema_ptr create_table_from_table_row(const query::result_set_row& row); + } // namespace legacy_schema_tables } // namespace db