BatchlogManager: make endpoint_filter method + implement

This commit is contained in:
Calle Wilund
2015-08-05 10:07:29 +02:00
parent b7cdd189e7
commit 0ded44eeee
2 changed files with 81 additions and 14 deletions

View File

@@ -22,19 +22,23 @@
*/
#include <chrono>
#include <core/future-util.hh>
#include <core/do_with.hh>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/sliced.hpp>
#include "batchlog_manager.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "system_keyspace.hh"
#include "utils/rate_limiter.hh"
#include "core/future-util.hh"
#include "core/do_with.hh"
#include "log.hh"
#include "serializer.hh"
#include "db_clock.hh"
#include "database.hh"
#include "unimplemented.hh"
#include "db/config.hh"
#include "gms/failure_detector.hh"
static logging::logger logger("BatchLog Manager");
@@ -43,6 +47,7 @@ const uint32_t db::batchlog_manager::page_size;
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp)
: _qp(qp)
, _e1(_rd())
{}
future<> db::batchlog_manager::start() {
@@ -239,3 +244,73 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
_sem.signal();
});
}
std::unordered_set<gms::inet_address> db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
// special case for single-node data centers
if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) {
return endpoints.begin()->second;
}
// strip out dead endpoints and localhost
std::unordered_multimap<sstring, gms::inet_address> validated;
auto is_valid = [](gms::inet_address input) {
return input != utils::fb_utilities::get_broadcast_address()
&& gms::get_local_failure_detector().is_alive(input)
;
};
for (auto& e : endpoints) {
for (auto& a : e.second) {
if (is_valid(a)) {
validated.emplace(e.first, a);
}
}
}
typedef std::unordered_set<gms::inet_address> return_type;
if (validated.size() <= 2) {
return boost::copy_range<return_type>(validated | boost::adaptors::map_values);
}
if (validated.size() - validated.count(local_rack) >= 2) {
// we have enough endpoints in other racks
validated.erase(local_rack);
}
if (validated.bucket_count() == 1) {
// we have only 1 `other` rack
auto res = validated | boost::adaptors::map_values;
if (validated.size() > 2) {
return boost::copy_range<return_type>(
boost::copy_range<std::vector<gms::inet_address>>(res)
| boost::adaptors::sliced(0, 2));
}
return boost::copy_range<return_type>(res);
}
// randomize which racks we pick from if more than 2 remaining
std::vector<sstring> racks = boost::copy_range<std::vector<sstring>>(validated | boost::adaptors::map_keys);
if (validated.bucket_count() > 2) {
std::shuffle(racks.begin(), racks.end(), _e1);
racks.resize(2);
}
std::unordered_set<gms::inet_address> result;
// grab a random member of up to two racks
for (auto& rack : racks) {
auto rack_members = validated.bucket(rack);
auto n = validated.bucket_size(rack_members);
auto cpy = boost::copy_range<std::vector<gms::inet_address>>(validated.equal_range(rack) | boost::adaptors::map_values);
std::uniform_int_distribution<size_t> rdist(0, n - 1);
result.emplace(cpy[rdist(_e1)]);
}
return result;
}

View File

@@ -47,6 +47,9 @@ private:
semaphore _sem;
bool _stop = false;
std::random_device _rd;
std::default_random_engine _e1;
future<> replay_all_failed_batches();
public:
// Takes a QP, not a distributes. Because this object is supposed
@@ -72,18 +75,7 @@ public:
mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t, db_clock::time_point);
db_clock::duration get_batch_log_timeout() const;
class endpoint_filter {
private:
const sstring _local_rack;
const std::unordered_map<sstring, std::vector<gms::inet_address>> _endpoints;
public:
endpoint_filter(sstring, std::unordered_map<sstring, std::vector<gms::inet_address>>);
/**
* @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
*/
std::vector<gms::inet_address> filter() const;
};
std::unordered_set<gms::inet_address> endpoint_filter(const sstring&, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>&);
};
}