Merge "snitch creation"

From Vlad:

"Currently database always created a SimpleSnitch and ignores the corresponding parameter
provided by the user. This series fixes this situation:
   - Changes the snitch creation interface to comply the Java-like interface that
     has already been used in a topology_strategy classes family.
   - Fix all the places where a SimpleSnitch has been created ignoring the user configuration."
This commit is contained in:
Avi Kivity
2015-06-14 17:59:15 +03:00
13 changed files with 83 additions and 51 deletions

View File

@@ -453,6 +453,8 @@ urchin_core = (['database.cc',
'locator/token_metadata.cc',
'locator/locator.cc',
'locator/snitch_base.cc',
'locator/simple_snitch.cc',
'locator/rack_inferring_snitch.cc',
'locator/gossiping_property_file_snitch.cc',
'message/messaging_service.cc',
'service/storage_service.cc',

View File

@@ -591,7 +591,7 @@ create_keyspace(distributed<database>& db, const lw_shared_ptr<keyspace_metadata
auto cfg = db.make_keyspace_config(*ksm);
keyspace ks(ksm, cfg);
auto fu = ks.create_replication_strategy();
auto fu = ks.create_replication_strategy(db.get_snitch_name());
return fu.then([&db, ks = std::move(ks), ksm] () mutable {
db.add_keyspace(ksm->name(), std::move(ks));
@@ -704,7 +704,7 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const
}
future<>
keyspace::create_replication_strategy() {
keyspace::create_replication_strategy(const sstring& snitch_name) {
using namespace locator;
static thread_local token_metadata tm;
@@ -721,7 +721,8 @@ keyspace::create_replication_strategy() {
tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3"));
tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4"));
return make_snitch<simple_snitch>().then(
// Fixme
return i_endpoint_snitch::create_snitch(snitch_name).then(
[this] (snitch_ptr&& s) {
_replication_strategy =
abstract_replication_strategy::create_replication_strategy(
@@ -780,7 +781,7 @@ database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
}
keyspace ks(ksm, std::move(make_keyspace_config(*ksm)));
auto fu = ks.create_replication_strategy();
auto fu = ks.create_replication_strategy(get_snitch_name());
return fu.then([ks = std::move(ks), ksm, this] () mutable {
_keyspaces.emplace(ksm->name(), std::move(ks));
@@ -1062,3 +1063,7 @@ database::stop() {
return val_pair.second.stop();
});
}
const sstring& database::get_snitch_name() const {
return _cfg->endpoint_snitch();
}

View File

@@ -218,7 +218,7 @@ public:
const lw_shared_ptr<keyspace_metadata>& metadata() const {
return _metadata;
}
future<> create_replication_strategy();
future<> create_replication_strategy(const sstring& snitch_name);
locator::abstract_replication_strategy& get_replication_strategy();
column_family::config make_column_family_config(const schema& s) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
@@ -310,6 +310,8 @@ public:
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
future<> apply(const frozen_mutation&);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm) const;
const sstring& get_snitch_name() const;
friend std::ostream& operator<<(std::ostream& out, const database& db);
friend future<> create_keyspace(distributed<database>&, const lw_shared_ptr<keyspace_metadata>&);
};

View File

@@ -34,6 +34,8 @@
#include "core/do_with.hh"
#include "json.hh"
#include "db/config.hh"
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
@@ -591,7 +593,7 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
auto ksm = create_keyspace_from_schema_partition(val);
std::unique_ptr<keyspace>
k_ptr(new keyspace(ksm, db.make_keyspace_config(*ksm)));
auto fu = k_ptr->create_replication_strategy();
auto fu = k_ptr->create_replication_strategy(db.get_snitch_name());
temp_vec_ptr->emplace_back(ksm, std::move(k_ptr));
return fu;

View File

@@ -279,4 +279,11 @@ void gossiping_property_file_snitch::reload_gossiper_state()
#endif
// else this will eventually rerun at gossiperStarting()
}
namespace locator {
using registry = class_registrator<i_endpoint_snitch,
gossiping_property_file_snitch,
const sstring&>;
static registry registrator("org.apache.cassandra.locator.GossipingPropertyFileSnitch");
}
} // namespace locator

View File

@@ -54,17 +54,12 @@ public:
virtual void gossiper_starting() override;
virtual future<> stop() override;
private:
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
gossiping_property_file_snitch(
const sstring& fname = snitch_properties_filename);
private:
static logging::logger& logger() {
static thread_local logging::logger l("gossiping_property_file_snitch");
return l;
return i_endpoint_snitch::snitch_logger;
}
template <typename... Args>

View File

@@ -0,0 +1,6 @@
#include "locator/rack_inferring_snitch.hh"
namespace locator {
using registry = class_registrator<i_endpoint_snitch, rack_inferring_snitch>;
static registry registrator("org.apache.cassandra.locator.RackInferringSnitch");
}

View File

@@ -34,11 +34,7 @@ using inet_address = gms::inet_address;
* A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
* in the 2nd and 3rd octets of the ip address, respectively.
*/
class rack_inferring_snitch : public snitch_base {
private:
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
struct rack_inferring_snitch : public snitch_base {
rack_inferring_snitch() {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
@@ -47,7 +43,6 @@ private:
_snitch_is_ready.set_value();
}
public:
virtual sstring get_rack(inet_address endpoint) override {
return std::to_string((endpoint.raw_addr() >> 8) & 0xFF);
}

7
locator/simple_snitch.cc Normal file
View File

@@ -0,0 +1,7 @@
#include "locator/simple_snitch.hh"
#include "utils/class_registrator.hh"
namespace locator {
using registry = class_registrator<i_endpoint_snitch, simple_snitch>;
static registry registrator("org.apache.cassandra.locator.SimpleSnitch");
}

View File

@@ -27,14 +27,11 @@
namespace locator {
/**
* A simple endpoint snitch implementation that treats Strategy order as proximity,
* allowing non-read-repaired reads to prefer a single endpoint, which improves
* cache locality.
* A simple endpoint snitch implementation that treats Strategy order as
* proximity, allowing non-read-repaired reads to prefer a single endpoint,
* which improves cache locality.
*/
class simple_snitch : public snitch_base {
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
struct simple_snitch : public snitch_base {
simple_snitch() {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
@@ -43,7 +40,6 @@ class simple_snitch : public snitch_base {
_snitch_is_ready.set_value();
}
public:
virtual sstring get_rack(inet_address endpoint) override {
return "rack1";
}

View File

@@ -23,6 +23,9 @@
namespace locator {
thread_local logging::logger
i_endpoint_snitch::snitch_logger("snitch_logger");
std::vector<inet_address> snitch_base::get_sorted_list_by_proximity(
inet_address address,
std::unordered_set<inet_address>& unsorted_address) {

View File

@@ -26,6 +26,7 @@
#include "gms/inet_address.hh"
#include "core/shared_ptr.hh"
#include "utils/class_registrator.hh"
namespace locator {
@@ -34,11 +35,11 @@ struct i_endpoint_snitch;
typedef gms::inet_address inet_address;
typedef std::unique_ptr<i_endpoint_snitch> snitch_ptr;
template <typename SnitchClass, typename... A>
static future<snitch_ptr> make_snitch(A&&... a);
struct i_endpoint_snitch
{
template <typename... A>
static future<snitch_ptr> create_snitch(const sstring& snitch_name, A&&... a);
/**
* returns a String representing the rack this endpoint belongs to
*/
@@ -90,10 +91,8 @@ struct i_endpoint_snitch
virtual future<> stop() = 0;
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
protected:
static thread_local logging::logger snitch_logger;
promise<> _snitch_is_ready;
enum class snitch_state {
initializing,
@@ -103,27 +102,38 @@ protected:
} _state = snitch_state::initializing;
};
template <typename SnitchClass, typename... A>
static future<snitch_ptr> make_snitch(A&&... a) {
snitch_ptr s(new SnitchClass(std::forward<A>(a)...));
auto fu = s->_snitch_is_ready.get_future();
return fu.then_wrapped([s = std::move(s)] (auto&& f) mutable {
try {
f.get();
template <typename... A>
future<snitch_ptr> i_endpoint_snitch::create_snitch(
const sstring& snitch_name, A&&... a) {
return make_ready_future<snitch_ptr>(std::move(s));
} catch (...) {
auto eptr = std::current_exception();
auto fu = s->stop();
try {
snitch_ptr s(std::move(create_object<i_endpoint_snitch>(
snitch_name, std::forward<A>(a)...)));
auto fu = s->_snitch_is_ready.get_future();
return fu.then_wrapped([s = std::move(s)] (auto&& f) mutable {
try {
f.get();
return fu.then([eptr, s = std::move(s)] () mutable {
std::rethrow_exception(eptr);
// just to make a compiler happy
return make_ready_future<snitch_ptr>(std::move(s));
});
}
});
} catch (...) {
auto eptr = std::current_exception();
auto fu = s->stop();
return fu.then([eptr, s = std::move(s)] () mutable {
std::rethrow_exception(eptr);
// just to make a compiler happy
return make_ready_future<snitch_ptr>(std::move(s));
});
}
});
} catch (no_such_class& e) {
snitch_logger.error("{}", e.what());
throw;
} catch (...) {
throw;
}
}
class snitch_base : public i_endpoint_snitch {

View File

@@ -25,7 +25,9 @@ future<> one_test(const std::string& property_fname, bool exp_result) {
path fname(test_files_subdir);
fname /= path(property_fname);
return make_snitch<gossiping_property_file_snitch>(sstring(fname.string()))
return i_endpoint_snitch::create_snitch<const sstring&>(
"org.apache.cassandra.locator.GossipingPropertyFileSnitch",
sstring(fname.string()))
.then([exp_result] (snitch_ptr sptr) {
BOOST_CHECK(exp_result);
return sptr->stop();