Merge remote-tracking branch 'dev/penberg/keyspace-merging/v5' from seastar-dev.git

From Pekka:

"This patch series converts LegacySchemaTables keyspace merging code to
C++. After this series, keyspaces are actually created as demonstrated
by the newly added test in cql_query_test.cc."
This commit is contained in:
Tomasz Grabiec
2015-04-28 18:06:23 +02:00
19 changed files with 502 additions and 143 deletions

View File

@@ -211,6 +211,10 @@ public:
return _size;
}
bool empty() const {
return _size == 0;
}
void reserve(size_t size) {
// FIXME: implement
}

View File

@@ -45,35 +45,41 @@ public:
const ::shared_ptr<ut_meta_data> user_types;
#if 0
public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites)
{
this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData());
}
ks_meta_data(sstring name_,
sstring strategy_name_,
std::unordered_map<sstring, sstring> strategy_options_,
bool durable_writes_)
: ks_meta_data{std::move(name_),
std::move(strategy_name_),
std::move(strategy_options_),
durable_writes_,
{}, ::make_shared<ut_meta_data>()}
{ }
ks_meta_data(sstring name_,
sstring strategy_name_,
std::unordered_map<sstring, sstring> strategy_options_,
bool durable_writes_,
std::vector<schema_ptr> cf_defs)
: ks_meta_data{std::move(name_),
std::move(strategy_name_),
std::move(strategy_options_),
durable_writes_,
std::move(cf_defs),
::make_shared<ut_meta_data>()}
{ }
public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites,
Iterable<CFMetaData> cfDefs)
{
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
}
#endif
ks_meta_data(sstring name_,
sstring strategy_name_,
std::unordered_map<sstring, sstring> strategy_options_,
bool durable_writes_,
std::vector<schema_ptr> cf_defs,
shared_ptr<ut_meta_data> user_types_)
: name{name_}
: name{std::move(name_)}
, strategy_name{strategy_name_.empty() ? "NetworkTopologyStrategy" : strategy_name_}
, strategy_options{std::move(strategy_options_)}
, durable_writes{durable_writes_}
, user_types{user_types_}
, user_types{std::move(user_types_)}
{
for (auto&& s : cf_defs) {
_cf_meta_data.emplace(s->cf_name(), s);

View File

@@ -408,6 +408,7 @@ urchin_core = (['database.cc',
'dht/murmur3_partitioner.cc',
'unimplemented.cc',
'query.cc',
'query-result-set.cc',
'locator/abstract_replication_strategy.cc',
'locator/simple_strategy.cc',
'locator/token_metadata.cc',

View File

@@ -303,6 +303,14 @@ keyspace& database::add_keyspace(sstring name, keyspace k) {
return _keyspaces.emplace(std::move(name), std::move(k)).first->second;
}
void database::update_keyspace(const sstring& name) {
throw std::runtime_error("not implemented");
}
void database::drop_keyspace(const sstring& name) {
throw std::runtime_error("not implemented");
}
void database::add_column_family(const utils::UUID& uuid, column_family&& cf) {
if (_keyspaces.count(cf._schema->ks_name()) == 0) {
throw std::invalid_argument("Keyspace " + cf._schema->ks_name() + " not defined");

View File

@@ -138,6 +138,8 @@ public:
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
bool has_keyspace(const sstring& name) const;
void update_keyspace(const sstring& name);
void drop_keyspace(const sstring& name);
column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family);
const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family);
column_family& find_column_family(const utils::UUID&) throw (no_such_column_family);

View File

@@ -23,8 +23,14 @@
#include "utils/UUID_gen.hh"
#include "legacy_schema_tables.hh"
#include "dht/i_partitioner.hh"
#include "system_keyspace.hh"
#include "query-result-set.hh"
#include "core/do_with.hh"
using namespace db::system_keyspace;
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
@@ -388,38 +394,45 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
mutation.add(partition.cf);
}
}
#endif
private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
future<schema_result>
read_schema_for_keyspaces(service::storage_proxy& proxy, const sstring& schema_table_name, const std::set<sstring>& keyspace_names)
{
Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
for (String keyspaceName : keyspaceNames)
{
Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName);
if (schemaEntity.cf != null)
schema.put(schemaEntity.key, schemaEntity.cf);
}
return schema;
auto map = [&proxy, schema_table_name] (sstring keyspace_name) { return read_schema_partition_for_keyspace(proxy, schema_table_name, keyspace_name); };
auto insert = [] (schema_result&& schema, auto&& schema_entity) {
if (schema_entity.second) {
schema.insert(std::move(schema_entity));
}
return std::move(schema);
};
return map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result(), insert);
}
#if 0
private static ByteBuffer getSchemaKSKey(String ksName)
{
return AsciiType.instance.fromString(ksName);
}
#endif
private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName)
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name)
{
DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey);
auto schema = proxy.get_db().local().find_schema(system_keyspace::NAME, schema_table_name);
auto keyspace_key = dht::global_partitioner().decorate_key(partition_key::from_single_value(*schema, to_bytes(keyspace_name)));
return read_schema_partition_for_keyspace(proxy, schema_table_name, keyspace_key);
}
private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey)
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key)
{
QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis());
return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter));
return proxy.query_local(system_keyspace::NAME, schema_table_name, keyspace_key).then([keyspace_key] (auto&& rs) {
return std::make_pair(keyspace_key, std::move(rs));
});
}
#if 0
private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
{
DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
@@ -461,90 +474,116 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
{
#if 0
schema_ptr s = keyspaces();
// compare before/after schemas of the affected keyspaces only
Set<String> keyspaces = new HashSet<>(mutations.size());
for (Mutation mutation : mutations)
keyspaces.add(ByteBufferUtil.string(mutation.key()));
std::set<sstring> keyspaces;
for (auto&& mutation : mutations) {
keyspaces.emplace(boost::any_cast<sstring>(utf8_type->deserialize(mutation.key().get_component(*s, 0))));
}
// current state of the schema
Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
#endif
return proxy.mutate_locally(std::move(mutations)).then([] {
auto old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto old_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces);
auto old_types = read_schema_for_keyspaces(proxy, USERTYPES, keyspaces);
auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
auto old_aggregates = read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces);
return when_all(std::move(old_keyspaces), std::move(old_column_families), std::move(old_types),
std::move(old_functions), std::move(old_aggregates)).then([&proxy, keyspaces, mutations = std::move(mutations)] (auto&& old_results) mutable {
return proxy.mutate_locally(std::move(mutations)).then([&proxy, keyspaces, old_results = std::move(old_results)] () mutable {
#if 0
if (doFlush)
flushSchemaTables();
// with new data applied
Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
mergeTables(oldColumnFamilies, newColumnFamilies);
mergeTypes(oldTypes, newTypes);
mergeFunctions(oldFunctions, newFunctions);
mergeAggregates(oldAggregates, newAggregates);
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
for (String keyspaceToDrop : keyspacesToDrop)
Schema.instance.dropKeyspace(keyspaceToDrop);
if (doFlush)
flushSchemaTables();
#endif
return make_ready_future<>();
// with new data applied
auto new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto new_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces);
auto new_types = read_schema_for_keyspaces(proxy, USERTYPES, keyspaces);
auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
auto new_aggregates = read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces);
// FIXME: Make the update atomic like in Origin.
return when_all(std::move(new_keyspaces), std::move(new_column_families), std::move(new_types),
std::move(new_functions), std::move(new_aggregates)).then([&proxy, old_results = std::move(old_results)] (auto&& new_results) mutable {
auto old_keyspaces = std::move(std::get<schema_result>(std::get<0>(old_results).get()));
auto old_column_families = std::move(std::get<schema_result>(std::get<1>(old_results).get()));
auto old_types = std::move(std::get<schema_result>(std::get<2>(old_results).get()));
auto old_functions = std::move(std::get<schema_result>(std::get<3>(old_results).get()));
auto old_aggregates = std::move(std::get<schema_result>(std::get<4>(old_results).get()));
auto new_keyspaces = std::move(std::get<schema_result>(std::get<0>(new_results).get()));
auto new_column_families = std::move(std::get<schema_result>(std::get<1>(new_results).get()));
auto new_types = std::move(std::get<schema_result>(std::get<2>(new_results).get()));
auto new_functions = std::move(std::get<schema_result>(std::get<3>(new_results).get()));
auto new_aggregates = std::move(std::get<schema_result>(std::get<4>(new_results).get()));
auto keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
#if 0
mergeTables(oldColumnFamilies, newColumnFamilies);
mergeTypes(oldTypes, newTypes);
mergeFunctions(oldFunctions, newFunctions);
mergeAggregates(oldAggregates, newAggregates);
#endif
return when_all(std::move(keyspaces_to_drop)).then([proxy] (auto&& results) mutable {
auto keyspaces_to_drop = std::move(std::get<std::set<sstring>>(std::get<0>(results).get()));
return proxy.get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
for (auto&& keyspace_to_drop : keyspaces_to_drop) {
db.drop_keyspace(keyspace_to_drop);
}
});
});
});
});
});
}
future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after)
{
std::vector<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>> created;
std::vector<sstring> altered;
std::set<sstring> dropped;
for (auto&& right : after) {
auto left = before.find(right.first);
if (left != before.end()) {
schema_ptr s = keyspaces();
auto b = left->first._key.get_component(*s, 0);
sstring keyspace_name = boost::any_cast<sstring>(utf8_type->deserialize(b));
auto&& pre = left->second;
auto&& post = right.second;
if (!pre->empty() && !post->empty()) {
altered.emplace_back(keyspace_name);
} else if (!pre->empty()) {
dropped.emplace(keyspace_name);
} else if (!post->empty()) { // a (re)created keyspace
created.emplace_back(std::move(right));
}
} else {
if (!right.second->empty()) {
created.emplace_back(std::move(right));
}
}
}
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) {
return proxy.get_db().invoke_on_all([&created, altered = std::move(altered)] (database& db) {
for (auto&& kv : created) {
auto ksm = create_keyspace_from_schema_partition(kv);
keyspace k;
k.create_replication_strategy(*ksm);
db.add_keyspace(ksm->name, std::move(k));
}
for (auto&& name : altered) {
db.update_keyspace(name);
}
});
}).then([dropped = std::move(dropped)] () {
return make_ready_future<std::set<sstring>>(dropped);
});
}
#if 0
private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
{
List<Row> created = new ArrayList<>();
List<String> altered = new ArrayList<>();
Set<String> dropped = new HashSet<>();
/*
* - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
* - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
* there that only has the top-level deletion, if:
* a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
* b) a pulled dropped keyspace that got dropped before it could find a way to this node
* - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
* that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
* to this node
*/
MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
if (entry.getValue().hasColumns())
created.add(new Row(entry.getKey(), entry.getValue()));
for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
{
String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
ColumnFamily pre = entry.getValue().leftValue();
ColumnFamily post = entry.getValue().rightValue();
if (pre.hasColumns() && post.hasColumns())
altered.add(keyspaceName);
else if (pre.hasColumns())
dropped.add(keyspaceName);
else if (post.hasColumns()) // a (re)created keyspace
created.add(new Row(entry.getKey(), post));
}
for (Row row : created)
Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row));
for (String name : altered)
Schema.instance.updateKeyspace(name);
return dropped;
}
// see the comments for mergeKeyspaces()
private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
{
@@ -765,14 +804,16 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
* Keyspace metadata serialization/deserialization.
*/
mutation make_create_keyspace_mutation(lw_shared_ptr<config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions)
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions)
{
std::vector<mutation> mutations;
schema_ptr s = keyspaces();
auto pkey = partition_key::from_exploded(*s, {utf8_type->decompose(keyspace->name)});
mutation m(pkey, s);
exploded_clustering_prefix ckey;
m.set_cell(ckey, "durable_writes", keyspace->durable_writes, timestamp);
m.set_cell(ckey, "strategy_class", keyspace->strategy_name, timestamp);
mutations.emplace_back(std::move(m));
#if 0
adder.add("strategy_options", json(keyspace.strategyOptions));
#endif
@@ -783,11 +824,10 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
addTypeToSchemaMutation(type, timestamp, mutation);
#endif
for (auto&& kv : keyspace->cf_meta_data()) {
add_table_to_schema_mutation(kv.second, timestamp, true, m);
add_table_to_schema_mutation(kv.second, timestamp, true, pkey, mutations);
}
}
return m;
return mutations;
}
#if 0
@@ -816,29 +856,28 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
return createKeyspaceFromSchemaPartition(partition);
}
#endif
/**
* Deserialize only Keyspace attributes without nested tables or types
*
* @param partition Keyspace attributes in serialized form
*/
private static KSMetaData createKeyspaceFromSchemaPartition(Row partition)
lw_shared_ptr<config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>& result)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
try
{
return new KSMetaData(row.getString("keyspace_name"),
AbstractReplicationStrategy.getClass(row.getString("strategy_class")),
fromJsonMap(row.getString("strategy_options")),
row.getBoolean("durable_writes"));
}
catch (ConfigurationException e)
{
throw new RuntimeException(e);
auto&& rs = result.second;
if (rs->empty()) {
throw std::runtime_error("query result has no rows");
}
auto&& row = rs->row(0);
auto keyspace_name = row.get_nonnull<sstring>("keyspace_name");
auto strategy_name = row.get_nonnull<sstring>("strategy_class");
std::unordered_map<sstring, sstring> strategy_options;
bool durable_writes = row.get_nonnull<bool>("durable_writes");
return make_lw_shared<config::ks_meta_data>(keyspace_name, strategy_name, strategy_options, durable_writes);
}
#if 0
/*
* User type metadata serialization/deserialization.
*/
@@ -925,19 +964,19 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
}
#endif
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, mutation& m)
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector<mutation>& mutations)
{
throw std::runtime_error("not implemented");
#if 0
// For property that can be null (and can be changed), we insert tombstones, to make sure
// we don't keep a property the user has removed
ColumnFamily cells = mutation.addOrGet(Columnfamilies);
Composite prefix = Columnfamilies.comparator.make(table.cfName);
CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
adder.add("cf_id", table.cfId);
adder.add("type", table.cfType.toString());
schema_ptr s = columnfamilies();
mutation m{pkey, s};
mutations.emplace_back(std::move(m));
auto ckey = clustering_key::from_single_value(*s, to_bytes(table->cf_name()));
m.set_clustered_cell(ckey, "cf_id", table->id(), timestamp);
#if 0
m.set_clustered_cell(ckey, "type", table.cfType.toString(), timestamp);
#endif
#if 0
if (table.isSuper())
{
// We need to continue saving the comparator and subcomparator separatly, otherwise
@@ -950,10 +989,14 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
{
adder.add("comparator", table.comparator.toString());
}
#endif
#if 0
adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance());
adder.add("caching", table.getCaching().toString());
adder.add("comment", table.getComment());
#endif
m.set_clustered_cell(ckey, "comment", table->comment(), timestamp);
#if 0
adder.add("compaction_strategy_class", table.compactionStrategyClass.getName());
adder.add("compaction_strategy_options", json(table.compactionStrategyOptions));
adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions()));

View File

@@ -29,11 +29,18 @@
#include "schema.hh"
#include <vector>
#include <map>
namespace query {
class result_set;
}
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db {
namespace legacy_schema_tables {
using schema_result = std::map<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>;
static constexpr auto KEYSPACES = "schema_keyspaces";
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
static constexpr auto COLUMNS = "schema_columns";
@@ -46,13 +53,23 @@ extern std::vector<const char*> ALL;
std::vector<schema_ptr> all_tables();
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key);
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations);
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);
mutation make_create_keyspace_mutation(lw_shared_ptr<config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after);
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, mutation& m);
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
lw_shared_ptr<config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>& partition);
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector<mutation>& mutations);
} // namespace legacy_schema_tables
} // namespace db

View File

@@ -274,6 +274,12 @@ public:
public:
using compound = lw_shared_ptr<compound_type<allow_prefixes::no>>;
bytes_view get_component(const schema& s, size_t idx) const {
auto it = begin(s);
std::advance(it, idx);
return *it;
}
static partition_key from_bytes(bytes b) {
return partition_key(std::move(b));
}

View File

@@ -23,6 +23,15 @@ void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, cons
update_column(row, def, std::move(value));
}
void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value,
api::timestamp_type timestamp, ttl_opt ttl) {
auto column_def = _schema->get_column_definition(name);
if (!column_def) {
throw std::runtime_error(sprint("no column definition found for '%s'", name));
}
return set_clustered_cell(key, *column_def, atomic_cell::make_live(timestamp, ttl, column_def->type->decompose(value)));
}
void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = _p.clustered_row(key).cells;
update_column(row, def, std::move(value));

