locator: futurize snitch creation

- Forbid explicit snitch creation with constructor.
   - Allow the creation of snitches only with locator::make_snitch() template
     function.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>

New in v4:
   - Make sure the snitch is stopped before it's destroyed when _snitch_is_ready
     is returned in an exceptional state.

New in v2:
   - Change snitch_ptr to be std::unique_ptr<i_endpoint_snitch>
   - abstract_replication_strategy::create_replication_strategy(): explicitly
     specify (template) types of create_object() parameters.
   - Re-arrange the loop in marge_keyspaces() so that lambdas that depend on
     "this" complete before there is a chance that "this" gets destroyed.
   - create_keyspace(): Don't add a new keyspace if a keyspace with this name
     already exists.
   - i_endpoint_snitch: added a stop() virtual method
      - Added a stop() pure virtual method.
      - Added an enum class snitch_state and a _state member initialized to snitch_state::initializing,
        added an assert() in a destructor requiring _state to become snitch_state::stopped,
        which should be set when stop() is complete.
   - rack_inferring_snitch: added a stop() method.
   - simple_snitch: added a stop() method.
   - Added stop() methods to abstract_replication_strategy and keyspace.
   - Updated database::stop() to wait for all keyspaces in _keyspaces to stop.
This commit is contained in:
Vlad Zolotarov
2015-06-01 19:47:46 +03:00
parent aecc2b4279
commit a2594015f9
11 changed files with 173 additions and 43 deletions

View File

@@ -584,9 +584,15 @@ create_keyspace(distributed<database>& db, const lw_shared_ptr<keyspace_metadata
return make_directory(db.local()._cfg->data_file_directories()[0] + "/" + ksm->name()).then([ksm, &db] {
return db.invoke_on_all([ksm] (database& db) {
auto cfg = db.make_keyspace_config(*ksm);
keyspace ks(ksm, cfg);
ks.create_replication_strategy();
db.add_keyspace(ksm->name(), std::move(ks));
auto fu = ks.create_replication_strategy();
return fu.then([&db, ks = std::move(ks), ksm] () mutable {
db.add_keyspace(ksm->name(), std::move(ks));
return make_ready_future<>();
});
});
});
// FIXME: rollback on error, or keyspace directory remains on disk, poisoning
@@ -692,11 +698,13 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const
}
}
void
future<>
keyspace::create_replication_strategy() {
static thread_local locator::token_metadata tm;
static locator::simple_snitch snitch;
using namespace locator;
static thread_local token_metadata tm;
static std::unordered_map<sstring, sstring> options = {{"replication_factor", "3"}};
auto d2t = [](double d) {
unsigned long l = net::hton(static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max())));
std::array<int8_t, 8> a;
@@ -707,7 +715,16 @@ keyspace::create_replication_strategy() {
tm.update_normal_token({dht::token::kind::key, {d2t(1.0/4).data(), 8}}, to_sstring("127.0.0.2"));
tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3"));
tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4"));
_replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), tm, snitch, options);
return make_snitch<simple_snitch>().then(
[this] (snitch_ptr&& s) {
_replication_strategy =
abstract_replication_strategy::create_replication_strategy(
_metadata->name(), _metadata->strategy_name(),
tm, std::move(s), options);
return make_ready_future<>();
});
}
locator::abstract_replication_strategy&
@@ -750,15 +767,21 @@ schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_c
return find_column_family(uuid).schema();
}
keyspace&
database::find_or_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
future<>
database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
auto i = _keyspaces.find(ksm->name());
if (i != _keyspaces.end()) {
return i->second;
return make_ready_future<>();
}
keyspace ks(ksm, std::move(make_keyspace_config(*ksm)));
ks.create_replication_strategy();
return _keyspaces.emplace(ksm->name(), std::move(ks)).first->second;
auto fu = ks.create_replication_strategy();
return fu.then([ks = std::move(ks), ksm, this] () mutable {
_keyspaces.emplace(ksm->name(), std::move(ks));
return make_ready_future<>();
});
}
std::set<sstring>
@@ -1018,5 +1041,7 @@ operator<<(std::ostream& os, const atomic_cell& ac) {
future<>
database::stop() {
return make_ready_future<>();
return do_for_each(_keyspaces, [this] (auto& val_pair) {
return val_pair.second.stop();
});
}

View File

@@ -214,13 +214,20 @@ public:
const lw_shared_ptr<keyspace_metadata>& metadata() const {
return _metadata;
}
void create_replication_strategy();
future<> create_replication_strategy();
locator::abstract_replication_strategy& get_replication_strategy();
column_family::config make_column_family_config(const schema& s) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
void add_column_family(const schema_ptr& s) {
_metadata->add_column_family(s);
}
future<> stop() {
if (_replication_strategy) {
return _replication_strategy->stop();
}
return make_ready_future<>();
}
private:
sstring column_family_directory(const sstring& name, utils::UUID uuid) const;
};
@@ -276,7 +283,7 @@ public:
const utils::UUID& find_uuid(const schema_ptr&) const throw (std::out_of_range);
/* below, find* throws no_such_<type> on fail */
keyspace& find_or_create_keyspace(const lw_shared_ptr<keyspace_metadata>&);
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&);
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
bool has_keyspace(const sstring& name) const;

View File

@@ -580,17 +580,32 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
created.emplace_back(schema_result::value_type{key, std::move(post)});
}
}
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) {
return proxy.get_db().invoke_on_all([&created, altered = std::move(altered)] (database& db) {
for (auto&& kv : created) {
auto ksm = create_keyspace_from_schema_partition(kv);
keyspace k(ksm, db.make_keyspace_config(*ksm));
k.create_replication_strategy();
db.add_keyspace(ksm->name(), std::move(k));
}
for (auto&& name : altered) {
db.update_keyspace(name);
}
return proxy.get_db().invoke_on_all([&proxy, &created, altered = std::move(altered)] (database& db) {
auto temp_vec_ptr = make_lw_shared<std::vector<std::pair<lw_shared_ptr<keyspace_metadata>, std::unique_ptr<keyspace>>>>();
return do_for_each(created,
[&db, temp_vec_ptr] (auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
std::unique_ptr<keyspace>
k_ptr(new keyspace(ksm, db.make_keyspace_config(*ksm)));
auto fu = k_ptr->create_replication_strategy();
temp_vec_ptr->emplace_back(ksm, std::move(k_ptr));
return fu;
}).then([&db, temp_vec_ptr] {
return do_for_each(*temp_vec_ptr, [&db] (auto&& p_val) {
db.add_keyspace(p_val.first->name(), std::move(*p_val.second));
return make_ready_future<>();
});
}).then([altered = std::move(altered), &db] () mutable {
for (auto&& name : altered) {
db.update_keyspace(name);
}
return make_ready_future<>();
});
});
}).then([dropped = std::move(dropped)] () {
return make_ready_future<std::set<sstring>>(dropped);

View File

@@ -7,12 +7,17 @@
namespace locator {
abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) :
_ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(snitch) {}
abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) :
_ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(std::move(snitch)) {}
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) {
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) {
sstring class_name = strategy_name.find(".") != sstring::npos ? strategy_name : "org.apache.cassandra.locator." + strategy_name;
return create_object<abstract_replication_strategy>(class_name, ks_name, tk_metadata, snitch, config_options);
return create_object<abstract_replication_strategy,
const sstring&,
token_metadata&,
snitch_ptr&&,
std::unordered_map<sstring, sstring>&>
(class_name, ks_name, tk_metadata, std::move(snitch), config_options);
}
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(const token& search_token) {

View File

@@ -26,13 +26,14 @@ protected:
keyspace* _keyspace = nullptr;
std::unordered_map<sstring, sstring> _config_options;
token_metadata& _token_metadata;
i_endpoint_snitch& _snitch;
snitch_ptr _snitch;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) = 0;
public:
abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
virtual ~abstract_replication_strategy() {}
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
std::vector<inet_address> get_natural_endpoints(const token& search_token);
future<> stop() { return _snitch->stop(); }
};
}

View File

