locator: futurize snitch creation
- Forbid explicit snitch creation with constructor.
- Allow the creation of snitches only with locator::make_snitch() template
function.
Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
New in v4:
- Make sure the snitch is stopped before it's destroyed when _snitch_is_ready
is returned in an exceptional state.
New in v2:
- Change snitch_ptr to be std::unique_ptr<i_endpoint_snitch>
- abstract_replication_strategy::create_replication_strategy(): explicitly
specify (template) types of create_object() parameters.
- Re-arrange the loop in marge_keyspaces() so that lambdas that depend on
"this" complete before there is a chance that "this" gets destroyed.
- create_keyspace(): Don't add a new keyspace if a keyspace with this name
already exists.
- i_endpoint_snitch: added a stop() virtual method
- Added a stop() pure virtual method.
- Added an enum class snitch_state and a _state member initialized to snitch_state::initializing,
added an assert() in a destructor requiring _state to become snitch_state::stopped,
which should be set when stop() is complete.
- rack_inferring_snitch: added a stop() method.
- simple_snitch: added a stop() method.
- Added stop() methods to abstract_replication_strategy and keyspace.
- Updated database::stop() to wait for all keyspaces in _keyspaces to stop.
This commit is contained in:
49
database.cc
49
database.cc
@@ -584,9 +584,15 @@ create_keyspace(distributed<database>& db, const lw_shared_ptr<keyspace_metadata
|
||||
return make_directory(db.local()._cfg->data_file_directories()[0] + "/" + ksm->name()).then([ksm, &db] {
|
||||
return db.invoke_on_all([ksm] (database& db) {
|
||||
auto cfg = db.make_keyspace_config(*ksm);
|
||||
|
||||
keyspace ks(ksm, cfg);
|
||||
ks.create_replication_strategy();
|
||||
db.add_keyspace(ksm->name(), std::move(ks));
|
||||
auto fu = ks.create_replication_strategy();
|
||||
|
||||
return fu.then([&db, ks = std::move(ks), ksm] () mutable {
|
||||
db.add_keyspace(ksm->name(), std::move(ks));
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
// FIXME: rollback on error, or keyspace directory remains on disk, poisoning
|
||||
@@ -692,11 +698,13 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
future<>
|
||||
keyspace::create_replication_strategy() {
|
||||
static thread_local locator::token_metadata tm;
|
||||
static locator::simple_snitch snitch;
|
||||
using namespace locator;
|
||||
|
||||
static thread_local token_metadata tm;
|
||||
static std::unordered_map<sstring, sstring> options = {{"replication_factor", "3"}};
|
||||
|
||||
auto d2t = [](double d) {
|
||||
unsigned long l = net::hton(static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max())));
|
||||
std::array<int8_t, 8> a;
|
||||
@@ -707,7 +715,16 @@ keyspace::create_replication_strategy() {
|
||||
tm.update_normal_token({dht::token::kind::key, {d2t(1.0/4).data(), 8}}, to_sstring("127.0.0.2"));
|
||||
tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3"));
|
||||
tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4"));
|
||||
_replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), tm, snitch, options);
|
||||
|
||||
return make_snitch<simple_snitch>().then(
|
||||
[this] (snitch_ptr&& s) {
|
||||
_replication_strategy =
|
||||
abstract_replication_strategy::create_replication_strategy(
|
||||
_metadata->name(), _metadata->strategy_name(),
|
||||
tm, std::move(s), options);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
locator::abstract_replication_strategy&
|
||||
@@ -750,15 +767,21 @@ schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_c
|
||||
return find_column_family(uuid).schema();
|
||||
}
|
||||
|
||||
keyspace&
|
||||
database::find_or_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
future<>
|
||||
database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
auto i = _keyspaces.find(ksm->name());
|
||||
if (i != _keyspaces.end()) {
|
||||
return i->second;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
keyspace ks(ksm, std::move(make_keyspace_config(*ksm)));
|
||||
ks.create_replication_strategy();
|
||||
return _keyspaces.emplace(ksm->name(), std::move(ks)).first->second;
|
||||
auto fu = ks.create_replication_strategy();
|
||||
|
||||
return fu.then([ks = std::move(ks), ksm, this] () mutable {
|
||||
_keyspaces.emplace(ksm->name(), std::move(ks));
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
std::set<sstring>
|
||||
@@ -1018,5 +1041,7 @@ operator<<(std::ostream& os, const atomic_cell& ac) {
|
||||
|
||||
future<>
|
||||
database::stop() {
|
||||
return make_ready_future<>();
|
||||
return do_for_each(_keyspaces, [this] (auto& val_pair) {
|
||||
return val_pair.second.stop();
|
||||
});
|
||||
}
|
||||
|
||||
11
database.hh
11
database.hh
@@ -214,13 +214,20 @@ public:
|
||||
const lw_shared_ptr<keyspace_metadata>& metadata() const {
|
||||
return _metadata;
|
||||
}
|
||||
void create_replication_strategy();
|
||||
future<> create_replication_strategy();
|
||||
locator::abstract_replication_strategy& get_replication_strategy();
|
||||
column_family::config make_column_family_config(const schema& s) const;
|
||||
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
|
||||
void add_column_family(const schema_ptr& s) {
|
||||
_metadata->add_column_family(s);
|
||||
}
|
||||
future<> stop() {
|
||||
if (_replication_strategy) {
|
||||
return _replication_strategy->stop();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
private:
|
||||
sstring column_family_directory(const sstring& name, utils::UUID uuid) const;
|
||||
};
|
||||
@@ -276,7 +283,7 @@ public:
|
||||
const utils::UUID& find_uuid(const schema_ptr&) const throw (std::out_of_range);
|
||||
|
||||
/* below, find* throws no_such_<type> on fail */
|
||||
keyspace& find_or_create_keyspace(const lw_shared_ptr<keyspace_metadata>&);
|
||||
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&);
|
||||
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
|
||||
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
|
||||
bool has_keyspace(const sstring& name) const;
|
||||
|
||||
@@ -580,17 +580,32 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
|
||||
created.emplace_back(schema_result::value_type{key, std::move(post)});
|
||||
}
|
||||
}
|
||||
|
||||
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) {
|
||||
return proxy.get_db().invoke_on_all([&created, altered = std::move(altered)] (database& db) {
|
||||
for (auto&& kv : created) {
|
||||
auto ksm = create_keyspace_from_schema_partition(kv);
|
||||
keyspace k(ksm, db.make_keyspace_config(*ksm));
|
||||
k.create_replication_strategy();
|
||||
db.add_keyspace(ksm->name(), std::move(k));
|
||||
}
|
||||
for (auto&& name : altered) {
|
||||
db.update_keyspace(name);
|
||||
}
|
||||
return proxy.get_db().invoke_on_all([&proxy, &created, altered = std::move(altered)] (database& db) {
|
||||
auto temp_vec_ptr = make_lw_shared<std::vector<std::pair<lw_shared_ptr<keyspace_metadata>, std::unique_ptr<keyspace>>>>();
|
||||
return do_for_each(created,
|
||||
[&db, temp_vec_ptr] (auto&& val) {
|
||||
|
||||
auto ksm = create_keyspace_from_schema_partition(val);
|
||||
std::unique_ptr<keyspace>
|
||||
k_ptr(new keyspace(ksm, db.make_keyspace_config(*ksm)));
|
||||
auto fu = k_ptr->create_replication_strategy();
|
||||
temp_vec_ptr->emplace_back(ksm, std::move(k_ptr));
|
||||
|
||||
return fu;
|
||||
}).then([&db, temp_vec_ptr] {
|
||||
return do_for_each(*temp_vec_ptr, [&db] (auto&& p_val) {
|
||||
db.add_keyspace(p_val.first->name(), std::move(*p_val.second));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([altered = std::move(altered), &db] () mutable {
|
||||
for (auto&& name : altered) {
|
||||
db.update_keyspace(name);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}).then([dropped = std::move(dropped)] () {
|
||||
return make_ready_future<std::set<sstring>>(dropped);
|
||||
|
||||
@@ -7,12 +7,17 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) :
|
||||
_ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(snitch) {}
|
||||
abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) :
|
||||
_ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(std::move(snitch)) {}
|
||||
|
||||
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) {
|
||||
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) {
|
||||
sstring class_name = strategy_name.find(".") != sstring::npos ? strategy_name : "org.apache.cassandra.locator." + strategy_name;
|
||||
return create_object<abstract_replication_strategy>(class_name, ks_name, tk_metadata, snitch, config_options);
|
||||
return create_object<abstract_replication_strategy,
|
||||
const sstring&,
|
||||
token_metadata&,
|
||||
snitch_ptr&&,
|
||||
std::unordered_map<sstring, sstring>&>
|
||||
(class_name, ks_name, tk_metadata, std::move(snitch), config_options);
|
||||
}
|
||||
|
||||
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(const token& search_token) {
|
||||
|
||||
@@ -26,13 +26,14 @@ protected:
|
||||
keyspace* _keyspace = nullptr;
|
||||
std::unordered_map<sstring, sstring> _config_options;
|
||||
token_metadata& _token_metadata;
|
||||
i_endpoint_snitch& _snitch;
|
||||
snitch_ptr _snitch;
|
||||
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) = 0;
|
||||
public:
|
||||
abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
virtual ~abstract_replication_strategy() {}
|
||||
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
std::vector<inet_address> get_natural_endpoints(const token& search_token);
|
||||
future<> stop() { return _snitch->stop(); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -34,12 +34,20 @@ using inet_address = gms::inet_address;
|
||||
* A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
|
||||
* in the 2nd and 3rd octets of the ip address, respectively.
|
||||
*/
|
||||
struct rack_inferring_snitch : public snitch_base {
|
||||
class rack_inferring_snitch : public snitch_base {
|
||||
private:
|
||||
template <typename SnitchClass, typename... A>
|
||||
friend future<snitch_ptr> make_snitch(A&&... a);
|
||||
|
||||
rack_inferring_snitch() {
|
||||
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
|
||||
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
|
||||
|
||||
// This snitch is ready on creation
|
||||
_snitch_is_ready.set_value();
|
||||
}
|
||||
|
||||
public:
|
||||
virtual sstring get_rack(inet_address endpoint) override {
|
||||
return std::to_string((endpoint.raw_addr() >> 8) & 0xFF);
|
||||
}
|
||||
@@ -47,6 +55,12 @@ struct rack_inferring_snitch : public snitch_base {
|
||||
virtual sstring get_datacenter(inet_address endpoint) override {
|
||||
return std::to_string((endpoint.raw_addr() >> 16) & 0xFF);
|
||||
}
|
||||
|
||||
// noop
|
||||
virtual future<> stop() override {
|
||||
_state = snitch_state::stopped;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#pragma once
|
||||
#include "snitch_base.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include <memory>
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -30,12 +31,19 @@ namespace locator {
|
||||
* allowing non-read-repaired reads to prefer a single endpoint, which improves
|
||||
* cache locality.
|
||||
*/
|
||||
struct simple_snitch : public snitch_base {
|
||||
class simple_snitch : public snitch_base {
|
||||
template <typename SnitchClass, typename... A>
|
||||
friend future<snitch_ptr> make_snitch(A&&... a);
|
||||
|
||||
simple_snitch() {
|
||||
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
|
||||
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
|
||||
|
||||
// This snitch is ready on creation
|
||||
_snitch_is_ready.set_value();
|
||||
}
|
||||
|
||||
public:
|
||||
virtual sstring get_rack(inet_address endpoint) override {
|
||||
return "rack1";
|
||||
}
|
||||
@@ -66,6 +74,12 @@ struct simple_snitch : public snitch_base {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
// noop
|
||||
virtual future<> stop() override {
|
||||
_state = snitch_state::stopped;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options) :
|
||||
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options) {}
|
||||
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options) :
|
||||
abstract_replication_strategy(keyspace_name, token_metadata, std::move(snitch), config_options) {}
|
||||
|
||||
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t) {
|
||||
size_t replicas = get_replication_factor();
|
||||
@@ -47,7 +47,7 @@ size_t simple_strategy::get_replication_factor() const {
|
||||
return std::stol(it->second);
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, token_metadata&, i_endpoint_snitch&, std::unordered_map<sstring, sstring>&>;
|
||||
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, token_metadata&, snitch_ptr&&, std::unordered_map<sstring, sstring>&>;
|
||||
static registry registrator("org.apache.cassandra.locator.SimpleStrategy");
|
||||
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ class simple_strategy : public abstract_replication_strategy {
|
||||
protected:
|
||||
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) override;
|
||||
public:
|
||||
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map<sstring, sstring>& config_options);
|
||||
virtual ~simple_strategy() {};
|
||||
size_t get_replication_factor() const;
|
||||
};
|
||||
|
||||
@@ -25,10 +25,17 @@
|
||||
#include <vector>
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
#include "core/shared_ptr.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
using inet_address = gms::inet_address;
|
||||
struct i_endpoint_snitch;
|
||||
|
||||
typedef gms::inet_address inet_address;
|
||||
typedef std::unique_ptr<i_endpoint_snitch> snitch_ptr;
|
||||
|
||||
template <typename SnitchClass, typename... A>
|
||||
static future<snitch_ptr> make_snitch(A&&... a);
|
||||
|
||||
struct i_endpoint_snitch
|
||||
{
|
||||
@@ -79,9 +86,46 @@ struct i_endpoint_snitch
|
||||
std::vector<inet_address>& l1,
|
||||
std::vector<inet_address>& l2) = 0;
|
||||
|
||||
virtual ~i_endpoint_snitch() {};
|
||||
virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); };
|
||||
|
||||
virtual future<> stop() = 0;
|
||||
|
||||
template <typename SnitchClass, typename... A>
|
||||
friend future<snitch_ptr> make_snitch(A&&... a);
|
||||
|
||||
protected:
|
||||
promise<> _snitch_is_ready;
|
||||
enum class snitch_state {
|
||||
initializing,
|
||||
running,
|
||||
stopping,
|
||||
stopped
|
||||
} _state = snitch_state::initializing;
|
||||
};
|
||||
|
||||
template <typename SnitchClass, typename... A>
|
||||
static future<snitch_ptr> make_snitch(A&&... a) {
|
||||
snitch_ptr s(new SnitchClass(std::forward<A>(a)...));
|
||||
|
||||
auto fu = s->_snitch_is_ready.get_future();
|
||||
return fu.then_wrapped([s = std::move(s)] (auto&& f) mutable {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
return make_ready_future<snitch_ptr>(std::move(s));
|
||||
} catch (...) {
|
||||
auto eptr = std::current_exception();
|
||||
auto fu = s->stop();
|
||||
|
||||
return fu.then([eptr, s = std::move(s)] () mutable {
|
||||
std::rethrow_exception(eptr);
|
||||
// just to make a compiler happy
|
||||
return make_ready_future<snitch_ptr>(std::move(s));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class snitch_base : public i_endpoint_snitch {
|
||||
public:
|
||||
//
|
||||
|
||||
@@ -797,11 +797,16 @@ SEASTAR_TEST_CASE(test_user_type) {
|
||||
false
|
||||
);
|
||||
// We don't have "CREATE TYPE" yet, so we must insert the type manually
|
||||
e.local_db().find_or_create_keyspace(ksm)._user_types.add_type(make_user_type());
|
||||
return e.create_table([make_user_type] (auto ks_name) {
|
||||
return e.local_db().create_keyspace(ksm).then(
|
||||
[&e, make_user_type, ksm] {
|
||||
keyspace& ks = e.local_db().find_keyspace(ksm->name());
|
||||
ks._user_types.add_type(make_user_type());
|
||||
|
||||
return e.create_table([make_user_type] (auto ks_name) {
|
||||
// CQL: "create table cf (id int primary key, t ut1)";
|
||||
return schema({}, ks_name, "cf",
|
||||
{{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type);
|
||||
return schema({}, ks_name, "cf",
|
||||
{{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type);
|
||||
});
|
||||
}).then([&e] {
|
||||
return e.execute_cql("insert into cf (id, t) values (1, (1001, 2001, 'abc1'));").discard_result();
|
||||
}).then([&e] {
|
||||
|
||||
Reference in New Issue
Block a user