View File

@@ -23,6 +23,7 @@ public:
mutation(const mutation&) = default;
void set_static_cell(const column_definition& def, atomic_cell_or_collection value);
void set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value);
void set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl = {});
void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value);
void set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl = {});
void set_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value);

106
query-result-set.cc Normal file
View File

@@ -0,0 +1,106 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include "query-result-set.hh"
namespace query {
result_set_builder::result_set_builder(schema_ptr schema)
: _schema{schema}
{ }
lw_shared_ptr<result_set> result_set_builder::build() const {
return make_lw_shared<result_set>(_rows);
}
void result_set_builder::accept_new_partition(const partition_key& key, uint32_t row_count)
{
_pkey_cells = deserialize(key);
}
void result_set_builder::accept_new_partition(uint32_t row_count)
{
}
void result_set_builder::accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row)
{
auto ckey_cells = deserialize(key);
auto static_cells = deserialize(static_row, true);
auto regular_cells = deserialize(row, false);
std::unordered_map<sstring, boost::any> cells;
cells.insert(_pkey_cells.begin(), _pkey_cells.end());
cells.insert(ckey_cells.begin(), ckey_cells.end());
cells.insert(static_cells.begin(), static_cells.end());
cells.insert(regular_cells.begin(), regular_cells.end());
_rows.emplace_back(_schema, std::move(cells));
}
void result_set_builder::accept_new_row(const query::result_row_view &static_row, const query::result_row_view &row)
{
auto static_cells = deserialize(static_row, true);
auto regular_cells = deserialize(row, false);
std::unordered_map<sstring, boost::any> cells;
cells.insert(_pkey_cells.begin(), _pkey_cells.end());
cells.insert(static_cells.begin(), static_cells.end());
cells.insert(regular_cells.begin(), regular_cells.end());
_rows.emplace_back(_schema, std::move(cells));
}
void result_set_builder::accept_partition_end(const result_row_view& static_row)
{
_pkey_cells.clear();
}
std::unordered_map<sstring, boost::any>
result_set_builder::deserialize(const partition_key& key)
{
std::unordered_map<sstring, boost::any> cells;
auto i = key.begin(*_schema);
for (auto&& col : _schema->partition_key_columns()) {
cells.emplace(col.name_as_text(), col.type->deserialize(*i));
++i;
}
return cells;
}
std::unordered_map<sstring, boost::any>
result_set_builder::deserialize(const clustering_key& key)
{
std::unordered_map<sstring, boost::any> cells;
auto i = key.begin(*_schema);
for (auto&& col : _schema->clustering_key_columns()) {
cells.emplace(col.name_as_text(), col.type->deserialize(*i));
++i;
}
return cells;
}
std::unordered_map<sstring, boost::any>
result_set_builder::deserialize(const result_row_view& row, bool is_static)
{
std::unordered_map<sstring, boost::any> cells;
auto i = row.iterator();
auto columns = is_static ? _schema->static_columns() : _schema->regular_columns();
for (auto &&col : columns) {
if (col.is_atomic()) {
auto cell = i.next_atomic_cell();
if (cell) {
auto view = cell.value();
cells.emplace(col.name_as_text(), col.type->deserialize(view.value()));
}
} else {
auto cell = i.next_collection_cell();
if (cell) {
auto ctype = static_pointer_cast<collection_type_impl>(col.type);
auto view = cell.value();
cells.emplace(col.name_as_text(), ctype->deserialize(view.data, serialization_format::internal()));
}
}
}
return cells;
}
}

