storage_proxy: move proxy pointer to write response handler

We will need it later there.
This commit is contained in:
Gleb Natapov
2015-11-02 10:07:59 +02:00
parent ca0177f5cc
commit 59d8f9f392
2 changed files with 15 additions and 15 deletions

View File

@@ -106,6 +106,7 @@ sstring get_local_dc() {
class abstract_write_response_handler {
protected:
promise<> _ready; // available when cl is achieved
shared_ptr<storage_proxy> _proxy;
db::consistency_level _cl;
keyspace& _ks;
db::write_type _type;
@@ -128,12 +129,10 @@ protected:
signal();
}
public:
abstract_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type,
lw_shared_ptr<const frozen_mutation> mutation,
std::unordered_set<gms::inet_address> targets,
size_t pending_endpoints = 0,
std::vector<gms::inet_address> dead_endpoints = {})
: _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)),
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
: _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)),
_pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) {
}
virtual ~abstract_write_response_handler() {};
@@ -195,8 +194,10 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
}
}
public:
datacenter_sync_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
abstract_write_response_handler(ks, cl, type, std::move(mutation), targets, pending_endpoints, dead_endpoints) {
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints,
std::vector<gms::inet_address> dead_endpoints) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mutation), targets, pending_endpoints, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& target : targets) {
@@ -211,7 +212,7 @@ public:
storage_proxy::response_id_type storage_proxy::register_response_handler(std::unique_ptr<abstract_write_response_handler>&& h) {
auto id = _next_response_id++;
auto e = _response_handlers.emplace(id, rh_entry(std::move(h), shared_from_this(), [this, id] {
auto e = _response_handlers.emplace(id, rh_entry(std::move(h), [this, id] {
auto& e = _response_handlers.find(id)->second;
if (e.handler->_cl_achieved || e.handler->_cl == db::consistency_level::ANY) {
// we are here because either cl was achieved, but targets left in the handler are not
@@ -269,11 +270,11 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key
if (db::is_datacenter_local(cl)) {
pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
h = std::make_unique<datacenter_write_response_handler>(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = std::make_unique<datacenter_sync_write_response_handler>(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
} else {
h = std::make_unique<write_response_handler>(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
}
return register_response_handler(std::move(h));
}
@@ -283,7 +284,7 @@ storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
init_messaging_service();
}
storage_proxy::rh_entry::rh_entry(std::unique_ptr<abstract_write_response_handler>&& h, shared_ptr<storage_proxy> p, std::function<void()>&& cb) : handler(std::move(h)), proxy(p), expire_timer(std::move(cb)) {}
storage_proxy::rh_entry::rh_entry(std::unique_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {}
#if 0
static

View File

@@ -59,9 +59,8 @@ class abstract_read_executor;
class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*implements StorageProxyMBean*/ {
struct rh_entry {
std::unique_ptr<abstract_write_response_handler> handler;
shared_ptr<storage_proxy> proxy;
timer<> expire_timer;
rh_entry(std::unique_ptr<abstract_write_response_handler>&& h, shared_ptr<storage_proxy> p, std::function<void()>&& cb);
rh_entry(std::unique_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb);
};
public: