diff --git a/dht/auto_refreshing_sharder.hh b/dht/auto_refreshing_sharder.hh index 5c8572dc89..1caf30106a 100644 --- a/dht/auto_refreshing_sharder.hh +++ b/dht/auto_refreshing_sharder.hh @@ -8,7 +8,15 @@ #pragma once -#include "replica/database.hh" +#include "dht/sharder.hh" +#include "locator/abstract_replication_strategy.hh" + +#include +#include + +namespace replica { +class table; +} namespace dht { @@ -27,41 +35,19 @@ class auto_refreshing_sharder : public dht::sharder { optimized_optional _callback; std::optional _sel; private: - void refresh() { - _erm = _table->get_effective_replication_map(); - _sharder = &_erm->get_sharder(*_table->schema()); - _callback = _erm->get_validity_abort_source().subscribe([this] () noexcept { - refresh(); - }); - } + void refresh(); public: - auto_refreshing_sharder(lw_shared_ptr table, std::optional sel = std::nullopt) - : _table(std::move(table)) - , _sel(sel) - { - refresh(); - } + auto_refreshing_sharder(lw_shared_ptr table, std::optional sel = std::nullopt); - virtual ~auto_refreshing_sharder() = default; + virtual ~auto_refreshing_sharder(); - virtual unsigned shard_for_reads(const token& t) const override { - return _sharder->shard_for_reads(t); - } + virtual unsigned shard_for_reads(const token& t) const override; - virtual dht::shard_replica_set shard_for_writes(const token& t, std::optional sel) const override { - if (!sel) { - sel = _sel; - } - return _sharder->shard_for_writes(t, sel); - } + virtual dht::shard_replica_set shard_for_writes(const token& t, std::optional sel) const override; - virtual std::optional next_shard_for_reads(const dht::token& t) const override { - return _sharder->next_shard_for_reads(t); - } + virtual std::optional next_shard_for_reads(const dht::token& t) const override; - virtual dht::token token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans = 1) const override { - return _sharder->token_for_next_shard_for_reads(t, shard, spans); - } + virtual dht::token token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans = 1) const override; }; } // namespace dht diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 5b9dd28122..affcd65241 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -8,6 +8,7 @@ #include "i_partitioner.hh" #include "sharder.hh" +#include "auto_refreshing_sharder.hh" #include #include #include "dht/ring_position.hh" @@ -15,6 +16,7 @@ #include "utils/assert.hh" #include "utils/class_registrator.hh" #include "sstables/key.hh" +#include "replica/database.hh" #include #include #include "utils/log.hh" @@ -492,6 +494,46 @@ std::optional is_single_shard(const dht::sharder& sharder, const schem return shard; } +auto_refreshing_sharder::auto_refreshing_sharder(lw_shared_ptr table, std::optional sel) + : _table(std::move(table)) + , _sel(sel) +{ + refresh(); +} + +auto_refreshing_sharder::~auto_refreshing_sharder() = default; + +void +auto_refreshing_sharder::refresh() { + _erm = _table->get_effective_replication_map(); + _sharder = &_erm->get_sharder(*_table->schema()); + _callback = _erm->get_validity_abort_source().subscribe([this] () noexcept { + refresh(); + }); +} + +unsigned auto_refreshing_sharder::shard_for_reads(const token& t) const { + return _sharder->shard_for_reads(t); +} + +dht::shard_replica_set +auto_refreshing_sharder::shard_for_writes(const token& t, std::optional sel) const { + if (!sel) { + sel = _sel; + } + return _sharder->shard_for_writes(t, sel); +} + +std::optional +auto_refreshing_sharder::next_shard_for_reads(const dht::token& t) const { + return _sharder->next_shard_for_reads(t); +} + +dht::token +auto_refreshing_sharder::token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans) const { + return _sharder->token_for_next_shard_for_reads(t, shard, spans); +} + } auto fmt::formatter::format(const dht::ring_position_view& pos, fmt::format_context& ctx) const