@@ -34,12 +34,20 @@ using inet_address = gms::inet_address;
* A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
* in the 2nd and 3rd octets of the ip address, respectively.
*/
struct rack_inferring_snitch : public snitch_base {
class rack_inferring_snitch : public snitch_base {
private:
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
rack_inferring_snitch() {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
// This snitch is ready on creation
_snitch_is_ready.set_value();
}
public:
virtual sstring get_rack(inet_address endpoint) override {
return std::to_string((endpoint.raw_addr() >> 8) & 0xFF);
}
@@ -47,6 +55,12 @@ struct rack_inferring_snitch : public snitch_base {
virtual sstring get_datacenter(inet_address endpoint) override {
return std::to_string((endpoint.raw_addr() >> 16) & 0xFF);
}
// noop
virtual future<> stop() override {
_state = snitch_state::stopped;
return make_ready_future<>();
}
};
} // namespace locator

View File

@@ -22,6 +22,7 @@
#pragma once
#include "snitch_base.hh"
#include "utils/fb_utilities.hh"
#include <memory>
namespace locator {
@@ -30,12 +31,19 @@ namespace locator {
* allowing non-read-repaired reads to prefer a single endpoint, which improves
* cache locality.
*/
struct simple_snitch : public snitch_base {
class simple_snitch : public snitch_base {
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
simple_snitch() {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
// This snitch is ready on creation
_snitch_is_ready.set_value();
}
public:
virtual sstring get_rack(inet_address endpoint) override {
return "rack1";
}
@@ -66,6 +74,12 @@ struct simple_snitch : public snitch_base {
//
return 0;
}
// noop
virtual future<> stop() override {
_state = snitch_state::stopped;
return make_ready_future<>();
}
};
}

View File

@@ -8,8 +8,8 @@
namespace locator {
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options) {}
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, std::move(snitch), config_options) {}
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t) {
size_t replicas = get_replication_factor();
@@ -47,7 +47,7 @@ size_t simple_strategy::get_replication_factor() const {
return std::stol(it->second);
}
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, token_metadata&, i_endpoint_snitch&, std::unordered_map<sstring, sstring>&>;
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, token_metadata&, snitch_ptr&&, std::unordered_map<sstring, sstring>&>;
static registry registrator("org.apache.cassandra.locator.SimpleStrategy");
}

View File

@@ -12,7 +12,7 @@ class simple_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) override;
public:
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
virtual ~simple_strategy() {};
size_t get_replication_factor() const;
};

View File

@@ -25,10 +25,17 @@
#include <vector>
#include "gms/inet_address.hh"
#include "core/shared_ptr.hh"
namespace locator {
using inet_address = gms::inet_address;
struct i_endpoint_snitch;
typedef gms::inet_address inet_address;
typedef std::unique_ptr<i_endpoint_snitch> snitch_ptr;
template <typename SnitchClass, typename... A>
static future<snitch_ptr> make_snitch(A&&... a);
struct i_endpoint_snitch
{
@@ -79,9 +86,46 @@ struct i_endpoint_snitch
std::vector<inet_address>& l1,
std::vector<inet_address>& l2) = 0;
virtual ~i_endpoint_snitch() {};
virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); };
virtual future<> stop() = 0;
template <typename SnitchClass, typename... A>
friend future<snitch_ptr> make_snitch(A&&... a);
protected:
promise<> _snitch_is_ready;
enum class snitch_state {
initializing,
running,
stopping,
stopped
} _state = snitch_state::initializing;
};
template <typename SnitchClass, typename... A>
static future<snitch_ptr> make_snitch(A&&... a) {
snitch_ptr s(new SnitchClass(std::forward<A>(a)...));
auto fu = s->_snitch_is_ready.get_future();
return fu.then_wrapped([s = std::move(s)] (auto&& f) mutable {
try {
f.get();
return make_ready_future<snitch_ptr>(std::move(s));
} catch (...) {
auto eptr = std::current_exception();
auto fu = s->stop();
return fu.then([eptr, s = std::move(s)] () mutable {
std::rethrow_exception(eptr);
// just to make a compiler happy
return make_ready_future<snitch_ptr>(std::move(s));
});
}
});
}
class snitch_base : public i_endpoint_snitch {
public:
//

View File

@@ -797,11 +797,16 @@ SEASTAR_TEST_CASE(test_user_type) {
false
);
// We don't have "CREATE TYPE" yet, so we must insert the type manually
e.local_db().find_or_create_keyspace(ksm)._user_types.add_type(make_user_type());
return e.create_table([make_user_type] (auto ks_name) {
return e.local_db().create_keyspace(ksm).then(
[&e, make_user_type, ksm] {
keyspace& ks = e.local_db().find_keyspace(ksm->name());
ks._user_types.add_type(make_user_type());
return e.create_table([make_user_type] (auto ks_name) {
// CQL: "create table cf (id int primary key, t ut1)";
return schema({}, ks_name, "cf",
{{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type);
return schema({}, ks_name, "cf",
{{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type);
});
}).then([&e] {
return e.execute_cql("insert into cf (id, t) values (1, (1001, 2001, 'abc1'));").discard_result();
}).then([&e] {