Merge branch 'penberg/keyspace-merging-improvements/v2' from seastar-dev.git

From Pekka:

 "This series implements a Maps.difference() function in C++, changes
 storage_proxy::query_local() to not return foreign_ptr>, and finally
 changes the keyspace merging code to follow Origin."
This commit is contained in:
Tomasz Grabiec
2015-05-12 13:08:02 +02:00
7 changed files with 238 additions and 49 deletions

View File

@@ -170,6 +170,7 @@ urchin_tests = [
'tests/cartesian_product_test',
'tests/urchin/hash_test',
'tests/urchin/serializer_test',
'tests/urchin/map_difference_test',
'tests/urchin/message',
'tests/urchin/gossip',
'tests/urchin/compound_test',

View File

@@ -28,6 +28,7 @@
#include "system_keyspace.hh"
#include "query-result-set.hh"
#include "map_difference.hh"
#include "core/do_with.hh"
@@ -418,7 +419,7 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
}
#endif
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
future<std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name)
{
auto schema = proxy.get_db().local().find_schema(system_keyspace::NAME, schema_table_name);
@@ -427,7 +428,7 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
return read_schema_partition_for_keyspace(proxy, schema_table_name, keyspace_key);
}
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
future<std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key)
{
return proxy.query_local(system_keyspace::NAME, schema_table_name, keyspace_key).then([keyspace_key] (auto&& rs) {
@@ -544,29 +545,41 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after)
{
std::vector<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>> created;
std::vector<std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>> created;
std::vector<sstring> altered;
std::set<sstring> dropped;
for (auto&& right : after) {
auto left = before.find(right.first);
if (left != before.end()) {
schema_ptr s = keyspaces();
auto b = left->first._key.get_component(*s, 0);
sstring keyspace_name = boost::any_cast<sstring>(utf8_type->deserialize(b));
auto&& pre = left->second;
auto&& post = right.second;
if (!pre->empty() && !post->empty()) {
altered.emplace_back(keyspace_name);
} else if (!pre->empty()) {
dropped.emplace(keyspace_name);
} else if (!post->empty()) { // a (re)created keyspace
created.emplace_back(std::move(right));
}
} else {
if (!right.second->empty()) {
created.emplace_back(std::move(right));
}
/*
* - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
* - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
* there that only has the top-level deletion, if:
* a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
* b) a pulled dropped keyspace that got dropped before it could find a way to this node
* - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
* that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
* to this node
*/
auto diff = difference(before, after);
for (auto&& entry : diff.entries_only_on_right) {
if (!entry.second->empty()) {
created.emplace_back(std::move(entry));
}
}
for (auto&& entry : diff.entries_differing) {
schema_ptr s = keyspaces();
auto b = entry.first._key.get_component(*s, 0);
sstring keyspace_name = boost::any_cast<sstring>(utf8_type->deserialize(b));
auto&& pre = entry.second.left_value;
auto&& post = entry.second.right_value;
if (!pre->empty() && !post->empty()) {
altered.emplace_back(keyspace_name);
} else if (!pre->empty()) {
dropped.emplace(keyspace_name);
} else if (!post->empty()) { // a (re)created keyspace
created.emplace_back(entry.first, std::move(post));
}
}
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) {
@@ -866,7 +879,7 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
*
* @param partition Keyspace attributes in serialized form
*/
lw_shared_ptr<::config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>& result)
lw_shared_ptr<::config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>& result)
{
auto&& rs = result.second;
if (rs->empty()) {

View File

@@ -39,7 +39,7 @@ class result_set;
namespace db {
namespace legacy_schema_tables {
using schema_result = std::map<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>,
using schema_result = std::map<dht::decorated_key, lw_shared_ptr<query::result_set>,
dht::decorated_key::less_comparator>;
static constexpr auto KEYSPACES = "schema_keyspaces";
@@ -54,10 +54,10 @@ extern std::vector<const char*> ALL;
std::vector<schema_ptr> all_tables();
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
future<std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
future<std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>>
future<std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>>
read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key);
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations);
@@ -68,7 +68,7 @@ future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<::config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
lw_shared_ptr<::config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, foreign_ptr<lw_shared_ptr<query::result_set>>>& partition);
lw_shared_ptr<::config::ks_meta_data> create_keyspace_from_schema_partition(const std::pair<dht::decorated_key, lw_shared_ptr<query::result_set>>& partition);
mutation make_create_keyspace_mutation(lw_shared_ptr<::config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);

75
map_difference.hh Normal file
View File

@@ -0,0 +1,75 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <map>
template<typename Tp>
struct value_difference {
Tp left_value;
Tp right_value;
value_difference(const Tp& left_value_, const Tp& right_value_)
: left_value(left_value_)
, right_value(right_value_)
{ }
};
template<typename Key, typename Tp, typename Compare, typename Alloc>
struct map_difference {
// Entries in left map whose keys don't exist in the right map.
std::map<Key, Tp, Compare> entries_only_on_left;
// Entries in right map whose keys don't exist in the left map.
std::map<Key, Tp, Compare> entries_only_on_right;
// Entries that appear in both maps with the same value.
std::map<Key, Tp, Compare> entries_in_common;
// Entries that appear in both maps but have different values.
std::map<Key, value_difference<Tp>, Compare> entries_differing;
map_difference(const Compare& cmp, const Alloc& alloc)
: entries_only_on_left{cmp, alloc}
, entries_only_on_right{cmp, alloc}
, entries_in_common{cmp, alloc}
, entries_differing{cmp, alloc}
{ }
};
template<typename Key, typename Tp, typename Compare, typename Alloc>
inline
map_difference<Key, Tp, Compare, Alloc>
difference(const std::map<Key, Tp, Compare, Alloc>& left,
const std::map<Key, Tp, Compare, Alloc>& right,
const Compare& key_comp, const Alloc& alloc) {
map_difference<Key, Tp, Compare, Alloc> diff{key_comp, alloc};
diff.entries_only_on_right = right;
for (auto&& kv : left) {
auto&& left_key = kv.first;
auto&& it = right.find(left_key);
if (it != right.end()) {
diff.entries_only_on_right.erase(left_key);
auto&& left_value = kv.second;
auto&& right_value = it->second;
if (left_value == right_value) {
diff.entries_in_common.emplace(kv);
} else {
value_difference<Tp> value_diff{left_value, right_value};
diff.entries_differing.emplace(left_key, std::move(value_diff));
}
} else {
diff.entries_only_on_left.emplace(kv);
}
}
return diff;
}
template<typename Key, typename Tp, typename Compare, typename Alloc>
inline
map_difference<Key, Tp, Compare, Alloc>
difference(const std::map<Key, Tp, Compare, Alloc>& left, const std::map<Key, Tp, Compare, Alloc>& right) {
return difference(left, right, left.key_comp(), left.get_allocator());
}

View File

@@ -1222,30 +1222,38 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
}).finally([cmd] {});
}
future<foreign_ptr<lw_shared_ptr<query::result_set>>>
// The query_local() method returns a result set value object (which is
// copyable) that is accessible on the local CPU without having to use
// the foreign_ptr<> annotation. The result set object is constructed by
// first performing the query on shard CPU and the building the result
// set on the local CPU.
future<lw_shared_ptr<query::result_set>>
storage_proxy::query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key)
{
auto shard = _db.local().shard_of(key._token);
return _db.invoke_on(shard, [ks_name, cf_name, key] (database& db) {
auto schema = db.find_schema(ks_name, cf_name);
std::vector<query::clustering_range> row_ranges = {query::clustering_range::make_open_ended_both_sides()};
std::vector<column_id> regular_cols;
boost::range::push_back(regular_cols, schema->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
std::vector<column_id> static_cols;
boost::range::push_back(static_cols, schema->static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
auto opts = query::partition_slice::option_set::of<
query::partition_slice::option::send_partition_key,
query::partition_slice::option::send_clustering_key>();
query::partition_slice slice{row_ranges, static_cols, regular_cols, opts};
std::vector<query::partition_range> pr = {query::partition_range::make_open_ended_both_sides()};
auto cmd = make_lw_shared<query::read_command>(schema->id(), pr, slice, std::numeric_limits<uint32_t>::max());
return db.query(*cmd).then([key, schema, slice](lw_shared_ptr<query::result>&& result) {
query::result_set_builder builder{schema};
bytes_ostream w(result->buf());
query::result_view view(w.linearize());
view.consume(slice, builder);
return make_foreign(builder.build());
auto&& db = _db.local();
auto schema = db.find_schema(ks_name, cf_name);
std::vector<query::clustering_range> row_ranges = {query::clustering_range::make_open_ended_both_sides()};
std::vector<column_id> regular_cols;
boost::range::push_back(regular_cols, schema->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
std::vector<column_id> static_cols;
boost::range::push_back(static_cols, schema->static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
auto opts = query::partition_slice::option_set::of<
query::partition_slice::option::send_partition_key,
query::partition_slice::option::send_clustering_key>();
query::partition_slice slice{row_ranges, static_cols, regular_cols, opts};
std::vector<query::partition_range> pr = {query::partition_range::make_open_ended_both_sides()};
auto cmd = make_lw_shared<query::read_command>(schema->id(), pr, slice, std::numeric_limits<uint32_t>::max());
auto shard = db.shard_of(key._token);
return _db.invoke_on(shard, [cmd] (database& db) {
return db.query(*cmd).then([] (lw_shared_ptr<query::result>&& result) {
return make_foreign(std::move(result));
}).finally([cmd] {});
}).then([this, schema, slice] (auto&& result) {
query::result_set_builder builder{schema};
bytes_ostream w(result->buf());
query::result_view view(w.linearize());
view.consume(slice, builder);
return builder.build();
});
}

View File

@@ -74,7 +74,7 @@ public:
future<foreign_ptr<lw_shared_ptr<query::result>>> query(lw_shared_ptr<query::read_command> cmd, db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<query::result_set>>> query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key);
future<lw_shared_ptr<query::result_set>> query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key);
};
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright 2015 Cloudius Systems
*/
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE core
#include <boost/test/unit_test.hpp>
#include "map_difference.hh"
#include <map>
using namespace std;
BOOST_AUTO_TEST_CASE(both_empty) {
map<int, int> left;
map<int, int> right;
auto diff = difference(left, right);
BOOST_REQUIRE(diff.entries_only_on_left.empty());
BOOST_REQUIRE(diff.entries_only_on_right.empty());
BOOST_REQUIRE(diff.entries_in_common.empty());
BOOST_REQUIRE(diff.entries_differing.empty());
}
BOOST_AUTO_TEST_CASE(left_empty) {
map<int, int> left;
map<int, int> right;
right.emplace(1, 100);
right.emplace(2, 200);
auto diff = difference(left, right);
BOOST_REQUIRE(diff.entries_only_on_left.empty());
BOOST_REQUIRE(diff.entries_only_on_right == right);
BOOST_REQUIRE(diff.entries_in_common.empty());
BOOST_REQUIRE(diff.entries_differing.empty());
}
BOOST_AUTO_TEST_CASE(right_empty) {
map<int, int> left;
map<int, int> right;
left.emplace(1, 100);
left.emplace(2, 200);
auto diff = difference(left, right);
BOOST_REQUIRE(diff.entries_only_on_left == left);
BOOST_REQUIRE(diff.entries_only_on_right.empty());
BOOST_REQUIRE(diff.entries_in_common.empty());
BOOST_REQUIRE(diff.entries_differing.empty());
}
BOOST_AUTO_TEST_CASE(both_same) {
map<int, int> left;
map<int, int> right;
left.emplace(1, 100);
left.emplace(2, 200);
right.emplace(1, 100);
right.emplace(2, 200);
auto diff = difference(left, right);
BOOST_REQUIRE(diff.entries_only_on_left.empty());
BOOST_REQUIRE(diff.entries_only_on_right.empty());
BOOST_REQUIRE(diff.entries_in_common == left);
BOOST_REQUIRE(diff.entries_differing.empty());
}
BOOST_AUTO_TEST_CASE(differing_values) {
map<int, int> left;
map<int, int> right;
left.emplace(1, 100);
left.emplace(2, 200);
right.emplace(1, 1000);
right.emplace(2, 2000);
auto diff = difference(left, right);
BOOST_REQUIRE(diff.entries_only_on_left.empty());
BOOST_REQUIRE(diff.entries_only_on_right.empty());
BOOST_REQUIRE(diff.entries_in_common.empty());
BOOST_REQUIRE(diff.entries_differing.size() == 2);
}