102
query-result-set.hh Normal file
View File

@@ -0,0 +1,102 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include "query-result-reader.hh"
#include "core/shared_ptr.hh"
#include <experimental/optional>
#include <stdexcept>
#include <boost/any.hpp>
namespace query {
class no_such_column : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
class null_column_value : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
// Result set row is a set of cells that are associated with a row
// including regular column cells, partition keys, as well as static values.
class result_set_row {
schema_ptr _schema;
std::unordered_map<sstring, boost::any> _cells;
public:
result_set_row(schema_ptr schema, std::unordered_map<sstring, boost::any>&& cells)
: _schema{schema}
, _cells{std::move(cells)}
{ }
// Look up a deserialized row cell value by column name.
template<typename T>
std::experimental::optional<T>
get(const sstring& column_name) const throw (no_such_column) {
auto it = _cells.find(column_name);
if (it == _cells.end()) {
throw no_such_column(column_name);
}
if (it->second.empty()) {
return std::experimental::nullopt;
}
return std::experimental::optional<T>{boost::any_cast<T>(it->second)};
}
template<typename T>
T get_nonnull(const sstring& column_name) const throw (no_such_column, null_column_value) {
auto v = get<T>(column_name);
if (v) {
return *v;
}
throw null_column_value(column_name);
}
};
// Result set is an in-memory representation of query results in
// deserialized format. To obtain a result set, use the result_set_builder
// class as a visitor to query_result::consume() function.
class result_set {
std::vector<result_set_row> _rows;
public:
result_set(const std::vector<result_set_row>& rows)
: _rows{std::move(rows)}
{ }
bool empty() const {
return _rows.empty();
}
const result_set_row& row(size_t idx) const throw (std::out_of_range) {
if (idx >= _rows.size()) {
throw std::out_of_range("no such row in result set: " + std::to_string(idx));
}
return _rows[idx];
}
};
// Result set builder is passed as a visitor to query_result::consume()
// function. You can call the build() method to obtain a result set that
// contains cells from the visited results.
class result_set_builder {
schema_ptr _schema;
std::vector<result_set_row> _rows;
std::unordered_map<sstring, boost::any> _pkey_cells;
public:
result_set_builder(schema_ptr schema);
lw_shared_ptr<result_set> build() const;
void accept_new_partition(const partition_key& key, uint32_t row_count);
void accept_new_partition(uint32_t row_count);
void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row);
void accept_new_row(const result_row_view &static_row, const result_row_view &row);
void accept_partition_end(const result_row_view& static_row);
private:
std::unordered_map<sstring, boost::any> deserialize(const partition_key& key);
std::unordered_map<sstring, boost::any> deserialize(const clustering_key& key);
std::unordered_map<sstring, boost::any> deserialize(const result_row_view& row, bool is_static);
};
}

View File

@@ -122,6 +122,9 @@ public:
const utils::UUID& id() const {
return _raw._id;
}
const sstring& comment() const {
return _raw._comment;
}
void set_comment(const sstring& comment) {
_raw._comment = comment;
}

View File

@@ -287,7 +287,8 @@ public:
logger.info(String.format("Create new Keyspace: %s", ksm));
#endif
return announce(proxy, db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally);
auto mutations = db::legacy_schema_tables::make_create_keyspace_mutations(ksm, timestamp);
return announce(proxy, std::move(mutations), announce_locally);
}
#if 0
@@ -442,6 +443,11 @@ public:
{
std::vector<mutation> mutations;
mutations.emplace_back(std::move(schema));
return announce(proxy, std::move(mutations), announce_locally);
}
static future<> announce(service::storage_proxy& proxy, std::vector<mutation> mutations, bool announce_locally)
{
if (announce_locally) {
return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false);
} else {

View File

@@ -27,6 +27,9 @@
#include "unimplemented.hh"
#include "query_result_merger.hh"
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>
namespace service {
#if 0
@@ -1208,6 +1211,31 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
}).finally([cmd] {});
}
future<foreign_ptr<lw_shared_ptr<query::result_set>>>
storage_proxy::query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key)
{
auto shard = _db.local().shard_of(key._token);
return _db.invoke_on(shard, [ks_name, cf_name, key] (database& db) {
auto schema = db.find_schema(ks_name, cf_name);
std::vector<query::clustering_range> row_ranges = {query::clustering_range::make_open_ended_both_sides()};
std::vector<column_id> regular_cols;
boost::range::push_back(regular_cols, schema->regular_columns() | boost::adaptors::transformed([] (auto&& col) { return col.id; }));
auto opts = query::partition_slice::option_set::of<
query::partition_slice::option::send_partition_key>();
query::partition_slice slice{row_ranges, {}, regular_cols, opts};
std::vector<query::partition_range> pr = {query::partition_range::make_open_ended_both_sides()};
auto id = db.find_uuid(ks_name, cf_name);
auto cmd = make_lw_shared<query::read_command>(id, pr, slice, std::numeric_limits<uint32_t>::max());
return db.query(*cmd).then([key, schema, slice](lw_shared_ptr<query::result>&& result) {
query::result_set_builder builder{schema};
bytes_ostream w(result->buf());
query::result_view view(w.linearize());
view.consume(slice, builder);
return make_foreign(builder.build());
}).finally([cmd] {});
});
}
#if 0
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException

View File

@@ -27,6 +27,7 @@
#include "database.hh"
#include "query-request.hh"
#include "query-result.hh"
#include "query-result-set.hh"
#include "core/distributed.hh"
#include "db/consistency_level.hh"
@@ -38,6 +39,10 @@ private:
public:
storage_proxy(distributed<database>& db) : _db(db) {}
distributed<database>& get_db() {
return _db;
}
future<> mutate_locally(const mutation& m);
future<> mutate_locally(std::vector<mutation> mutations);
@@ -67,6 +72,8 @@ public:
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<query::result>>> query(lw_shared_ptr<query::read_command> cmd, db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<query::result_set>>> query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key);
};
}

View File

@@ -18,7 +18,9 @@
SEASTAR_TEST_CASE(test_create_keyspace_statement) {
return do_with_cql_env([] (auto& e) {
return e.execute_cql("create keyspace ks with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").discard_result();
return e.execute_cql("create keyspace ks2 with replication = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor' : 1 };").discard_result().then([&e] {
return e.require_keyspace_exists("ks2");
});
});
}

View File

@@ -105,6 +105,12 @@ public:
});
}
virtual future<> require_keyspace_exists(const sstring& ks_name) override {
auto& db = _db->local();
assert(db.has_keyspace(ks_name));
return make_ready_future<>();
}
virtual future<> require_column_has_value(const sstring& table_name,
std::vector<boost::any> pk,
std::vector<boost::any> ck,

View File

@@ -34,6 +34,8 @@ public:
virtual future<> create_table(std::function<schema(const sstring&)> schema_maker) = 0;
virtual future<> require_keyspace_exists(const sstring& ks_name) = 0;
virtual future<> require_column_has_value(
const sstring& table_name,
std::vector<boost::any> pk,