Currently, we use storage_proxy/get_cas_shard -> sharder.shard_for_reads to decide which shard to use for LWT code execution on both replicas and the coordinator. If the coordinator is not a replica, shard_for_reads returns 0 — the 'default' shard. This behavior has at least two problems:

* Shard 0 may become overloaded, because all LWT coordinators that are not replicas will be served on it.
* The zero shard does not match shard_for_reads on replicas, which hinders the "same shard for client and server" RPC-level optimization.

To fix this, we need to know whether the current node hosts a replica for the tablet corresponding to the given token. Currently, there is no API we could use for this. For historical reasons, sharder::shard_for_reads returns 0 when the node does not host the shard, which leads to ambiguity: a result of 0 can mean either that shard 0 owns the token or that no local shard owns it.

This commit introduces try_get_shard_for_reads, which returns a disengaged std::optional when the tablet is not present on the local node.

We leave the shard_for_reads method in the base sharder class; it calls try_get_shard_for_reads and falls back to zero when the result is disengaged. We need to rename the tablet_sharder private methods shard_for_reads and shard_for_writes so that they don't conflict with sharder::shard_for_reads.
192 lines
7.7 KiB
C++
192 lines
7.7 KiB
C++
/*
|
|
* Copyright (C) 2020-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "dht/token.hh"
|
|
#include <seastar/core/smp.hh>
|
|
#include <boost/container/static_vector.hpp>
|
|
|
|
namespace dht {
|
|
|
|
/// Returns the fixed identifier string of the CPU sharding algorithm,
/// "biased-token-round-robin".
inline sstring cpu_sharding_algorithm_name() {
    const char* const algorithm_name = "biased-token-round-robin";
    return algorithm_name;
}
|
|
|
|
/// Declaration only; the definition is not part of this header.
/// NOTE(review): judging by the name and by the `shard_start` parameter of
/// token_for_next_shard(), this presumably builds the per-shard start table for
/// `shards` shards with `sharding_ignore_msb_bits` ignored bits — confirm against
/// the definition.
std::vector<uint64_t> init_zero_based_shard_start(
        unsigned shards,
        unsigned sharding_ignore_msb_bits);
|
|
|
|
/// Free-function variant that receives the sharding configuration (`shard_count`,
/// `sharding_ignore_msb_bits`) explicitly instead of reading it from a sharder object.
/// NOTE(review): presumably returns the index of the shard owning token `t` under
/// that configuration — confirm against the definition.
unsigned shard_of(
        unsigned shard_count,
        unsigned sharding_ignore_msb_bits,
        const token& t);
|
|
|
|
/// Free-function variant that receives the sharding configuration (`shard_start`,
/// `shard_count`, `sharding_ignore_msb_bits`) explicitly instead of reading it from
/// a sharder object.
/// NOTE(review): presumably implements the contract documented on
/// sharder::token_for_next_shard_for_reads() for static sharding — confirm against
/// the definition.
token token_for_next_shard(
        const std::vector<uint64_t>& shard_start,
        unsigned shard_count,
        unsigned sharding_ignore_msb_bits,
        const token& t,
        shard_id shard,
        unsigned spans);
|
|
|
|
struct shard_and_token {
|
|
shard_id shard;
|
|
token token;
|
|
};
|
|
|
|
/// Represents a set of shards that own a given token on a single host.
|
|
/// It can be either of: none, single shard (no tablet migration), or two shards (during intra-node tablet migration).
|
|
using shard_replica_set = boost::container::static_vector<unsigned, 2>;
|
|
|
|
/// Selects which replica set should be used during tablet migration.
enum class write_replica_set_selector {
    previous,
    both,
    next,
};
|
|
|
|
/**
|
|
* Describes mapping between token space of a given table and owning shards on the local node.
|
|
* The mapping reflected by this instance is constant for the lifetime of this sharder object.
|
|
* It is not guaranteed to be the same for different sharder instances, even within a single process lifetime.
|
|
*
|
|
* For vnode-based replication strategies, the mapping is constant for the lifetime of the local process even
|
|
* across different sharder instances.
|
|
*/
|
|
class sharder {
|
|
protected:
|
|
unsigned _shard_count;
|
|
unsigned _sharding_ignore_msb_bits;
|
|
public:
|
|
sharder(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
|
|
virtual ~sharder() = default;
|
|
|
|
/**
|
|
* Returns the shard that handles a particular token for reads, or empty if this
|
|
* node doesn't contain data for it.
|
|
*/
|
|
virtual std::optional<unsigned> try_get_shard_for_reads(const token& t) const = 0;
|
|
|
|
/**
|
|
* Returns the shard that handles a particular token for reads.
|
|
* Use shard_for_writes() to determine the set of shards that should receive writes.
|
|
*
|
|
* When there is no tablet migration, the function is equivalent to the lambda:
|
|
*
|
|
* [] (const token& t) {
|
|
* auto shards = shard_for_writes();
|
|
* SCYLLA_ASSERT(shards.size() <= 1);
|
|
* return shards.empty() ? 0 : shards[0];
|
|
* }
|
|
*
|
|
*/
|
|
unsigned shard_for_reads(const token& t) const {
|
|
// FIXME: Consider throwing when there is no owning shard on the current host rather than returning 0.
|
|
// It's a coordination mistake to route requests to non-owners. Topology coordinator should synchronize
|
|
// with request coordinators before moving the shard away.
|
|
return try_get_shard_for_reads(t).value_or(0);
|
|
}
|
|
|
|
/**
|
|
* Returns the set of shards which should receive a write to token t.
|
|
* During intra-node tablet migration, local writes need to be replicated to both the old and new shard.
|
|
* You should keep the effective_replication_map_ptr used to obtain this sharder alive around performing
|
|
* the writes so that topology coordinator can wait for all writes using this particular topology version.
|
|
*
|
|
* Unlike shard_for_reads(), returns an empty vector (instead of 0) if the token is not owned by this node.
|
|
*/
|
|
virtual shard_replica_set shard_for_writes(const token& t, std::optional<write_replica_set_selector> sel = std::nullopt) const = 0;
|
|
|
|
/**
|
|
* Gets the first token greater than `t` that is in shard `shard` for reads, and is a shard boundary (its first token).
|
|
*
|
|
* It holds that:
|
|
*
|
|
* shard_for_reads(token_for_next_shard(t, shard)) == shard
|
|
*
|
|
* If the `spans` parameter is greater than zero, the result is the same as if the function
|
|
* is called `spans` times, each time applied to its return value, but efficiently. This allows
|
|
* selecting ranges that include multiple round trips around the 0..smp::count-1 shard span:
|
|
*
|
|
* token_for_next_shard(t, shard, spans) == token_for_next_shard(token_for_next_shard(t, shard, 1), spans - 1)
|
|
*
|
|
* On overflow, maximum_token() is returned.
|
|
*/
|
|
virtual token token_for_next_shard_for_reads(const token& t, shard_id shard, unsigned spans = 1) const = 0;
|
|
|
|
/**
|
|
* Finds the next token greater than t which is owned by a different shard than the owner of t
|
|
* and returns that token and its owning shard. If no such token exists, returns nullopt.
|
|
*
|
|
* The next shard may not necessarily be a successor of the shard owning t.
|
|
*
|
|
* The returned shard is the shard for reads:
|
|
*
|
|
* shard_for_reads(next_shard(t)->token) == next_shard(t)->shard
|
|
*/
|
|
virtual std::optional<shard_and_token> next_shard_for_reads(const token& t) const = 0;
|
|
|
|
/**
|
|
* @return number of shards configured for this partitioner
|
|
*/
|
|
unsigned shard_count() const {
|
|
return _shard_count;
|
|
}
|
|
|
|
unsigned sharding_ignore_msb() const {
|
|
return _sharding_ignore_msb_bits;
|
|
}
|
|
};
|
|
|
|
/**
|
|
* A sharder for vnode-based tables.
|
|
* Shard assignment for a given token is constant for a given sharder configuration (shard count and ignore MSB bits).
|
|
* Doesn't change on topology changes.
|
|
*/
|
|
class static_sharder : public sharder {
|
|
std::vector<uint64_t> _shard_start;
|
|
public:
|
|
static_sharder(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
|
|
|
|
virtual unsigned shard_of(const token& t) const;
|
|
virtual std::optional<shard_and_token> next_shard(const token& t) const;
|
|
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const;
|
|
|
|
virtual std::optional<unsigned> try_get_shard_for_reads(const token& t) const override;
|
|
virtual shard_replica_set shard_for_writes(const token& t, std::optional<write_replica_set_selector> sel) const override;
|
|
virtual token token_for_next_shard_for_reads(const token& t, shard_id shard, unsigned spans = 1) const override;
|
|
virtual std::optional<shard_and_token> next_shard_for_reads(const token& t) const override;
|
|
|
|
bool operator==(const static_sharder& o) const {
|
|
return _shard_count == o._shard_count && _sharding_ignore_msb_bits == o._sharding_ignore_msb_bits;
|
|
}
|
|
};
|
|
|
|
/*
|
|
* Finds the first token in token range (`start`, `end`] that belongs to shard shard_idx.
|
|
*
|
|
* If there is no token that belongs to shard shard_idx in this range,
|
|
* `end` is returned.
|
|
*
|
|
* The first token means the one that appears first on the ring when going
|
|
* from `start` to `end`.
|
|
* 'first token' is not always the smallest.
|
|
* For example, if in vnode (100, 10] only tokens 110 and 1 belong to
|
|
* shard shard_idx then token 110 is the first because it appears first
|
|
* when going from 100 to 10 on the ring.
|
|
*/
|
|
dht::token find_first_token_for_shard(
|
|
const dht::static_sharder& sharder, dht::token start, dht::token end, size_t shard_idx);
|
|
|
|
} //namespace dht
|
|
|
|
template<>
|
|
struct fmt::formatter<dht::static_sharder> {
|
|
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
|
auto format(const dht::static_sharder& sharder, fmt::format_context& ctx) const {
|
|
return fmt::format_to(ctx.out(), "sharder[shard_count={}, ignore_msb_bits={}]",
|
|
sharder.shard_count(), sharder.sharding_ignore_msb());
|
|
}
|
|
};
|
|
|
|
template<>
|
|
struct fmt::formatter<dht::shard_replica_set> : fmt::formatter<std::string_view> {
|
|
template <typename FormatContext>
|
|
auto format(const dht::shard_replica_set& rs, FormatContext& ctx) const {
|
|
return fmt::format_to(ctx.out(), "{{{}}}", fmt::join(rs, ", "));
|
|
}
|
|
};
|