diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 05336c7180..2e03e24f53 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -106,6 +106,7 @@ sstring get_local_dc() { class abstract_write_response_handler { protected: promise<> _ready; // available when cl is achieved + shared_ptr _proxy; db::consistency_level _cl; keyspace& _ks; db::write_type _type; @@ -128,12 +129,10 @@ protected: signal(); } public: - abstract_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, - lw_shared_ptr mutation, - std::unordered_set targets, - size_t pending_endpoints = 0, - std::vector dead_endpoints = {}) - : _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)), + abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, + lw_shared_ptr mutation, std::unordered_set targets, + size_t pending_endpoints = 0, std::vector dead_endpoints = {}) + : _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)), _pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) { } virtual ~abstract_write_response_handler() {}; @@ -195,8 +194,10 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } } public: - datacenter_sync_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, lw_shared_ptr mutation, std::unordered_set targets, size_t pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(ks, cl, type, std::move(mutation), targets, pending_endpoints, dead_endpoints) { + datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, + lw_shared_ptr mutation, std::unordered_set targets, size_t pending_endpoints, + std::vector dead_endpoints) : + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mutation), targets, pending_endpoints, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& target : targets) { @@ -211,7 +212,7 @@ public: storage_proxy::response_id_type storage_proxy::register_response_handler(std::unique_ptr&& h) { auto id = _next_response_id++; - auto e = _response_handlers.emplace(id, rh_entry(std::move(h), shared_from_this(), [this, id] { + auto e = _response_handlers.emplace(id, rh_entry(std::move(h), [this, id] { auto& e = _response_handlers.find(id)->second; if (e.handler->_cl_achieved || e.handler->_cl == db::consistency_level::ANY) { // we are here because either cl was achieved, but targets left in the handler are not @@ -269,11 +270,11 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key if (db::is_datacenter_local(cl)) { pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local); - h = std::make_unique(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = std::make_unique(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); } else { - h = std::make_unique(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); } return register_response_handler(std::move(h)); } @@ -283,7 +284,7 @@ storage_proxy::storage_proxy(distributed& db) : _db(db) { init_messaging_service(); } -storage_proxy::rh_entry::rh_entry(std::unique_ptr&& h, shared_ptr p, std::function&& cb) : handler(std::move(h)), proxy(p), expire_timer(std::move(cb)) {} +storage_proxy::rh_entry::rh_entry(std::unique_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} #if 0 static diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 1bbafb367d..2b32f1c822 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -59,9 +59,8 @@ class abstract_read_executor; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { struct rh_entry { std::unique_ptr handler; - shared_ptr proxy; timer<> expire_timer; - rh_entry(std::unique_ptr&& h, shared_ptr p, std::function&& cb); + rh_entry(std::unique_ptr&& h, std::function&& cb); }; public: