// Patch summary (text before the first "diff --git" line is commentary, not part of any hunk).
//
// Files touched: db/batchlog_manager.cc and db/batchlog_manager.hh.
//
// What the patch does, as visible in the hunks below:
//   * batchlog_manager.hh: removes the nested `endpoint_filter` class (constructor + `filter()`)
//     and declares a member function `endpoint_filter(...)` in its place; adds the members
//     `std::random_device _rd` and `std::default_random_engine _e1`.
//   * batchlog_manager.cc: initializes `_e1` with `_rd()` in the constructor, drops the
//     "core/future-util.hh" and "core/do_with.hh" includes, adds "gms/failure_detector.hh",
//     and adds the definition of `db::batchlog_manager::endpoint_filter`.
//
// endpoint_filter(local_rack, endpoints), step by step:
//   1. If `endpoints` has exactly one entry and that entry holds exactly one address, that
//      set is returned unchanged (no liveness or localhost filtering is applied on this path).
//   2. Otherwise every address that differs from
//      `utils::fb_utilities::get_broadcast_address()` and for which
//      `gms::get_local_failure_detector().is_alive()` is true is copied into the
//      `validated` multimap under its map key.
//   3. If at most two addresses survive, all of them are returned.
//   4. If at least two validated addresses have a key other than `local_rack`, the
//      `local_rack` entries are erased.
//   5. The remaining code returns at most two addresses, using `_e1` for the shuffle and
//      for the per-rack random index.
//
// NOTE(review): this text appears to be line-collapsed and to have lost its template
// argument lists (e.g. bare `#include` lines, `std::unordered_set db::...`,
// `boost::copy_range(...)`); it presumably cannot be applied as a patch in this form -
// confirm against the original commit.
//
// NOTE(review): `validated.bucket_count()` is the number of hash buckets of the
// unordered_multimap, not the number of distinct keys. The `== 1` and `> 2` checks look
// like they are meant to count remaining racks - confirm and count distinct keys instead.
//
// NOTE(review): `validated.bucket_size(validated.bucket(rack))` counts every element that
// hashes into that bucket, which may include elements with other keys. `n` can therefore
// exceed `cpy.size()` (built from `equal_range(rack)`), so `cpy[rdist(_e1)]` may index out
// of range - presumably `cpy.size()` was intended; verify.
//
// NOTE(review): `racks` is built from `map_keys` of a multimap, so it holds one entry per
// validated address rather than one per rack; the same rack name can appear more than
// once - confirm whether deduplication is required before the shuffle/resize.
diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 74b109ee86..17a1ee5e15 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -22,19 +22,23 @@ */ #include +#include +#include +#include +#include + #include "batchlog_manager.hh" #include "service/storage_service.hh" #include "service/storage_proxy.hh" #include "system_keyspace.hh" #include "utils/rate_limiter.hh" -#include "core/future-util.hh" -#include "core/do_with.hh" #include "log.hh" #include "serializer.hh" #include "db_clock.hh" #include "database.hh" #include "unimplemented.hh" #include "db/config.hh" +#include "gms/failure_detector.hh" static logging::logger logger("BatchLog Manager"); @@ -43,6 +47,7 @@ const uint32_t db::batchlog_manager::page_size; db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) : _qp(qp) + , _e1(_rd()) {} future<> db::batchlog_manager::start() { @@ -239,3 +244,73 @@ future<> db::batchlog_manager::replay_all_failed_batches() { _sem.signal(); }); } + +std::unordered_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map>& endpoints) { + // special case for single-node data centers + if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) { + return endpoints.begin()->second; + } + + // strip out dead endpoints and localhost + std::unordered_multimap validated; + + auto is_valid = [](gms::inet_address input) { + return input != utils::fb_utilities::get_broadcast_address() + && gms::get_local_failure_detector().is_alive(input) + ; + }; + + for (auto& e : endpoints) { + for (auto& a : e.second) { + if (is_valid(a)) { + validated.emplace(e.first, a); + } + } + } + + typedef std::unordered_set return_type; + + if (validated.size() <= 2) { + return boost::copy_range(validated | boost::adaptors::map_values); + } + + if (validated.size() - validated.count(local_rack) >= 2) { + // we have enough endpoints in other racks + validated.erase(local_rack); + } + + if (validated.bucket_count() == 1) { + 
// we have only 1 `other` rack + auto res = validated | boost::adaptors::map_values; + if (validated.size() > 2) { + return boost::copy_range( + boost::copy_range>(res) + | boost::adaptors::sliced(0, 2)); + } + return boost::copy_range(res); + } + + // randomize which racks we pick from if more than 2 remaining + + std::vector racks = boost::copy_range>(validated | boost::adaptors::map_keys); + + if (validated.bucket_count() > 2) { + std::shuffle(racks.begin(), racks.end(), _e1); + racks.resize(2); + } + + std::unordered_set result; + + // grab a random member of up to two racks + for (auto& rack : racks) { + auto rack_members = validated.bucket(rack); + auto n = validated.bucket_size(rack_members); + auto cpy = boost::copy_range>(validated.equal_range(rack) | boost::adaptors::map_values); + std::uniform_int_distribution rdist(0, n - 1); + result.emplace(cpy[rdist(_e1)]); + } + + return result; +} + + diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh index 71544eb5cd..0e956963aa 100644 --- a/db/batchlog_manager.hh +++ b/db/batchlog_manager.hh @@ -47,6 +47,9 @@ private: semaphore _sem; bool _stop = false; + std::random_device _rd; + std::default_random_engine _e1; + future<> replay_all_failed_batches(); public: // Takes a QP, not a distributes. Because this object is supposed @@ -72,18 +75,7 @@ public: mutation get_batch_log_mutation_for(std::vector, const utils::UUID&, int32_t, db_clock::time_point); db_clock::duration get_batch_log_timeout() const; - class endpoint_filter { - private: - const sstring _local_rack; - const std::unordered_map> _endpoints; - - public: - endpoint_filter(sstring, std::unordered_map>); - /** - * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. - */ - std::vector filter() const; - }; + std::unordered_set endpoint_filter(const sstring&, const std::unordered_map>&); }; }