diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 073f9b5bf9..4d4561f0d2 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -24,7 +24,7 @@ #pragma once -#include +#include #include "exceptions/exceptions.hh" #include "core/sstring.hh" #include "schema.hh" @@ -227,6 +227,34 @@ inline size_t count_local_endpoints(Range& live_endpoints) { return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local); } +inline std::vector +filter_for_query_dc_local(consistency_level cl, + keyspace& ks, + const std::vector& live_endpoints) { + using namespace gms; + + std::vector local; + std::vector other; + local.reserve(live_endpoints.size()); + other.reserve(live_endpoints.size()); + + std::partition_copy(live_endpoints.begin(), live_endpoints.end(), + std::back_inserter(local), std::back_inserter(other), + is_local); + + // check if blockfor more than we have localep's + size_t bf = block_for(ks, cl); + if (local.size() < bf) { + size_t other_items_count = std::min(bf - local.size(), other.size()); + local.reserve(local.size() + other_items_count); + + std::move(other.begin(), other.begin() + other_items_count, + std::back_inserter(local)); + } + + return local; +} + inline std::vector filter_for_query(consistency_level cl, keyspace& ks, @@ -240,7 +268,7 @@ filter_for_query(consistency_level cl, * ones). */ if (is_datacenter_local(cl)) { - boost::range::sort(live_endpoints, [] (gms::inet_address&, gms::inet_address&) { return 0; }/*, DatabaseDescriptor.getLocalComparator()*/); + boost::range::partition(live_endpoints, is_local); } switch (read_repair) { @@ -254,23 +282,7 @@ filter_for_query(consistency_level cl, case read_repair_decision::GLOBAL: return std::move(live_endpoints); case read_repair_decision::DC_LOCAL: - throw std::runtime_error("DC local read repair is not implemented yet"); -#if 0 - List local = new ArrayList(); - List other = new ArrayList(); - for (InetAddress add : liveEndpoints) - { - if (isLocal(add)) - local.add(add); - else - other.add(add); - } - // check if blockfor more than we have localep's - int blockFor = blockFor(keyspace); - if (local.size() < blockFor) - local.addAll(other.subList(0, Math.min(blockFor - local.size(), other.size()))); - return local; -#endif + return filter_for_query_dc_local(cl, ks, live_endpoints); default: throw std::runtime_error("Unknown read repair type"); }