scylladb/db/heat_load_balance.cc
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <cmath>
#include <utility>
#include <vector>
#include <list>
#include <random>
#include <ranges>
#include <fmt/ranges.h>
#include "heat_load_balance.hh"
logging::logger hr_logger("heat_load_balance");
// Return a uniformly-distributed random number in [0,1).
// We use per-thread state for thread safety. We seed the random number generator
// once with a real random value, if available.
static thread_local std::default_random_engine random_engine{std::random_device{}()};
float
rand_float() {
static thread_local std::uniform_real_distribution<float> u(0, 1);
float ret = u(random_engine);
// Gcc 5 has a bug (fixed in Gcc 6) where the above random number
// generator could return 1.0, contradicting the documentation. Let's
// replace 1.0 by the largest number below it. It's not really important
// what we replace it with... Could have also chosen any arbitrary
// constant in [0,1), or to run the random number generator again (this
// is what the fix in Gcc 6 does).
if (ret == 1.0f) {
ret = std::nextafter(ret, 0.0f);
}
return ret;
}
// randone() takes a vector of N probabilities, and randomly returns one of
// the indexes in this vector, with the probability to choose each index
// given by the probability in the vector.
//
// The given probabilities must sum up to 1.0. This assumption is not
// verified by randone().
//
// TODO:
// This implementation has complexity O(N). If we plan to call randone()
// many times on the same probability vector, and if N can grow large,
// we should consider a different implementation, known as "The Alias Method",
// which has O(N) preparation stage but then only O(1) for each call.
// The alias method was first suggested by A.J. Walker in 1977 and later
// refined by Knuth and others. Here is a short overview of this method:
// The O(N) implementation of randone() divides the interval [0,1) into
// consecutive intervals of length p[i] (which sum to 1), then picks a random
// point in [0,1) and checks which of these intervals it covers. The
// observation behind the Alias Method is that the same technique will
// continue to work if we take these intervals and rearrange and/or cut them
// up, as long as we keep their total lengths. The goal would be to cut them
// up in such a way that it makes it easy (O(1)) to find which interval is
// underneath each point we pick on [0,1).
// To do that, we begin by dividing [0,1) into N intervals of equal length 1/N,
// and then packing into each of those at most two intervals belonging to
// different i's. Now, to find which i a point belongs to, all we need
// to do is to find in which of the equal-length intervals it is (a trivial
// division and truncation), and then find out which one of the two
// possibilities that are left holds (one array indexing and comparison).
// How do we pack the equal-length 1/N intervals correctly? We begin by
// putting in the first one a p[i] such that p[i] <= 1/N (always possible,
// of course). If the inequality was strict, so p[i] did not completely fill
// the first 1/N-length interval, we pick another p[j] where p[j] >= 1/N
// (again, possible), take away from it what is needed to fill up the
// 1/N-length interval, reducing p[j] for the rest of the algorithm. Now,
// we continue the same algorithm with one interval less and one less value,
// so it will end in O(N) time.
// For really large N (which we'll never need here...) there are even papers
// on how to ensure that the initialization stage is really O(N) and not
// O(NlogN) - see https://web.archive.org/web/20131029203736/http://web.eecs.utk.edu/~vose/Publications/random.pdf
static unsigned
randone(const std::vector<float>& p, float rnd = rand_float()) {
unsigned last = p.size() - 1;
for (unsigned i = 0; i < last; i++) {
rnd -= p[i];
if (rnd < 0) {
return i;
}
}
// Note: if we're here and rnd isn't 0 (or very close to 0) then the
// p[i]s do not sum to 1... But we don't check this assumption here.
return last;
}
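/*
Illustrative sketch, not part of the original file: the packing procedure
described in the TODO above (Vose's variant of the Alias Method) could look
roughly like the code below. The names alias_table, make_alias_table() and
alias_pick() are hypothetical, and the caller is assumed to pass a non-empty
probability vector that sums to 1 and a random number rnd in [0,1).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper type, not part of heat_load_balance.cc.
struct alias_table {
    std::vector<float> threshold; // part of each 1/N slot owned by the slot's own index
    std::vector<unsigned> alias;  // index owning the remainder of the slot
};

// O(N) preparation. Assumes p is non-empty and sums to 1.
alias_table make_alias_table(const std::vector<float>& p) {
    const std::size_t n = p.size();
    assert(n > 0);
    alias_table t{std::vector<float>(n, 1.0f), std::vector<unsigned>(n)};
    std::vector<float> scaled(n);
    std::vector<unsigned> small, large;
    for (unsigned i = 0; i < n; i++) {
        t.alias[i] = i;
        scaled[i] = p[i] * n; // rescale so that a slot of length 1/N has weight 1
        (scaled[i] < 1.0f ? small : large).push_back(i);
    }
    while (!small.empty() && !large.empty()) {
        const unsigned s = small.back();
        small.pop_back();
        const unsigned l = large.back();
        large.pop_back();
        t.threshold[s] = scaled[s];    // s fills only part of its slot...
        t.alias[s] = l;                // ...and l fills the remainder
        scaled[l] -= 1.0f - scaled[s]; // take away from l what was used
        (scaled[l] < 1.0f ? small : large).push_back(l);
    }
    // Entries left over (because of rounding) keep threshold 1, i.e. they
    // own their whole slot.
    return t;
}

// O(1) lookup. rnd is assumed to be uniform in [0,1).
unsigned alias_pick(const alias_table& t, float rnd) {
    const std::size_t n = t.threshold.size();
    const float scaled = rnd * n;
    std::size_t slot = static_cast<std::size_t>(scaled);
    if (slot >= n) {
        slot = n - 1; // guard against rounding up to n
    }
    const float frac = scaled - slot;
    return frac < t.threshold[slot] ? static_cast<unsigned>(slot) : t.alias[slot];
}
```

The table would be built once per probability vector and alias_pick() would
then replace the linear scan. Whether that pays off for the very small N used
here is a separate question.
*/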
// ssample() produces a random combination (i.e., unordered subset) of
// length K out of N items 0..N-1, where the different items should be
// included with different probabilities, given by a vector p of N
// probabilities, whose sum should be 1.0.
// It returns a vector<int> with size K whose items are different integers
// between 0 and N-1.
//
// The meaning of a probability p[i] is that if we count the individual
// items appearing in returned combinations, the count of item i will be a
// fraction p[i] of the overall count. Note that p[i] must not be higher
// than 1/K: even if we return item i in *every* K-combination, item i will
// still be only 1/K of the produced items. To reach p[i] > 1/K will mean
// some combinations will need to contain more than one copy of i - which
// contradicts the definition of a "combination".
//
// Though ssample() is required to fulfill the first-order inclusion
// probabilities p (the probability of each item appearing in the returned
// combination), it is NOT required to make any guarantees on the high-order
// inclusion probabilities, i.e., the probabilities for pairs of items to
// be returned together in the same combination. This greatly simplifies
// the implementation, and means we can use the "Systematic Sampling"
// technique (explained below) which only makes guarantees on the first-order
// inclusion probabilities. In our use case, fulfilling *only* the 1st order
// inclusion probabilities is indeed enough: We want that each node gets a
// given amount of work, but don't care if the different K nodes we choose
// in one request are correlated.
//
// Not making any guarantees on high-order inclusion probabilities basically
// means that the items are not independent. To understand what this means,
// consider a simple example: say we have N=4 items with equal probability
// and want to draw random pairs (K=2). Our implementation will return {0,2}
// half of the time, and {1,3} the other half of the time. That distribution
// achieves the desired probabilities (each item will be given 1/4 of the
// work), but the pair {0,1}, for example, will never appear in any
// individual draw.
//
// "Systematic Sampling" is a very simple method of reproducing a set of
// desired 1st-order inclusion probabilities. A good overview can be found in
// http://stats.stackexchange.com/questions/139279/systematic-sampling-with-unequal-probabilities
// Basically, Systematic Sampling is a simple extension of the randone()
// algorithm above. Both start by putting the given probabilities one after
// another on the segment [0,1). randone() then draws one random number in
// [0,1) and checks on which of the segments this point falls. Here, we draw
// a random number x in [0, 1/K), look under it, but then look under x+1/K,
// x+2/K, ..., x + (K-1)/K, and these produce K different items with
// appropriate probabilities:
// 1. The items are necessarily different because of our assumption that
// none of the p[i] is larger than 1/K.
// 2. The probability to choose each item is exactly p[i]*K.
//
// ssample() only calls for one random number generation (this is important
// for performance) but calls randone() on the same probability vector K times,
// which makes it even more interesting to implement the Alias Method
// described above. However, for very small N like 3, the difference is not
// likely to be noticeable.
//
// TODO: For the special case of K == N-1, we can have a slightly more
// efficient implementation, which calculates the probability for each of
// the N combinations (the combination lacking item i can be proven to have
// probability 1 - K*p[i]) and then uses one randone() call with these
// modified probabilities.
// TODO: Consider making this a template of K, N and have specialized
// implementations for low N (e.g., 3), K=N-1, etc.
// TODO: write to a pre-allocated return vector to avoid extra allocation.
std::vector<int>
ssample(unsigned k, const std::vector<float>& p) {
const float interval = 1.0 / k;
const float rnd = rand_float() * interval; // random number in [0, 1/k)
std::vector<int> ret;
ret.reserve(k);
float offset = 0;
for (unsigned i = 0; i < k; i++) {
ret.emplace_back(randone(p, rnd + offset));
offset += interval;
}
hr_logger.trace("ssample returning {}", ret);
return ret;
}
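/*
Illustrative sketch, not part of the original file: a deterministic variant of
Systematic Sampling that makes the N=4, K=2 example above easy to check. The
function systematic_sample() is hypothetical. Unlike ssample(), it takes the
starting point x in [0, 1/k) from the caller instead of rand_float(), and it
walks the segments once instead of calling randone() k times.

```cpp
#include <cassert>
#include <vector>

// Hypothetical standalone variant of ssample(). Assumes p sums to 1, no p[i]
// is larger than 1/k, and x is in [0, 1/k).
std::vector<int> systematic_sample(unsigned k, const std::vector<float>& p, float x) {
    assert(k > 0 && k <= p.size());
    const float interval = 1.0f / k;
    assert(x >= 0 && x < interval);
    std::vector<int> ret;
    ret.reserve(k);
    float upper = 0; // left edge of segment i on [0,1)
    unsigned i = 0;
    for (unsigned j = 0; j < k; j++) {
        const float point = x + j * interval;
        // Advance to the segment covering "point". The last segment absorbs
        // any rounding error in the sum of p.
        while (i + 1 < p.size() && point >= upper + p[i]) {
            upper += p[i];
            i++;
        }
        ret.push_back(static_cast<int>(i));
    }
    return ret;
}
```

With four equal probabilities and k=2, a starting point below 0.25 yields the
items 0 and 2, and a starting point in [0.25, 0.5) yields 1 and 3.
*/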
// Given the cache hit rate (cache hits / request) of N different nodes,
// calculate the fraction of requests that we'd like to send to each of
// these nodes to achieve the same number of misses per second on all nodes.
std::vector<float>
miss_equalizing_probablities(const std::vector<float>& hit_rates) {
std::vector<float> ret;
ret.reserve(hit_rates.size());
// R[i] is the reciprocal miss rate 1/(1-H[i]).
float r_sum = 0;
for (float h : hit_rates) {
float r = 1 / (1 - h);
ret.emplace_back(r);
r_sum += r;
}
for (float& r : ret) {
r /= r_sum;
}
return ret;
}
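/*
Illustrative sketch, not part of the original file: a small standalone check
of why normalizing the reciprocal miss rates equalizes misses. If node i
receives a fraction p[i] of the requests and misses on a fraction (1 - h[i])
of them, its share of misses is p[i] * (1 - h[i]), which is the same constant
1 / r_sum for every node. The names equalize_misses() and miss_share() are
hypothetical, and every hit rate is assumed to be strictly below 1.

```cpp
#include <cassert>
#include <vector>

// Hypothetical standalone copy of the formula p[i] = r[i] / sum(r), where
// r[i] = 1 / (1 - h[i]).
std::vector<float> equalize_misses(const std::vector<float>& hit_rates) {
    std::vector<float> p;
    p.reserve(hit_rates.size());
    float r_sum = 0;
    for (float h : hit_rates) {
        assert(h >= 0 && h < 1); // h == 1 would divide by zero
        p.push_back(1 / (1 - h));
        r_sum += p.back();
    }
    for (float& r : p) {
        r /= r_sum;
    }
    return p;
}

// Fraction of all requests that end up as cache misses on node i.
float miss_share(const std::vector<float>& p, const std::vector<float>& h, unsigned i) {
    return p[i] * (1 - h[i]);
}
```

For hit rates 0.5 and 0.9 the reciprocal miss rates are 2 and 10, so the nodes
get about 1/6 and 5/6 of the requests, and each one sees misses on about 1/12
of all requests.
*/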
// Given a set of desired probabilities with sum 1, clip the probabilities
// to be not higher than the given limit. The rest of the probabilities are
// increased, in an attempt to preserve the ratios between probabilities,
// if possible - but keep all the probabilities below the limit.
void
clip_probabilities(std::vector<float>& p, float limit) {
// TODO: We have iterations here because it's possible that increasing
// one probability will bring it also over the limit. Can we find a
// single-step algorithm to do this?
float ratio = 1.0;
for (;;) {
float clipped = 0;
float sum_unclipped = 0;
for (float& x : p) {
if (x >= limit) {
clipped += x - limit;
x = limit;
} else {
x *= ratio;
sum_unclipped += x;
}
}
// "ratio" is how much we need to increase the unclipped
// probabilities
if (clipped == 0) {
return; // done
}
ratio = (sum_unclipped + clipped) / sum_unclipped;
}
}
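/*
Illustrative sketch, not part of the original file: one possible direction for
the TODO in clip_probabilities(). Visiting the probabilities from largest to
smallest lets us decide in a single pass (after an O(N log N) sort) which
entries end up clipped. The function clip_probabilities_sorted() is
hypothetical. It assumes p sums to 1 and limit * p.size() >= 1, and it has not
been checked against the iterative version beyond small examples.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical non-iterative alternative to clip_probabilities().
std::vector<float> clip_probabilities_sorted(std::vector<float> p, float limit) {
    assert(limit * p.size() >= 1.0f - 1e-6f); // otherwise no solution exists
    // Visit the entries from the largest probability to the smallest.
    std::vector<std::size_t> order(p.size());
    std::iota(order.begin(), order.end(), std::size_t{0});
    std::sort(order.begin(), order.end(),
              [&p](std::size_t a, std::size_t b) { return p[a] > p[b]; });
    float rest = std::accumulate(p.begin(), p.end(), 0.0f); // original mass not clipped yet
    float budget = rest; // mass the unclipped entries must add up to in the end
    float ratio = 1.0f;  // scale factor for the unclipped entries
    std::size_t m = 0;   // number of clipped entries
    for (; m < order.size(); m++) {
        ratio = rest > 0 ? budget / rest : 1.0f;
        if (p[order[m]] * ratio < limit) {
            break; // the largest remaining entry fits, so all smaller ones do too
        }
        rest -= p[order[m]];
        budget -= limit;
        p[order[m]] = limit;
    }
    for (std::size_t j = m; j < order.size(); j++) {
        p[order[j]] *= ratio;
    }
    return p;
}
```

For p = 0.1, 0.3, 0.6 and limit 0.5, the last entry is clipped to 0.5 and the
other two are scaled by 1.25 to 0.125 and 0.375, preserving their 1:3 ratio.
*/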
// Run the "probability redistribution" algorithm, which aims for the
// desired probability distribution of the nodes, but does as much work
// as it can (i.e., 1/k) locally and redistributes the rest.
// Returns the vector of probabilities that node "me" should use to send
// requests.
std::vector<float>
redistribute(const std::vector<float>& p, unsigned me, unsigned k) {
unsigned rf = p.size();
std::vector<float> pp(rf);
// "Keep for node i"
// A surplus node keeps its entire desired amount of requests, rf * p[me],
// for itself. A mixed node is capped at 1/k.
pp[me] = std::min(rf * p[me], 1.0f / k);
hr_logger.trace("pp[me({})] = {}", me, pp[me]);
std::vector<float> deficit(rf);
int mixed_count = 0;
for (unsigned j = 0; j < rf; j++) {
float NPj = rf * p[j];
float deficit_j = NPj - 1.0f / k;
if (deficit_j >= 0) {
// mixed node
mixed_count++;
deficit[j] = deficit_j;
}
}
// Each of the mixed nodes has the same surplus:
float mixed_surplus = 1 - 1.0f / k;
hr_logger.trace("starting distribution of mixed-node surplus to other mixed nodes:"
" mixed_count={}, deficit={}, mixed_surplus={}", mixed_count, deficit, mixed_surplus);
float my_surplus;
if (deficit[me] == 0) {
// surplus node
my_surplus = 1 - rf * p[me];
} else {
// mixed node, which will be converted below to either a deficit
// node or a surplus node. We can easily calculate now how much
// surplus will be left. It will be useful to know below if "me"
// will be a surplus node, because we only need to know how much
// work "me" *sends*, so if me is not a surplus node, we won't need
// to do the second step (of distributing surplus to the deficit
// nodes), and won't even need to update deficit[].
if (deficit[me] <= mixed_surplus) {
// Node will be converted to a surplus node
my_surplus = mixed_surplus - deficit[me];
} else {
// Node will be converted to a deficit node, and will not be
// left with any surplus
my_surplus = 0;
}
}
hr_logger.trace("my_surplus={}", my_surplus);
// Mixed node redistribution algorithm, to "convert" mixed nodes into
// pure surplus or pure deficit nodes, while flowing probability between
// the mixed nodes (we only need to track this flow here if "me" is the
// node doing the sending - in pp[]).
if (deficit[me]) {
// "me" is a mixed node.
hr_logger.trace("CASE1");
// We need a list of the mixed nodes sorted in increasing deficit order.
// Actually, we only need to sort those nodes with deficit <=
// min(deficit[me], mixed_surplus).
// TODO: use NlgN sort instead of this ridiculous N^2 implementation.
// TODO: can we do this without a NlgN (although very small N, not even
// the full rf)? Note also the distribution code below is N^2 anyway
// (two nested for loops).
std::list<std::pair<unsigned, float>> sorted_deficits;
for (unsigned i = 0; i < rf; i++) {
if (deficit[i] && deficit[i] <= deficit[me] &&
deficit[i] < mixed_surplus) {
auto it = sorted_deficits.begin();
while (it != sorted_deficits.end() && it->second < deficit[i])
++it;
sorted_deficits.insert(it, std::make_pair(i, deficit[i]));
}
}
hr_logger.trace("sorted_deficits={}{}", sorted_deficits | std::views::keys, sorted_deficits | std::views::values);
float s = 0;
int count = mixed_count;
for (auto& d : sorted_deficits) {
hr_logger.trace("next sorted deficit={{{}, {}}}", d.first, d.second);
// What "diff" to distribute
auto diff = d.second - s;
s = d.second;
hr_logger.trace("diff={}, pp before={}, count={}", diff, pp, count);
--count;
// Distribute diff among all the mixed nodes with higher deficit.
// There should be exactly "count" of those excluding me.
if (!count) {
break;
}
for (unsigned i = 0; i < rf; i++) {
hr_logger.trace("{} {} {} {}", i, d.first, deficit[i], d.second);
// The ">=" here is ok: If several deficits are tied, the first one
// contributes the diff to all those nodes (all are equal, so >=),
// while when we get to the following nodes, they have diff==0
// (because of the tied deficit) so we don't care that this loop
// doesn't quite match count nodes.
if (i != me && deficit[i] >= d.second) {
pp[i] += diff / count;
hr_logger.trace("pp[{}]={} (case a)", i, pp[i]);
}
}
hr_logger.trace(" pp after1={}", pp);
if (d.first == me) {
// We only care what "me" sends, and only the elements in
// the sorted list earlier than me could have forced it to
// send, so the rest of the algorithm isn't interesting.
break;
}
}
// Additionally, if me is converted to a deficit node, we need to
// take the remaining surplus (mixed_surplus minus the last deficit
// in sorted_deficits) and distribute it to the other count-1
// converted-to-surplus nodes. Of course we can only do this if
// count > 1 - if count==1, we remain with just one mixed node
// and cannot eliminate its surplus without "fixing" some of the
// decisions made earlier.
if (deficit[me] > mixed_surplus) {
auto last_deficit = sorted_deficits.back().second;
auto diff = mixed_surplus - last_deficit;
if (count > 1) {
hr_logger.trace("CASE4. surplus {} count {}", diff, count);
for (unsigned i = 0; i < rf; i++) {
if (i != me && deficit[i] > last_deficit) {
hr_logger.trace("adding {} to pp[{}]={}", (diff / (count-1)), i, pp[i]);
pp[i] += diff / (count - 1);
}
}
// TODO: confirm that this loop worked exactly count - 1 times.
} else {
hr_logger.trace("CASE3a. surplus={}", diff);
// CASE3: count == 1 is possible. example for p = 0.2, 0.3, 0.5:
// surplus 0.5 0.5 0.5
// deficit 0.1 0.4 1.0
// after first step redistributing 0.1 to 3 nodes:
// surplus 0.4 0.4 0.4
// deficit 0.0 0.3 0.9
// after second step redistributing 0.3 to 2 nodes:
// surplus 0.4 0.1 0.1
// deficit 0.0 0.0 0.6
// So we're left with 1 mixed node (count=1), and can't
// redistribute its surplus to itself!
// This happens because the original distribution step was
// already a mistake: In this case the *only* solution is for nodes
// 0 and 1 to send all their surplus (total of 1.0) to fill
// node 2's entire deficit (1.0). Node 0 can't afford to send
// any of its surplus to node 1 - and if it does (like we did in
// the first step redistributing 0.1), we end up with
// deficit remaining on node 2!
//
// Special case of one remaining mixed node. Tell the other
// nodes not to give each other as much (we don't have to
// do this here, as we only care about "me") and instead
// "me" will give them their surplus
for (unsigned i = 0; i < rf; i++) {
if (i != me) {
pp[i] += diff / (mixed_count - 1);
hr_logger.trace("pp[{}]={} (case b)", i, pp[i]);
}
}
}
hr_logger.trace(" pp after2={}", pp);
} else {
// Additionally, if the algorithm ends with a single mixed node
// we need to apply a fix. Above we already handled the case that
// this single mixed node is "me", so it needs to send more to the
// other nodes. Here we need to handle the opposite side - me is
// one of the nodes which sent too much to other nodes and needs
// to send to the mixed node instead.
// TODO: find a more efficient way to check if the algorithm will
// end with just one mixed node and its surplus :-(
unsigned n_converted_to_deficit = 0;
unsigned mix_i = 0; // only used if n_converted_to_deficit==1
float last_deficit = 0;
for (unsigned i = 0; i < rf; i++) {
if (deficit[i] > mixed_surplus) {
n_converted_to_deficit++;
mix_i = i;
} else {
last_deficit = std::max(last_deficit, deficit[i]);
}
}
if (n_converted_to_deficit == 1) {
auto diff = mixed_surplus - last_deficit;
hr_logger.trace("CASE3b. surplus={}", diff);
pp[mix_i] += diff / (mixed_count - 1);
hr_logger.trace("pp[{}]={} (case c)", mix_i, pp[mix_i]);
for (unsigned i = 0; i < rf; i++) {
if (deficit[i] > 0) { // mixed node
if (i != mix_i && i != me) {
pp[i] -= diff / (mixed_count - 1) / (mixed_count - 2);
hr_logger.trace("pp[{}]={} (case d)", i, pp[i]);
}
}
}
}
}
}
if (my_surplus) {
// "me" is a surplus node, or became one during the mixed node
// redistribution algorithm. We need to know the new deficit nodes
// produced by that algorithm. i.e., we need to update deficit[].
float new_total_deficit = 0;
for (unsigned i = 0; i < rf; i++) {
if (deficit[i] > 0) {
// Mixed node.
if (deficit[i] > mixed_surplus) {
// The mixed-node redistribution algorithm converted it
// to a deficit node, with this deficit:
deficit[i] -= mixed_surplus;
new_total_deficit += deficit[i];
} else {
// The mixed-node redistribution algorithm converted it
// to a surplus node, with no deficit:
deficit[i] = 0;
}
}
}
// Split "me"'s surplus to the other nodes' remaining deficit,
// according to their share in the total remaining deficit.
for (unsigned j = 0; j < rf ; j++) {
if (deficit[j] > 0) {
// note j!= me because surplus node has deficit==0.
// Note pp[j] +=, not =, because this node might have
// already flowed some work to other nodes in the
// mixed node redistribution algorithm above.
pp[j] += deficit[j] / new_total_deficit * my_surplus;
hr_logger.trace("pp[{}]={} (case e)", j, pp[j]);
}
}
}
return pp;
}