diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index 630a4d83ee..c6aaf180bc 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -35,6 +35,7 @@ #include "core/thread.hh" #include "json.hh" +#include "db/marshal/type_parser.hh" #include "db/config.hh" #include @@ -437,16 +438,19 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE }); } -#if 0 - private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName) + future + read_schema_partition_for_table(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name, const sstring& table_name) { - DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName)); - ColumnFamilyStore store = getSchemaCFS(schemaTableName); - Composite prefix = store.getComparator().make(tableName); - ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis()); - return new Row(key, cells); + auto schema = proxy.get_db().local().find_schema(system_keyspace::NAME, schema_table_name); + auto keyspace_key = dht::global_partitioner().decorate_key(*schema, + partition_key::from_single_value(*schema, to_bytes(keyspace_name))); + auto clustering_range = {query::clustering_range(clustering_key_prefix::from_clustering_prefix(*schema, exploded_clustering_prefix({to_bytes(table_name)})))}; + return proxy.query_local(system_keyspace::NAME, schema_table_name, keyspace_key, clustering_range).then([keyspace_name] (auto&& rs) { + return schema_result::value_type{keyspace_name, std::move(rs)}; + }); } +#if 0 private static boolean isEmptySchemaPartition(Row partition) { return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns()); @@ -602,60 +606,61 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) { auto& after = pair.first; auto& before = pair.second; - return proxy.get_db().invoke_on_all([&before, &after] (database& db) { - std::vector created; - std::vector altered; - std::vector dropped; - auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool { - return *x == *y; + return proxy.get_db().invoke_on_all([&proxy, &before, &after] (database& db) { + return seastar::async([&proxy, &db, &before, &after] { + std::vector created; + std::vector altered; + std::vector dropped; + auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool { + return *x == *y; + }); + for (auto&& key : diff.entries_only_on_right) { + auto&& value = after[key]; + if (!value->empty()) { + auto&& tables = create_tables_from_tables_partition(proxy, value); + boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); + } + } + for (auto&& key : diff.entries_differing) { + sstring keyspace_name = key; + + auto&& pre = before[key]; + auto&& post = after[key]; + + if (!pre->empty() && !post->empty()) { + auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); + auto after = create_tables_from_tables_partition(proxy, post); + auto delta = difference(std::map{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool { + return *x == *y; + }); + for (auto&& key : delta.entries_only_on_left) { + dropped.emplace_back(before[key]); + } + for (auto&& key : delta.entries_only_on_right) { + created.emplace_back(after[key]); + } + for (auto&& key : delta.entries_differing) { + altered.emplace_back(after[key]); + } + } else if (!pre->empty()) { + auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); + boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped)); + } else if (!post->empty()) { + auto tables = create_tables_from_tables_partition(proxy, post); + boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); + } + } + for (auto&& cfm : created) { + column_family::config cfg; // FIXME + db.add_column_family(std::move(column_family{cfm, cfg})); + } + for (auto&& cfm : altered) { + db.update_column_family(cfm->ks_name(), cfm->cf_name()); + } + for (auto&& cfm : dropped) { + db.drop_column_family(cfm->ks_name(), cfm->cf_name()); + } }); - for (auto&& key : diff.entries_only_on_right) { - auto&& value = after[key]; - if (!value->empty()) { - auto&& tables = create_tables_from_tables_partition(value); - boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); - } - } - for (auto&& key : diff.entries_differing) { - sstring keyspace_name = key; - - auto&& pre = before[key]; - auto&& post = after[key]; - - if (!pre->empty() && !post->empty()) { - auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); - auto after = create_tables_from_tables_partition(post); - auto delta = difference(std::map{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool { - return *x == *y; - }); - for (auto&& key : delta.entries_only_on_left) { - dropped.emplace_back(before[key]); - } - for (auto&& key : delta.entries_only_on_right) { - created.emplace_back(after[key]); - } - for (auto&& key : delta.entries_differing) { - altered.emplace_back(after[key]); - } - } else if (!pre->empty()) { - auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data(); - boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped)); - } else if (!post->empty()) { - auto tables = create_tables_from_tables_partition(post); - boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created)); - } - } - for (auto&& cfm : created) { - column_family::config cfg; // FIXME - db.add_column_family(std::move(column_family{cfm, cfg})); - } - for (auto&& cfm : altered) { - db.update_column_family(cfm->ks_name(), cfm->cf_name()); - } - for (auto&& cfm : dropped) { - db.drop_column_family(cfm->ks_name(), cfm->cf_name()); - } - return make_ready_future<>(); }); }); } @@ -1147,11 +1152,11 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE * * @return map containing name of the table and its metadata for faster lookup */ - std::map create_tables_from_tables_partition(const schema_result::mapped_type& result) + std::map create_tables_from_tables_partition(service::storage_proxy& proxy, const schema_result::mapped_type& result) { std::map tables{}; for (auto&& row : result->rows()) { - auto&& cfm = create_table_from_table_row(row); + auto&& cfm = create_table_from_table_row(proxy, row); tables.emplace(cfm->cf_name(), std::move(cfm)); } return tables; @@ -1163,13 +1168,14 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns); } +#endif - private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns) + void create_table_from_table_row_and_columns_partition(schema_builder& builder, const query::result_set_row& table_row, const schema_result::value_type& serialized_columns) { - String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS); - return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns)); + create_table_from_table_row_and_column_rows(builder, table_row, serialized_columns.second); } +#if 0 private static CFMetaData createTableFromTablePartition(Row row) { String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); @@ -1182,17 +1188,16 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE * * @return Metadata deserialized from schema */ - schema_ptr create_table_from_table_row(const query::result_set_row& row) + schema_ptr create_table_from_table_row(service::storage_proxy& proxy, const query::result_set_row& row) { auto ks_name = row.get_nonnull("keyspace_name"); auto cf_name = row.get_nonnull("columnfamily_name"); auto id = row.get_nonnull("cf_id"); schema_builder builder{ks_name, cf_name, id}; + auto serialized_columns = read_schema_partition_for_table(proxy, COLUMNS, ks_name, cf_name).get0(); + create_table_from_table_row_and_columns_partition(builder, row, serialized_columns); #if 0 // FIXME: - Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName); - CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns); - Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName); try { @@ -1207,86 +1212,80 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE return builder.build(); } -#if 0 - public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result, - UntypedResultSet serializedColumnDefinitions) + void create_table_from_table_row_and_column_rows(schema_builder& builder, const query::result_set_row& table_row, const schema_result::mapped_type& serialized_column_definitions) { - try - { - String ksName = result.getString("keyspace_name"); - String cfName = result.getString("columnfamily_name"); + auto ks_name = table_row.get_nonnull("keyspace_name"); + auto cf_name = table_row.get_nonnull("columnfamily_name"); - AbstractType rawComparator = TypeParser.parse(result.getString("comparator")); - AbstractType subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null; - ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type")); +#if 0 + AbstractType rawComparator = TypeParser.parse(result.getString("comparator")); + AbstractType subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null; + ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type")); - AbstractType fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator); + AbstractType fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator); +#endif - List columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions, - ksName, - cfName, - fullRawComparator, - cfType == ColumnFamilyType.Super); + std::vector column_defs = create_columns_from_column_rows(serialized_column_definitions, + ks_name, + cf_name/*, + fullRawComparator, + cfType == ColumnFamilyType.Super*/); - boolean isDense = result.has("is_dense") - ? result.getBoolean("is_dense") - : CFMetaData.calculateIsDense(fullRawComparator, columnDefs); +#if 0 + boolean isDense = result.has("is_dense") + ? result.getBoolean("is_dense") + : CFMetaData.calculateIsDense(fullRawComparator, columnDefs); - CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense); + CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense); - // if we are upgrading, we use id generated from names initially - UUID cfId = result.has("cf_id") - ? result.getUUID("cf_id") - : CFMetaData.generateLegacyCfId(ksName, cfName); + // if we are upgrading, we use id generated from names initially + UUID cfId = result.has("cf_id") + ? result.getUUID("cf_id") + : CFMetaData.generateLegacyCfId(ksName, cfName); - CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId); - cfm.isDense(isDense); + CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId); + cfm.isDense(isDense); - cfm.readRepairChance(result.getDouble("read_repair_chance")); - cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance")); - cfm.gcGraceSeconds(result.getInt("gc_grace_seconds")); - cfm.defaultValidator(TypeParser.parse(result.getString("default_validator"))); - cfm.keyValidator(TypeParser.parse(result.getString("key_validator"))); - cfm.minCompactionThreshold(result.getInt("min_compaction_threshold")); - cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold")); - if (result.has("comment")) - cfm.comment(result.getString("comment")); - if (result.has("memtable_flush_period_in_ms")) - cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); - cfm.caching(CachingOptions.fromString(result.getString("caching"))); - if (result.has("default_time_to_live")) - cfm.defaultTimeToLive(result.getInt("default_time_to_live")); - if (result.has("speculative_retry")) - cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry"))); - cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class"))); - cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); - cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); + cfm.readRepairChance(result.getDouble("read_repair_chance")); + cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance")); + cfm.gcGraceSeconds(result.getInt("gc_grace_seconds")); + cfm.defaultValidator(TypeParser.parse(result.getString("default_validator"))); + cfm.keyValidator(TypeParser.parse(result.getString("key_validator"))); + cfm.minCompactionThreshold(result.getInt("min_compaction_threshold")); + cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold")); + if (result.has("comment")) + cfm.comment(result.getString("comment")); + if (result.has("memtable_flush_period_in_ms")) + cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); + cfm.caching(CachingOptions.fromString(result.getString("caching"))); + if (result.has("default_time_to_live")) + cfm.defaultTimeToLive(result.getInt("default_time_to_live")); + if (result.has("speculative_retry")) + cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry"))); + cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class"))); + cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); + cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); - if (result.has("min_index_interval")) - cfm.minIndexInterval(result.getInt("min_index_interval")); + if (result.has("min_index_interval")) + cfm.minIndexInterval(result.getInt("min_index_interval")); - if (result.has("max_index_interval")) - cfm.maxIndexInterval(result.getInt("max_index_interval")); + if (result.has("max_index_interval")) + cfm.maxIndexInterval(result.getInt("max_index_interval")); - if (result.has("bloom_filter_fp_chance")) - cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); - else - cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance()); + if (result.has("bloom_filter_fp_chance")) + cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); + else + cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance()); - if (result.has("dropped_columns")) - cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance))); - - for (ColumnDefinition cd : columnDefs) - cfm.addOrReplaceColumnDefinition(cd); - - return cfm.rebuild(); - } - catch (SyntaxException | ConfigurationException e) - { - throw new RuntimeException(e); + if (result.has("dropped_columns")) + cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance))); +#endif + for (auto&& cdef : column_defs) { + builder.with_column(cdef); } } +#if 0 private static Map convertDroppedColumns(Map raw) { Map converted = Maps.newHashMap(); @@ -1333,14 +1332,21 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE } } -#if 0 - private static ColumnDefinition.Kind deserializeKind(String kind) - { - if (kind.equalsIgnoreCase("clustering_key")) - return ColumnDefinition.Kind.CLUSTERING_COLUMN; - return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase()); + column_kind deserialize_kind(sstring kind) { + if (kind == "partition_key") { + return column_kind::partition_key; + } else if (kind == "clustering_key") { + return column_kind::clustering_key; + } else if (kind == "static") { + return column_kind::static_column; + } else if (kind == "regular") { + return column_kind::regular_column; + } else { + throw std::invalid_argument("unknown column kind: " + kind); + } } +#if 0 private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) { ColumnFamily cells = mutation.addOrGet(Columns); @@ -1350,42 +1356,52 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE Composite prefix = Columns.comparator.make(table.cfName, column.name.toString()); cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); } +#endif - private static List createColumnsFromColumnRows(UntypedResultSet rows, - String keyspace, - String table, - AbstractType rawComparator, - boolean isSuper) + std::vector create_columns_from_column_rows(const schema_result::mapped_type& rows, + const sstring& keyspace, + const sstring& table/*, + AbstractType rawComparator, + boolean isSuper*/) { - List columns = new ArrayList<>(); - for (UntypedResultSet.Row row : rows) - columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper)); + std::vector columns; + for (auto&& row : rows->rows()) { + columns.emplace_back(std::move(create_column_from_column_row(row, keyspace, table/*, rawComparator, isSuper*/))); + } return columns; } - private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row, - String keyspace, - String table, - AbstractType rawComparator, - boolean isSuper) + column_definition create_column_from_column_row(const query::result_set_row& row, + sstring keyspace, + sstring table/*, + AbstractType rawComparator, + boolean isSuper*/) { - ColumnDefinition.Kind kind = deserializeKind(row.getString("type")); + auto kind = deserialize_kind(row.get_nonnull("type")); - Integer componentIndex = null; - if (row.has("component_index")) - componentIndex = row.getInt("component_index"); + if (row.has("component_index")) { + // FIXME: We need to pass component_index to schema_builder + // to ensure columns are instantiated in the correct order. + auto component_index = row.get_nonnull("component_index"); + assert(component_index == 0); + } +#if 0 else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper) componentIndex = 1; // A ColumnDefinition for super columns applies to the column component +#endif +#if 0 // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we // we need to use the comparator fromString method AbstractType comparator = kind == ColumnDefinition.Kind.REGULAR ? getComponentComparator(rawComparator, componentIndex) : UTF8Type.instance; - ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator); +#endif + auto name = row.get_nonnull("column_name"); - AbstractType validator = parseType(row.getString("validator")); + auto validator = parse_type(row.get_nonnull("validator")); +#if 0 IndexType indexType = null; if (row.has("index_type")) indexType = IndexType.valueOf(row.getString("index_type")); @@ -1397,10 +1413,11 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE String indexName = null; if (row.has("index_name")) indexName = row.getString("index_name"); - - return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind); +#endif + return column_definition{to_bytes(name), validator, kind}; } +#if 0 private static AbstractType getComponentComparator(AbstractType rawComparator, Integer componentIndex) { return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType))) @@ -1634,22 +1651,13 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE return mutation; } - - private static AbstractType parseType(String str) - { - try - { - return TypeParser.parse(str); - } - catch (SyntaxException | ConfigurationException e) - { - // We only use this when reading the schema where we shouldn't get an error - throw new RuntimeException(e); - } - } -} #endif + data_type parse_type(sstring str) + { + return db::marshal::type_parser::parse(str); + } + std::vector all_tables() { return { keyspaces(), columnfamilies(), columns(), triggers(), usertypes(), functions(), aggregates() diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index 116d505626..10614965bf 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -73,15 +73,32 @@ mutation make_create_keyspace_mutation(lw_shared_ptr keyspace std::vector make_create_table_mutations(lw_shared_ptr keyspace, schema_ptr table, api::timestamp_type timestamp); -std::map create_tables_from_tables_partition(const schema_result::mapped_type& result); +std::map create_tables_from_tables_partition(service::storage_proxy& proxy, const schema_result::mapped_type& result); void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations); -schema_ptr create_table_from_table_row(const query::result_set_row& row); +schema_ptr create_table_from_table_row(service::storage_proxy& proxy, const query::result_set_row& row); + +void create_table_from_table_row_and_column_rows(schema_builder& builder, const query::result_set_row& table_row, const schema_result::mapped_type& serialized_columns); + +std::vector create_columns_from_column_rows(const schema_result::mapped_type& rows, + const sstring& keyspace, + const sstring& table/*, + AbstractType rawComparator, + boolean isSuper*/); + +column_definition create_column_from_column_row(const query::result_set_row& row, + sstring keyspace, + sstring table/*, + AbstractType rawComparator, + boolean isSuper*/); + void add_column_to_schema_mutation(schema_ptr table, const column_definition& column, api::timestamp_type timestamp, const partition_key& pkey, std::vector& mutations); sstring serialize_kind(column_kind kind); +column_kind deserialize_kind(sstring kind); +data_type parse_type(sstring str); } // namespace legacy_schema_tables } // namespace db