db/legacy_schema_tables: Convert table merging to C++
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
@@ -28,10 +28,14 @@
|
||||
#include "system_keyspace.hh"
|
||||
|
||||
#include "query-result-set.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "map_difference.hh"
|
||||
|
||||
#include "core/do_with.hh"
|
||||
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
using namespace db::system_keyspace;
|
||||
|
||||
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
|
||||
@@ -515,13 +519,13 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
auto new_aggregates = std::move(std::get<schema_result>(std::get<4>(new_results).get()));
|
||||
|
||||
auto keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
|
||||
auto table_merge_done = merge_tables(proxy, std::move(old_column_families), std::move(new_column_families));
|
||||
#if 0
|
||||
mergeTables(oldColumnFamilies, newColumnFamilies);
|
||||
mergeTypes(oldTypes, newTypes);
|
||||
mergeFunctions(oldFunctions, newFunctions);
|
||||
mergeAggregates(oldAggregates, newAggregates);
|
||||
#endif
|
||||
return when_all(std::move(keyspaces_to_drop)).then([&proxy] (auto&& results) mutable {
|
||||
return when_all(std::move(keyspaces_to_drop), std::move(table_merge_done)).then([&proxy] (auto&& results) mutable {
|
||||
auto keyspaces_to_drop = std::move(std::get<std::set<sstring>>(std::get<0>(results).get()));
|
||||
|
||||
return proxy.get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
|
||||
@@ -593,61 +597,71 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
// see the comments for mergeKeyspaces()
|
||||
private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
|
||||
// see the comments for merge_keyspaces()
|
||||
future<> merge_tables(service::storage_proxy& proxy, schema_result&& before, schema_result&& after)
|
||||
{
|
||||
List<CFMetaData> created = new ArrayList<>();
|
||||
List<CFMetaData> altered = new ArrayList<>();
|
||||
List<CFMetaData> dropped = new ArrayList<>();
|
||||
|
||||
MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
|
||||
|
||||
for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
|
||||
if (entry.getValue().hasColumns())
|
||||
created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values());
|
||||
|
||||
for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
|
||||
{
|
||||
String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
|
||||
|
||||
ColumnFamily pre = entry.getValue().leftValue();
|
||||
ColumnFamily post = entry.getValue().rightValue();
|
||||
|
||||
if (pre.hasColumns() && post.hasColumns())
|
||||
{
|
||||
MapDifference<String, CFMetaData> delta =
|
||||
Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
|
||||
createTablesFromTablesPartition(new Row(entry.getKey(), post)));
|
||||
|
||||
dropped.addAll(delta.entriesOnlyOnLeft().values());
|
||||
created.addAll(delta.entriesOnlyOnRight().values());
|
||||
Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
|
||||
{
|
||||
public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
|
||||
{
|
||||
return pair.rightValue();
|
||||
return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) {
|
||||
auto& after = pair.first;
|
||||
auto& before = pair.second;
|
||||
return proxy.get_db().invoke_on_all([&before, &after] (database& db) {
|
||||
std::vector<schema_ptr> created;
|
||||
std::vector<schema_ptr> altered;
|
||||
std::vector<schema_ptr> dropped;
|
||||
auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool {
|
||||
return *x == *y;
|
||||
});
|
||||
for (auto&& key : diff.entries_only_on_right) {
|
||||
auto&& value = after[key];
|
||||
if (!value->empty()) {
|
||||
auto&& tables = create_tables_from_tables_partition(value);
|
||||
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
|
||||
}
|
||||
}));
|
||||
}
|
||||
else if (pre.hasColumns())
|
||||
{
|
||||
dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
|
||||
}
|
||||
else if (post.hasColumns())
|
||||
{
|
||||
created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto&& key : diff.entries_differing) {
|
||||
sstring keyspace_name = key;
|
||||
|
||||
for (CFMetaData cfm : created)
|
||||
Schema.instance.addTable(cfm);
|
||||
for (CFMetaData cfm : altered)
|
||||
Schema.instance.updateTable(cfm.ksName, cfm.cfName);
|
||||
for (CFMetaData cfm : dropped)
|
||||
Schema.instance.dropTable(cfm.ksName, cfm.cfName);
|
||||
auto&& pre = before[key];
|
||||
auto&& post = after[key];
|
||||
|
||||
if (!pre->empty() && !post->empty()) {
|
||||
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
|
||||
auto after = create_tables_from_tables_partition(post);
|
||||
auto delta = difference(std::map<sstring, schema_ptr>{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool {
|
||||
return *x == *y;
|
||||
});
|
||||
for (auto&& key : delta.entries_only_on_left) {
|
||||
dropped.emplace_back(before[key]);
|
||||
}
|
||||
for (auto&& key : delta.entries_only_on_right) {
|
||||
created.emplace_back(after[key]);
|
||||
}
|
||||
for (auto&& key : delta.entries_differing) {
|
||||
altered.emplace_back(after[key]);
|
||||
}
|
||||
} else if (!pre->empty()) {
|
||||
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
|
||||
boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped));
|
||||
} else if (!post->empty()) {
|
||||
auto tables = create_tables_from_tables_partition(post);
|
||||
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
|
||||
}
|
||||
}
|
||||
for (auto&& cfm : created) {
|
||||
column_family::config cfg; // FIXME
|
||||
db.add_column_family(std::move(column_family{cfm, cfg}));
|
||||
}
|
||||
for (auto&& cfm : altered) {
|
||||
db.update_column_family(cfm->ks_name(), cfm->cf_name());
|
||||
}
|
||||
for (auto&& cfm : dropped) {
|
||||
db.drop_column_family(cfm->ks_name(), cfm->cf_name());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
// see the comments for mergeKeyspaces()
|
||||
private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
|
||||
{
|
||||
@@ -1121,27 +1135,24 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
|
||||
return createTableFromTablePartition(partition);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Deserialize tables from low-level schema representation, all of them belong to the same keyspace
|
||||
*
|
||||
* @return map containing name of the table and its metadata for faster lookup
|
||||
*/
|
||||
private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition)
|
||||
std::map<sstring, schema_ptr> create_tables_from_tables_partition(const schema_result::mapped_type& result)
|
||||
{
|
||||
if (partition.cf == null)
|
||||
return Collections.emptyMap();
|
||||
|
||||
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
|
||||
Map<String, CFMetaData> tables = new HashMap<>();
|
||||
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
|
||||
{
|
||||
CFMetaData cfm = createTableFromTableRow(row);
|
||||
tables.put(cfm.cfName, cfm);
|
||||
std::map<sstring, schema_ptr> tables{};
|
||||
for (auto&& row : result->rows()) {
|
||||
auto&& cfm = create_table_from_table_row(row);
|
||||
tables.emplace(cfm->cf_name(), std::move(cfm));
|
||||
}
|
||||
return tables;
|
||||
}
|
||||
|
||||
#if 0
|
||||
public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
|
||||
{
|
||||
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
|
||||
@@ -1159,17 +1170,21 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
|
||||
return createTableFromTableRow(QueryProcessor.resultify(query, row).one());
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Deserialize table metadata from low-level representation
|
||||
*
|
||||
* @return Metadata deserialized from schema
|
||||
*/
|
||||
private static CFMetaData createTableFromTableRow(UntypedResultSet.Row result)
|
||||
schema_ptr create_table_from_table_row(const query::result_set_row& row)
|
||||
{
|
||||
String ksName = result.getString("keyspace_name");
|
||||
String cfName = result.getString("columnfamily_name");
|
||||
|
||||
auto ks_name = row.get_nonnull<sstring>("keyspace_name");
|
||||
auto cf_name = row.get_nonnull<sstring>("columnfamily_name");
|
||||
auto id = row.get_nonnull<utils::UUID>("cf_id");
|
||||
schema_builder builder{ks_name, cf_name, id};
|
||||
#if 0
|
||||
// FIXME:
|
||||
Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
|
||||
CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
|
||||
|
||||
@@ -1183,10 +1198,11 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return cfm;
|
||||
#endif
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
#if 0
|
||||
public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result,
|
||||
UntypedResultSet serializedColumnDefinitions)
|
||||
{
|
||||
|
||||
@@ -65,11 +65,19 @@ std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_meta
|
||||
|
||||
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result::value_type& partition);
|
||||
|
||||
future<> merge_tables(service::storage_proxy& proxy, schema_result&& before, schema_result&& after);
|
||||
|
||||
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result::value_type& partition);
|
||||
|
||||
mutation make_create_keyspace_mutation(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
|
||||
|
||||
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
|
||||
|
||||
std::map<sstring, schema_ptr> create_tables_from_tables_partition(const schema_result::mapped_type& result);
|
||||
|
||||
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector<mutation>& mutations);
|
||||
|
||||
schema_ptr create_table_from_table_row(const query::result_set_row& row);
|
||||
|
||||
} // namespace legacy_schema_tables
|
||||
} // namespace db
|
||||
|
||||
Reference in New Issue
Block a user