db: consistency_level.hh: Complete the implementation of filter_for_query()

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>

New in v2:
   - Use std::partition_copy() and boost::range::algorithm::partition().
   - Don't use std::move() when returning a local vector variable.
This commit is contained in:
Vlad Zolotarov
2015-07-05 17:19:31 +03:00
parent a9a3bd1927
commit 501737cb84

View File

@@ -24,7 +24,7 @@
#pragma once
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/partition.hpp>
#include "exceptions/exceptions.hh"
#include "core/sstring.hh"
#include "schema.hh"
@@ -227,6 +227,34 @@ inline size_t count_local_endpoints(Range& live_endpoints) {
return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local);
}
inline std::vector<gms::inet_address>
filter_for_query_dc_local(consistency_level cl,
keyspace& ks,
const std::vector<gms::inet_address>& live_endpoints) {
using namespace gms;
std::vector<inet_address> local;
std::vector<inet_address> other;
local.reserve(live_endpoints.size());
other.reserve(live_endpoints.size());
std::partition_copy(live_endpoints.begin(), live_endpoints.end(),
std::back_inserter(local), std::back_inserter(other),
is_local);
// check if blockfor more than we have localep's
size_t bf = block_for(ks, cl);
if (local.size() < bf) {
size_t other_items_count = std::min(bf - local.size(), other.size());
local.reserve(local.size() + other_items_count);
std::move(other.begin(), other.begin() + other_items_count,
std::back_inserter(local));
}
return local;
}
inline std::vector<gms::inet_address>
filter_for_query(consistency_level cl,
keyspace& ks,
@@ -240,7 +268,7 @@ filter_for_query(consistency_level cl,
* ones).
*/
if (is_datacenter_local(cl)) {
boost::range::sort(live_endpoints, [] (gms::inet_address&, gms::inet_address&) { return 0; }/*, DatabaseDescriptor.getLocalComparator()*/);
boost::range::partition(live_endpoints, is_local);
}
switch (read_repair) {
@@ -254,23 +282,7 @@ filter_for_query(consistency_level cl,
case read_repair_decision::GLOBAL:
return std::move(live_endpoints);
case read_repair_decision::DC_LOCAL:
throw std::runtime_error("DC local read repair is not implemented yet");
#if 0
List<InetAddress> local = new ArrayList<InetAddress>();
List<InetAddress> other = new ArrayList<InetAddress>();
for (InetAddress add : liveEndpoints)
{
if (isLocal(add))
local.add(add);
else
other.add(add);
}
// check if blockfor more than we have localep's
int blockFor = blockFor(keyspace);
if (local.size() < blockFor)
local.addAll(other.subList(0, Math.min(blockFor - local.size(), other.size())));
return local;
#endif
return filter_for_query_dc_local(cl, ks, live_endpoints);
default:
throw std::runtime_error("Unknown read repair type");
}