diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index a0d565d1e3..df4bd0864c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -96,6 +96,13 @@ const dht::token& end_token(const query::partition_range& r) { return r.end() ? r.end()->value().token() : max_token; } +static inline +sstring get_local_dc() { + auto local_addr = utils::fb_utilities::get_broadcast_address(); + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + return snitch_ptr->get_datacenter(local_addr); +} + class abstract_write_response_handler { protected: semaphore _ready; // available when cl is achieved @@ -758,8 +765,8 @@ future> storage_proxy::mutate_prepa }); } -future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl, const sstring& local_dc) { - return parallel_for_each(ids, [this, cl, &local_dc] (storage_proxy::response_id_type response_id) { +future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl) { + return parallel_for_each(ids, [this, cl] (storage_proxy::response_id_type response_id) { // it is better to send first and hint afterwards to reduce latency // but request may complete before hint_to_dead_endpoints() is called and // response_id handler will be removed, so we will have to do hint with separate @@ -768,7 +775,7 @@ future<> storage_proxy::mutate_begin(std::vector(exp); @@ -822,10 +829,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) lc.start(); return mutate_prepare(mutations, cl, type).then([this, cl] (std::vector ids) { - auto local_addr = utils::fb_utilities::get_broadcast_address(); - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - sstring local_dc = snitch_ptr->get_datacenter(local_addr); - return mutate_begin(std::move(ids), cl, local_dc); + return mutate_begin(std::move(ids), cl); }).then_wrapped([p = shared_from_this(), lc] (future<> f) { return p->mutate_end(std::move(f), lc); }); @@ -870,42 +874,36 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc std::vector _mutations; db::consistency_level _cl; - const gms::inet_address _local_addr; - const sstring _local_dc; - const utils::UUID _batch_uuid; const std::unordered_set _batchlog_endpoints; public: context(storage_proxy & p, std::vector&& mutations, db::consistency_level cl) - : _p(p), _mutations(std::move(mutations)), _cl(cl), _local_addr( - utils::fb_utilities::get_broadcast_address()), _local_dc( - locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter( - _local_addr)), _batch_uuid( - utils::UUID_gen::get_time_UUID()), _batchlog_endpoints( + : _p(p), _mutations(std::move(mutations)), _cl(cl), _batch_uuid(utils::UUID_gen::get_time_UUID()), + _batchlog_endpoints( [this]() -> std::unordered_set { + auto local_addr = utils::fb_utilities::get_broadcast_address(); auto topology = service::get_storage_service().local().get_token_metadata().get_topology(); - auto local_endpoints = topology.get_datacenter_racks().at(_local_dc); // note: origin copies, so do that here too... - auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(_local_addr); + auto local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); // note: origin copies, so do that here too... + auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(local_addr); auto chosen_endpoints = db::get_batchlog_manager().local().endpoint_filter(local_rack, local_endpoints); if (chosen_endpoints.empty()) { if (_cl == db::consistency_level::ANY) { - return {_local_addr}; + return {local_addr}; } throw exceptions::unavailable_exception(db::consistency_level::ONE, 1, 0); } return chosen_endpoints; - }()) { - } + }()) {} future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); return _p.create_write_response_handler(ks, cl, type, freeze(m), _batchlog_endpoints, {}, {}); }).then([this, cl] (std::vector ids) { - return _p.mutate_begin(std::move(ids), cl, _local_dc); + return _p.mutate_begin(std::move(ids), cl); }); } future<> sync_write_to_batchlog() { @@ -928,7 +926,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc return sync_write_to_batchlog().then_wrapped([this, ids = std::move(ids)] (future<> f) { try { f.get(); - return _p.mutate_begin(std::move(ids), _cl, _local_dc); + return _p.mutate_begin(std::move(ids), _cl); } catch(...) { // writing batchlog failed, remove responce handlers that will not be used now boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1)); @@ -974,7 +972,7 @@ bool storage_proxy::cannot_hint(gms::inet_address target) { * @throws OverloadedException if the hints cannot be written/enqueued */ // returned future is ready when sent is complete, not when mutation is executed on all (or any) targets! -future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id, sstring local_dc) +future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id) { // extra-datacenter replicas, grouped by dc std::unordered_map> dc_groups; @@ -984,7 +982,7 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r for(auto dest: get_write_response_handler(response_id).get_targets()) { sstring dc = snitch_ptr->get_datacenter(dest); - if (dc == local_dc) { + if (dc == get_local_dc()) { local.emplace_back("", std::vector({dest})); } else { dc_groups[dc].push_back(dest); @@ -1241,10 +1239,7 @@ future<> storage_proxy::schedule_repair(std::unordered_mapks_name()); return create_write_response_handler(ks, cl, type, freeze(m), std::unordered_set({ep}, 1), {}, {}); }).then([this] (std::vector ids) { - auto local_addr = utils::fb_utilities::get_broadcast_address(); - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - sstring local_dc = snitch_ptr->get_datacenter(local_addr); - return mutate_begin(std::move(ids), db::consistency_level::ONE, local_dc); + return mutate_begin(std::move(ids), db::consistency_level::ONE); }).then_wrapped([this, lc] (future<> f) { return mutate_end(std::move(f), lc); }); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 2d250149d4..746e6c521a 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -104,7 +104,7 @@ private: response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, const std::vector& pending_endpoints, std::vector); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type); - future<> send_to_live_endpoints(response_id_type response_id, sstring local_data_center); + future<> send_to_live_endpoints(response_id_type response_id); template size_t hint_to_dead_endpoints(lw_shared_ptr m, const Range& targets); void hint_to_dead_endpoints(response_id_type, db::consistency_level); @@ -132,7 +132,7 @@ private: template future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); future> mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type); - future<> mutate_begin(const std::vector ids, db::consistency_level cl, const sstring& local_dc); + future<> mutate_begin(const std::vector ids, db::consistency_level cl); future<> mutate_end(future<> mutate_result, utils::latency_counter); future<> schedule_repair(std::unordered_map> diffs);