db/legacy_schema_tables: Instantiate columns from system tables

Read the system tables and instantiate the table columns as in-memory
data structures.

NOTE! We only support one component per partition key and per clustering
key, because there is currently no way to pass the component index to the
schema builder.

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
Pekka Enberg
2015-06-15 09:34:44 +03:00
parent 5084ce0955
commit 38187f2e72
2 changed files with 201 additions and 176 deletions

View File

@@ -35,6 +35,7 @@
#include "core/thread.hh"
#include "json.hh"
#include "db/marshal/type_parser.hh"
#include "db/config.hh"
#include <boost/range/algorithm/copy.hpp>
@@ -437,16 +438,19 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
});
}
#if 0
private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
future<schema_result::value_type>
read_schema_partition_for_table(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name, const sstring& table_name)
{
DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
ColumnFamilyStore store = getSchemaCFS(schemaTableName);
Composite prefix = store.getComparator().make(tableName);
ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis());
return new Row(key, cells);
auto schema = proxy.get_db().local().find_schema(system_keyspace::NAME, schema_table_name);
auto keyspace_key = dht::global_partitioner().decorate_key(*schema,
partition_key::from_single_value(*schema, to_bytes(keyspace_name)));
auto clustering_range = {query::clustering_range(clustering_key_prefix::from_clustering_prefix(*schema, exploded_clustering_prefix({to_bytes(table_name)})))};
return proxy.query_local(system_keyspace::NAME, schema_table_name, keyspace_key, clustering_range).then([keyspace_name] (auto&& rs) {
return schema_result::value_type{keyspace_name, std::move(rs)};
});
}
#if 0
private static boolean isEmptySchemaPartition(Row partition)
{
return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns());
@@ -602,60 +606,61 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
return do_with(std::make_pair(std::move(after), std::move(before)), [&proxy] (auto& pair) {
auto& after = pair.first;
auto& before = pair.second;
return proxy.get_db().invoke_on_all([&before, &after] (database& db) {
std::vector<schema_ptr> created;
std::vector<schema_ptr> altered;
std::vector<schema_ptr> dropped;
auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool {
return *x == *y;
return proxy.get_db().invoke_on_all([&proxy, &before, &after] (database& db) {
return seastar::async([&proxy, &db, &before, &after] {
std::vector<schema_ptr> created;
std::vector<schema_ptr> altered;
std::vector<schema_ptr> dropped;
auto diff = difference(before, after, [](const auto& x, const auto& y) -> bool {
return *x == *y;
});
for (auto&& key : diff.entries_only_on_right) {
auto&& value = after[key];
if (!value->empty()) {
auto&& tables = create_tables_from_tables_partition(proxy, value);
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
}
}
for (auto&& key : diff.entries_differing) {
sstring keyspace_name = key;
auto&& pre = before[key];
auto&& post = after[key];
if (!pre->empty() && !post->empty()) {
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
auto after = create_tables_from_tables_partition(proxy, post);
auto delta = difference(std::map<sstring, schema_ptr>{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool {
return *x == *y;
});
for (auto&& key : delta.entries_only_on_left) {
dropped.emplace_back(before[key]);
}
for (auto&& key : delta.entries_only_on_right) {
created.emplace_back(after[key]);
}
for (auto&& key : delta.entries_differing) {
altered.emplace_back(after[key]);
}
} else if (!pre->empty()) {
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped));
} else if (!post->empty()) {
auto tables = create_tables_from_tables_partition(proxy, post);
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
}
}
for (auto&& cfm : created) {
column_family::config cfg; // FIXME
db.add_column_family(std::move(column_family{cfm, cfg}));
}
for (auto&& cfm : altered) {
db.update_column_family(cfm->ks_name(), cfm->cf_name());
}
for (auto&& cfm : dropped) {
db.drop_column_family(cfm->ks_name(), cfm->cf_name());
}
});
for (auto&& key : diff.entries_only_on_right) {
auto&& value = after[key];
if (!value->empty()) {
auto&& tables = create_tables_from_tables_partition(value);
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
}
}
for (auto&& key : diff.entries_differing) {
sstring keyspace_name = key;
auto&& pre = before[key];
auto&& post = after[key];
if (!pre->empty() && !post->empty()) {
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
auto after = create_tables_from_tables_partition(post);
auto delta = difference(std::map<sstring, schema_ptr>{before.begin(), before.end()}, after, [](const schema_ptr& x, const schema_ptr& y) -> bool {
return *x == *y;
});
for (auto&& key : delta.entries_only_on_left) {
dropped.emplace_back(before[key]);
}
for (auto&& key : delta.entries_only_on_right) {
created.emplace_back(after[key]);
}
for (auto&& key : delta.entries_differing) {
altered.emplace_back(after[key]);
}
} else if (!pre->empty()) {
auto before = db.find_keyspace(keyspace_name).metadata()->cf_meta_data();
boost::copy(before | boost::adaptors::map_values, std::back_inserter(dropped));
} else if (!post->empty()) {
auto tables = create_tables_from_tables_partition(post);
boost::copy(tables | boost::adaptors::map_values, std::back_inserter(created));
}
}
for (auto&& cfm : created) {
column_family::config cfg; // FIXME
db.add_column_family(std::move(column_family{cfm, cfg}));
}
for (auto&& cfm : altered) {
db.update_column_family(cfm->ks_name(), cfm->cf_name());
}
for (auto&& cfm : dropped) {
db.drop_column_family(cfm->ks_name(), cfm->cf_name());
}
return make_ready_future<>();
});
});
}
@@ -1147,11 +1152,11 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
*
* @return map containing name of the table and its metadata for faster lookup
*/
std::map<sstring, schema_ptr> create_tables_from_tables_partition(const schema_result::mapped_type& result)
std::map<sstring, schema_ptr> create_tables_from_tables_partition(service::storage_proxy& proxy, const schema_result::mapped_type& result)
{
std::map<sstring, schema_ptr> tables{};
for (auto&& row : result->rows()) {
auto&& cfm = create_table_from_table_row(row);
auto&& cfm = create_table_from_table_row(proxy, row);
tables.emplace(cfm->cf_name(), std::move(cfm));
}
return tables;
@@ -1163,13 +1168,14 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
}
#endif
private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns)
void create_table_from_table_row_and_columns_partition(schema_builder& builder, const query::result_set_row& table_row, const schema_result::value_type& serialized_columns)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS);
return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
create_table_from_table_row_and_column_rows(builder, table_row, serialized_columns.second);
}
#if 0
private static CFMetaData createTableFromTablePartition(Row row)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
@@ -1182,17 +1188,16 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
*
* @return Metadata deserialized from schema
*/
schema_ptr create_table_from_table_row(const query::result_set_row& row)
schema_ptr create_table_from_table_row(service::storage_proxy& proxy, const query::result_set_row& row)
{
auto ks_name = row.get_nonnull<sstring>("keyspace_name");
auto cf_name = row.get_nonnull<sstring>("columnfamily_name");
auto id = row.get_nonnull<utils::UUID>("cf_id");
schema_builder builder{ks_name, cf_name, id};
auto serialized_columns = read_schema_partition_for_table(proxy, COLUMNS, ks_name, cf_name).get0();
create_table_from_table_row_and_columns_partition(builder, row, serialized_columns);
#if 0
// FIXME:
Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
try
{
@@ -1207,86 +1212,80 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
return builder.build();
}
#if 0
public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result,
UntypedResultSet serializedColumnDefinitions)
void create_table_from_table_row_and_column_rows(schema_builder& builder, const query::result_set_row& table_row, const schema_result::mapped_type& serialized_column_definitions)
{
try
{
String ksName = result.getString("keyspace_name");
String cfName = result.getString("columnfamily_name");
auto ks_name = table_row.get_nonnull<sstring>("keyspace_name");
auto cf_name = table_row.get_nonnull<sstring>("columnfamily_name");
AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
#if 0
AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
#endif
List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
ksName,
cfName,
fullRawComparator,
cfType == ColumnFamilyType.Super);
std::vector<column_definition> column_defs = create_columns_from_column_rows(serialized_column_definitions,
ks_name,
cf_name/*,
fullRawComparator,
cfType == ColumnFamilyType.Super*/);
boolean isDense = result.has("is_dense")
? result.getBoolean("is_dense")
: CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
#if 0
boolean isDense = result.has("is_dense")
? result.getBoolean("is_dense")
: CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
// if we are upgrading, we use id generated from names initially
UUID cfId = result.has("cf_id")
? result.getUUID("cf_id")
: CFMetaData.generateLegacyCfId(ksName, cfName);
// if we are upgrading, we use id generated from names initially
UUID cfId = result.has("cf_id")
? result.getUUID("cf_id")
: CFMetaData.generateLegacyCfId(ksName, cfName);
CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
cfm.isDense(isDense);
CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
cfm.isDense(isDense);
cfm.readRepairChance(result.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
if (result.has("comment"))
cfm.comment(result.getString("comment"));
if (result.has("memtable_flush_period_in_ms"))
cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
cfm.caching(CachingOptions.fromString(result.getString("caching")));
if (result.has("default_time_to_live"))
cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
if (result.has("speculative_retry"))
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
cfm.readRepairChance(result.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
if (result.has("comment"))
cfm.comment(result.getString("comment"));
if (result.has("memtable_flush_period_in_ms"))
cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
cfm.caching(CachingOptions.fromString(result.getString("caching")));
if (result.has("default_time_to_live"))
cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
if (result.has("speculative_retry"))
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
if (result.has("min_index_interval"))
cfm.minIndexInterval(result.getInt("min_index_interval"));
if (result.has("min_index_interval"))
cfm.minIndexInterval(result.getInt("min_index_interval"));
if (result.has("max_index_interval"))
cfm.maxIndexInterval(result.getInt("max_index_interval"));
if (result.has("max_index_interval"))
cfm.maxIndexInterval(result.getInt("max_index_interval"));
if (result.has("bloom_filter_fp_chance"))
cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
else
cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
if (result.has("bloom_filter_fp_chance"))
cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
else
cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
if (result.has("dropped_columns"))
cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
for (ColumnDefinition cd : columnDefs)
cfm.addOrReplaceColumnDefinition(cd);
return cfm.rebuild();
}
catch (SyntaxException | ConfigurationException e)
{
throw new RuntimeException(e);
if (result.has("dropped_columns"))
cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
#endif
for (auto&& cdef : column_defs) {
builder.with_column(cdef);
}
}
#if 0
private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
{
Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
@@ -1333,14 +1332,21 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
}
}
#if 0
private static ColumnDefinition.Kind deserializeKind(String kind)
{
if (kind.equalsIgnoreCase("clustering_key"))
return ColumnDefinition.Kind.CLUSTERING_COLUMN;
return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
/**
 * Translate the textual column kind found in a schema table row into the
 * corresponding column_kind enumerator.
 *
 * The input is compared with operator== against the spellings
 * "partition_key", "clustering_key", "static" and "regular".
 *
 * @param kind serialized column kind
 * @return the column_kind value matching the given name
 * @throws std::invalid_argument if the name is not one of the four above
 */
column_kind deserialize_kind(sstring kind) {
    if (kind == "partition_key") {
        return column_kind::partition_key;
    }
    if (kind == "clustering_key") {
        return column_kind::clustering_key;
    }
    if (kind == "static") {
        return column_kind::static_column;
    }
    if (kind == "regular") {
        return column_kind::regular_column;
    }
    // None of the known spellings matched.
    throw std::invalid_argument("unknown column kind: " + kind);
}
#if 0
private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
{
ColumnFamily cells = mutation.addOrGet(Columns);
@@ -1350,42 +1356,52 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
}
#endif
private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
String keyspace,
String table,
AbstractType<?> rawComparator,
boolean isSuper)
std::vector<column_definition> create_columns_from_column_rows(const schema_result::mapped_type& rows,
const sstring& keyspace,
const sstring& table/*,
AbstractType<?> rawComparator,
boolean isSuper*/)
{
List<ColumnDefinition> columns = new ArrayList<>();
for (UntypedResultSet.Row row : rows)
columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper));
std::vector<column_definition> columns;
for (auto&& row : rows->rows()) {
columns.emplace_back(std::move(create_column_from_column_row(row, keyspace, table/*, rawComparator, isSuper*/)));
}
return columns;
}
private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
String keyspace,
String table,
AbstractType<?> rawComparator,
boolean isSuper)
column_definition create_column_from_column_row(const query::result_set_row& row,
sstring keyspace,
sstring table/*,
AbstractType<?> rawComparator,
boolean isSuper*/)
{
ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
auto kind = deserialize_kind(row.get_nonnull<sstring>("type"));
Integer componentIndex = null;
if (row.has("component_index"))
componentIndex = row.getInt("component_index");
if (row.has("component_index")) {
// FIXME: We need to pass component_index to schema_builder
// to ensure columns are instantiated in the correct order.
auto component_index = row.get_nonnull<int32_t>("component_index");
assert(component_index == 0);
}
#if 0
else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper)
componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
#endif
#if 0
// Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
// we need to use the comparator fromString method
AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR
? getComponentComparator(rawComparator, componentIndex)
: UTF8Type.instance;
ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator);
#endif
auto name = row.get_nonnull<sstring>("column_name");
AbstractType<?> validator = parseType(row.getString("validator"));
auto validator = parse_type(row.get_nonnull<sstring>("validator"));
#if 0
IndexType indexType = null;
if (row.has("index_type"))
indexType = IndexType.valueOf(row.getString("index_type"));
@@ -1397,10 +1413,11 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
String indexName = null;
if (row.has("index_name"))
indexName = row.getString("index_name");
return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind);
#endif
return column_definition{to_bytes(name), validator, kind};
}
#if 0
private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex)
{
return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
@@ -1634,22 +1651,13 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
return mutation;
}
private static AbstractType<?> parseType(String str)
{
try
{
return TypeParser.parse(str);
}
catch (SyntaxException | ConfigurationException e)
{
// We only use this when reading the schema where we shouldn't get an error
throw new RuntimeException(e);
}
}
}
#endif
/**
 * Parse a textual type description into a data_type.
 *
 * This is a thin wrapper that forwards to db::marshal::type_parser::parse().
 *
 * @param str serialized type description
 * @return the result of db::marshal::type_parser::parse(str)
 */
data_type parse_type(sstring str)
{
    data_type parsed = db::marshal::type_parser::parse(str);
    return parsed;
}
std::vector<schema_ptr> all_tables() {
return {
keyspaces(), columnfamilies(), columns(), triggers(), usertypes(), functions(), aggregates()

View File

@@ -73,15 +73,32 @@ mutation make_create_keyspace_mutation(lw_shared_ptr<keyspace_metadata> keyspace
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
std::map<sstring, schema_ptr> create_tables_from_tables_partition(const schema_result::mapped_type& result);
std::map<sstring, schema_ptr> create_tables_from_tables_partition(service::storage_proxy& proxy, const schema_result::mapped_type& result);
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector<mutation>& mutations);
schema_ptr create_table_from_table_row(const query::result_set_row& row);
schema_ptr create_table_from_table_row(service::storage_proxy& proxy, const query::result_set_row& row);
void create_table_from_table_row_and_column_rows(schema_builder& builder, const query::result_set_row& table_row, const schema_result::mapped_type& serialized_columns);
std::vector<column_definition> create_columns_from_column_rows(const schema_result::mapped_type& rows,
const sstring& keyspace,
const sstring& table/*,
AbstractType<?> rawComparator,
boolean isSuper*/);
column_definition create_column_from_column_row(const query::result_set_row& row,
sstring keyspace,
sstring table/*,
AbstractType<?> rawComparator,
boolean isSuper*/);
void add_column_to_schema_mutation(schema_ptr table, const column_definition& column, api::timestamp_type timestamp, const partition_key& pkey, std::vector<mutation>& mutations);
sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);
data_type parse_type(sstring str);
} // namespace legacy_schema_tables
} // namespace db