storage_proxy: clean up local_dc checking

The only place local_dc is checked during mutation sending is in
send_to_live_endpoints(), but current code pass it there throw several
function call layers. Simplify the code by getting local_dc when it is
used directly.
This commit is contained in:
Gleb Natapov
2015-10-28 15:21:10 +02:00
committed by Avi Kivity
parent 6e5916161c
commit ac5f92db70
2 changed files with 25 additions and 30 deletions

View File

@@ -96,6 +96,13 @@ const dht::token& end_token(const query::partition_range& r) {
return r.end() ? r.end()->value().token() : max_token;
}
static inline
sstring get_local_dc() {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
return snitch_ptr->get_datacenter(local_addr);
}
class abstract_write_response_handler {
protected:
semaphore _ready; // available when cl is achieved
@@ -758,8 +765,8 @@ future<std::vector<storage_proxy::response_id_type>> storage_proxy::mutate_prepa
});
}
future<> storage_proxy::mutate_begin(std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl, const sstring& local_dc) {
return parallel_for_each(ids, [this, cl, &local_dc] (storage_proxy::response_id_type response_id) {
future<> storage_proxy::mutate_begin(std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl) {
return parallel_for_each(ids, [this, cl] (storage_proxy::response_id_type response_id) {
// it is better to send first and hint afterwards to reduce latency
// but request may complete before hint_to_dead_endpoints() is called and
// response_id handler will be removed, so we will have to do hint with separate
@@ -768,7 +775,7 @@ future<> storage_proxy::mutate_begin(std::vector<storage_proxy::response_id_type
// call before send_to_live_endpoints() for the same reason as above
auto f = response_wait(response_id);
send_to_live_endpoints(response_id, local_dc);
send_to_live_endpoints(response_id);
return f.handle_exception([this, response_id] (std::exception_ptr exp) {
remove_response_handler(response_id); // cancel expire_timer, so no hint will happen
return make_exception_future<>(exp);
@@ -822,10 +829,7 @@ storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl)
lc.start();
return mutate_prepare(mutations, cl, type).then([this, cl] (std::vector<storage_proxy::response_id_type> ids) {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
sstring local_dc = snitch_ptr->get_datacenter(local_addr);
return mutate_begin(std::move(ids), cl, local_dc);
return mutate_begin(std::move(ids), cl);
}).then_wrapped([p = shared_from_this(), lc] (future<> f) {
return p->mutate_end(std::move(f), lc);
});
@@ -870,42 +874,36 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
std::vector<mutation> _mutations;
db::consistency_level _cl;
const gms::inet_address _local_addr;
const sstring _local_dc;
const utils::UUID _batch_uuid;
const std::unordered_set<gms::inet_address> _batchlog_endpoints;
public:
context(storage_proxy & p, std::vector<mutation>&& mutations,
db::consistency_level cl)
: _p(p), _mutations(std::move(mutations)), _cl(cl), _local_addr(
utils::fb_utilities::get_broadcast_address()), _local_dc(
locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(
_local_addr)), _batch_uuid(
utils::UUID_gen::get_time_UUID()), _batchlog_endpoints(
: _p(p), _mutations(std::move(mutations)), _cl(cl), _batch_uuid(utils::UUID_gen::get_time_UUID()),
_batchlog_endpoints(
[this]() -> std::unordered_set<gms::inet_address> {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto topology = service::get_storage_service().local().get_token_metadata().get_topology();
auto local_endpoints = topology.get_datacenter_racks().at(_local_dc); // note: origin copies, so do that here too...
auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(_local_addr);
auto local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); // note: origin copies, so do that here too...
auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(local_addr);
auto chosen_endpoints = db::get_batchlog_manager().local().endpoint_filter(local_rack, local_endpoints);
if (chosen_endpoints.empty()) {
if (_cl == db::consistency_level::ANY) {
return {_local_addr};
return {local_addr};
}
throw exceptions::unavailable_exception(db::consistency_level::ONE, 1, 0);
}
return chosen_endpoints;
}()) {
}
}()) {}
future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) {
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
return _p.create_write_response_handler(ks, cl, type, freeze(m), _batchlog_endpoints, {}, {});
}).then([this, cl] (std::vector<response_id_type> ids) {
return _p.mutate_begin(std::move(ids), cl, _local_dc);
return _p.mutate_begin(std::move(ids), cl);
});
}
future<> sync_write_to_batchlog() {
@@ -928,7 +926,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
return sync_write_to_batchlog().then_wrapped([this, ids = std::move(ids)] (future<> f) {
try {
f.get();
return _p.mutate_begin(std::move(ids), _cl, _local_dc);
return _p.mutate_begin(std::move(ids), _cl);
} catch(...) {
// writing batchlog failed, remove responce handlers that will not be used now
boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1));
@@ -974,7 +972,7 @@ bool storage_proxy::cannot_hint(gms::inet_address target) {
* @throws OverloadedException if the hints cannot be written/enqueued
*/
// returned future is ready when sent is complete, not when mutation is executed on all (or any) targets!
future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id, sstring local_dc)
future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id)
{
// extra-datacenter replicas, grouped by dc
std::unordered_map<sstring, std::vector<gms::inet_address>> dc_groups;
@@ -984,7 +982,7 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r
for(auto dest: get_write_response_handler(response_id).get_targets()) {
sstring dc = snitch_ptr->get_datacenter(dest);
if (dc == local_dc) {
if (dc == get_local_dc()) {
local.emplace_back("", std::vector<gms::inet_address>({dest}));
} else {
dc_groups[dc].push_back(dest);
@@ -1241,10 +1239,7 @@ future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, st
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
return create_write_response_handler(ks, cl, type, freeze(m), std::unordered_set<gms::inet_address>({ep}, 1), {}, {});
}).then([this] (std::vector<response_id_type> ids) {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
sstring local_dc = snitch_ptr->get_datacenter(local_addr);
return mutate_begin(std::move(ids), db::consistency_level::ONE, local_dc);
return mutate_begin(std::move(ids), db::consistency_level::ONE);
}).then_wrapped([this, lc] (future<> f) {
return mutate_end(std::move(f), lc);
});

View File

@@ -104,7 +104,7 @@ private:
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>);
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type);
future<> send_to_live_endpoints(response_id_type response_id, sstring local_data_center);
future<> send_to_live_endpoints(response_id_type response_id);
template<typename Range>
size_t hint_to_dead_endpoints(lw_shared_ptr<const frozen_mutation> m, const Range& targets);
void hint_to_dead_endpoints(response_id_type, db::consistency_level);
@@ -132,7 +132,7 @@ private:
template<typename Range, typename CreateWriteHandler>
future<std::vector<storage_proxy::response_id_type>> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler);
future<std::vector<storage_proxy::response_id_type>> mutate_prepare(std::vector<mutation>& mutations, db::consistency_level cl, db::write_type type);
future<> mutate_begin(const std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl, const sstring& local_dc);
future<> mutate_begin(const std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl);
future<> mutate_end(future<> mutate_result, utils::latency_counter);
future<> schedule_repair(std::unordered_map<gms::inet_address, std::vector<mutation>> diffs);