Merge 'Improve background disposal of tablet_metadata' from Benny Halevy

As seen in #23284, when the tablet_metadata contains many tables, even empty ones,
we're seeing a long queue of seastar tasks coming from the individual destruction of
`tablet_map_ptr = foreign_ptr<lw_shared_ptr<const tablet_map>>`.

This change improves `tablet_metadata::clear_gently` to destroy the `tablet_map_ptr` objects
on their owner shard by sorting them into vectors, per- owner shard.

Also, background call to clear_gently was added to `~token_metadata`, as it is destroyed
arbitrarily when automatic token_metadata_ptr variables go out of scope, so that the
contained tablet_metadata would be cleared gently.

Finally, a unit test was added to reproduce the `Too long queue accumulated for gossip` symptom
and verify that it is gone with this change.

Fixes #24814
Refs #23284

This change is not marked as fixing the issue since we still need to verify that there is no impact on query performance, reactor stalls, or large allocations, with a large number of tablet-based tables.

* Since the issue exists in 2025.1, requesting backport to 2025.1 and upwards

Closes scylladb/scylladb#24618

* github.com:scylladb/scylladb:
  token_metadata_impl: clear_gently: release version tracker early
  test: cluster: test_tablets_merge: add test_tablet_split_merge_with_many_tables
  token_metadata: clear_and_destroy_impl when destroyed
  token_metadata: keep a reference to shared_token_metadata
  token_metadata: move make_token_metadata_ptr into shared_token_metadata class
  replica: database: get and expose a mutable locator::shared_token_metadata
  locator: tablets: tablet_metadata: clear_gently: optimize foreign ptr destruction
This commit is contained in:
Avi Kivity
2025-07-06 19:43:50 +03:00
15 changed files with 231 additions and 54 deletions

View File

@@ -409,19 +409,25 @@ void tablet_metadata::drop_tablet_map(table_id id) {
}
future<> tablet_metadata::clear_gently() {
for (auto&& [id, map] : _tablets) {
const auto shard = map.get_owner_shard();
co_await smp::submit_to(shard, [map = std::move(map)] () mutable {
auto map_ptr = map.release();
// Others copies exist, we simply drop ours, no need to clear anything.
if (map_ptr.use_count() > 1) {
return make_ready_future<>();
}
return const_cast<tablet_map&>(*map_ptr).clear_gently().finally([map_ptr = std::move(map_ptr)] { });
});
tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this));
// First, Sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs
// on this shard. We don't use sharded<> here since it will require a similar
// submit_to to each shard owner per tablet-map.
std::vector<std::vector<tablet_map_ptr>> tablet_maps_per_shard;
tablet_maps_per_shard.resize(smp::count);
for (auto& [_, map_ptr] : _tablets) {
tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr));
}
_tablets.clear();
// Now destroy the foreign tablet map pointers on each shard.
co_await smp::invoke_on_all([&] -> future<> {
for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) {
auto map = map_ptr.release();
co_await utils::clear_gently(map);
}
});
co_await utils::clear_gently(_table_groups);
co_await utils::clear_gently(_base_table);

View File

@@ -357,6 +357,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
}
future<> token_metadata_impl::clear_gently() noexcept {
_version_tracker = {};
co_await utils::clear_gently(_token_to_endpoint_map);
co_await utils::clear_gently(_normal_token_owners);
co_await utils::clear_gently(_bootstrap_tokens);
@@ -834,16 +835,30 @@ token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
{
}
token_metadata::token_metadata(config cfg)
: _impl(std::make_unique<token_metadata_impl>(cfg))
token_metadata::token_metadata(shared_token_metadata& stm, config cfg)
: _shared_token_metadata(&stm)
, _impl(std::make_unique<token_metadata_impl>(std::move(cfg)))
{
}
token_metadata::~token_metadata() = default;
token_metadata::~token_metadata() {
clear_and_dispose_impl();
}
token_metadata::token_metadata(token_metadata&&) noexcept = default;
token_metadata& token_metadata::token_metadata::operator=(token_metadata&&) noexcept = default;
token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept {
if (this != &o) {
clear_and_dispose_impl();
_shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr);
_impl = std::exchange(o._impl, nullptr);
}
return *this;
}
void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) {
_shared_token_metadata = &stm;
}
const std::vector<token>&
token_metadata::sorted_tokens() const {
@@ -1027,6 +1042,15 @@ token_metadata::clone_after_all_left() const noexcept {
co_return token_metadata(co_await _impl->clone_after_all_left());
}
void token_metadata::clear_and_dispose_impl() noexcept {
if (!_shared_token_metadata) {
return;
}
if (auto impl = std::exchange(_impl, nullptr)) {
_shared_token_metadata->clear_and_dispose(std::move(impl));
}
}
future<> token_metadata::clear_gently() noexcept {
return _impl->clear_gently();
}
@@ -1143,6 +1167,17 @@ version_tracker shared_token_metadata::new_tracker(token_metadata::version_t ver
return tracker;
}
future<> shared_token_metadata::stop() noexcept {
co_await _background_dispose_gate.close();
}
void shared_token_metadata::clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept {
// Safe to drop the future since the gate is closed in stop()
if (auto gh = _background_dispose_gate.try_hold()) {
(void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {});
}
}
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
@@ -1154,6 +1189,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
_stale_versions_in_use = _versions_barrier.advance_and_await();
}
tmptr->set_shared_token_metadata(*this);
_shared = std::move(tmptr);
_shared->set_version_tracker(new_tracker(_shared->get_version()));
@@ -1216,7 +1252,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async());
auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async());
auto& tm = *tmptr;
// bump the token_metadata ring_version
// to invalidate cached token/replication mappings
@@ -1227,7 +1263,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
// Apply the mutated token_metadata only after successfully cloning it on all shards.
pending_token_metadata_ptr[base_shard] = tmptr;
co_await smp::invoke_on_others(base_shard, [&] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async());
pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async());
});
co_await stm.invoke_on_all([&] (shared_token_metadata& stm) {

View File

@@ -47,7 +47,7 @@ class abstract_replication_strategy;
using token = dht::token;
class token_metadata;
class shared_token_metadata;
class tablet_metadata;
struct host_id_or_endpoint {
@@ -166,6 +166,7 @@ private:
};
class token_metadata final {
shared_token_metadata* _shared_token_metadata = nullptr;
std::unique_ptr<token_metadata_impl> _impl;
private:
friend class token_metadata_ring_splitter;
@@ -178,7 +179,7 @@ public:
using version_t = service::topology::version_t;
using version_tracker_t = version_tracker;
token_metadata(config cfg);
token_metadata(shared_token_metadata& stm, config cfg);
explicit token_metadata(std::unique_ptr<token_metadata_impl> impl);
token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr
token_metadata& operator=(token_metadata&&) noexcept;
@@ -355,6 +356,11 @@ public:
friend class shared_token_metadata;
private:
void set_version_tracker(version_tracker_t tracker);
void set_shared_token_metadata(shared_token_metadata& stm);
// Clears and disposes the token metadata impl in the background, if present.
void clear_and_dispose_impl() noexcept;
};
struct topology_change_info {
@@ -371,12 +377,8 @@ struct topology_change_info {
using token_metadata_lock = semaphore_units<>;
using token_metadata_lock_func = noncopyable_function<future<token_metadata_lock>() noexcept>;
template <typename... Args>
mutable_token_metadata_ptr make_token_metadata_ptr(Args... args) {
return make_lw_shared<token_metadata>(std::forward<Args>(args)...);
}
class shared_token_metadata {
class shared_token_metadata : public peering_sharded_service<shared_token_metadata> {
named_gate _background_dispose_gate{"shared_token_metadata::background_dispose_gate"};
mutable_token_metadata_ptr _shared;
token_metadata_lock_func _lock_func;
std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2);
@@ -408,7 +410,7 @@ public:
// used to construct the shared object as a sharded<> instance
// lock_func returns semaphore_units<>
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg)
: _shared(make_token_metadata_ptr(std::move(cfg)))
: _shared(make_lw_shared<token_metadata>(*this, cfg))
, _lock_func(std::move(lock_func))
, _versions_barrier("shared_token_metadata::versions_barrier")
{
@@ -418,6 +420,17 @@ public:
shared_token_metadata(const shared_token_metadata& x) = delete;
shared_token_metadata(shared_token_metadata&& x) = default;
future<> stop() noexcept;
mutable_token_metadata_ptr make_token_metadata_ptr() {
return make_lw_shared<token_metadata>(*this, token_metadata::config{_shared->get_topology().get_config()});
}
mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) {
tm.set_shared_token_metadata(*this);
return make_lw_shared<token_metadata>(std::move(tm));
}
token_metadata_ptr get() const noexcept {
return _shared;
}
@@ -467,6 +480,8 @@ public:
// Must be called on shard 0.
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
void clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept;
private:
// for testing only, unsafe to be called without awaiting get_lock() first
void mutate_token_metadata_for_test(seastar::noncopyable_function<void (token_metadata&)> func);

View File

@@ -2244,7 +2244,7 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
auto cloned_tmptr = _db.local().get_shared_token_metadata().make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
auto source_dc = utils::optional_param(myloc.dc);

View File

@@ -355,7 +355,7 @@ database::view_update_read_concurrency_sem() {
return *sem;
}
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory& scf, const abort_source& abort, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))

View File

@@ -1599,7 +1599,7 @@ private:
service::migration_notifier& _mnotifier;
gms::feature_service& _feat;
std::vector<std::any> _listeners;
const locator::shared_token_metadata& _shared_token_metadata;
locator::shared_token_metadata& _shared_token_metadata;
lang::manager& _lang_manager;
reader_concurrency_semaphore_group _reader_concurrency_semaphores_group;
@@ -1684,7 +1684,7 @@ public:
// (keyspace/table definitions, column mappings etc.)
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&, const abort_source& abort,
utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
database(database&&) = delete;
@@ -1719,7 +1719,7 @@ public:
return _compaction_manager;
}
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); }
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }

View File

@@ -740,9 +740,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
auto saved_tmpr = get_token_metadata_ptr();
{
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = make_token_metadata_ptr(token_metadata::config {
get_token_metadata().get_topology().get_config()
});
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
tmptr->invalidate_cached_rings();
tmptr->set_version(_topology_state_machine._topology.version);
@@ -3147,9 +3145,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
try {
auto base_shard = this_shard_id();
pending_token_metadata_ptr[base_shard] = tmptr;
auto& sharded_token_metadata = _shared_token_metadata.container();
// clone a local copy of updated token_metadata on all other shards
co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async());
pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
});
// Precalculate new effective_replication_map for all keyspaces

View File

@@ -287,12 +287,12 @@ private:
future<> snitch_reconfigured();
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) {
return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) {
// bump the token_metadata ring_version
// to invalidate cached token/replication mappings
// when the modified token_metadata is committed.
tm.invalidate_cached_rings();
return make_ready_future<mutable_token_metadata_ptr>(make_token_metadata_ptr(std::move(tm)));
return _shared_token_metadata.make_token_metadata_ptr(std::move(tm));
});
}

View File

@@ -14,6 +14,7 @@
#include <functional>
#include <seastar/core/on_internal_error.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/closeable.hh>
#include "locator/types.hh"
#include "test/lib/scylla_test_case.hh"
@@ -213,6 +214,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, node1_shard_count);

View File

@@ -280,6 +280,7 @@ void simple_test() {
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
std::vector<ring_point> ring_points = {
{ 1.0, inet_address("192.100.10.1") },
@@ -363,6 +364,7 @@ void heavy_origin_test() {
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); },
locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_stm = deferred_stop(stm);
std::vector<int> dc_racks = {2, 4, 8};
std::vector<int> dc_endpoints = {128, 256, 512};
@@ -476,6 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -567,6 +570,7 @@ static void test_random_balancing(sharded<snitch_ptr>& snitch, gms::inet_address
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -897,6 +901,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
for (size_t run = 0; run < RUNS; ++run) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
std::unordered_set<dht::token> random_tokens;
while (random_tokens.size() < nodes.size() * VNODES) {
@@ -1043,6 +1048,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) {
auto& topo = tm.get_topology();
generate_topology(topo, datacenters, nodes);
@@ -1087,6 +1093,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) {
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
generate_topology(tm.get_topology(), datacenters, nodes);
return make_ready_future();
@@ -1122,6 +1129,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
.local_dc_rack = ip1_dc_rack,
}
});
auto stop_stm = deferred_stop(stm);
// get_location() should work before any node is added
@@ -1249,6 +1257,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -1401,6 +1410,7 @@ void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {

View File

@@ -14,7 +14,9 @@
#include <fmt/std.h>
#include <seastar/core/future.hh>
#include <seastar/util/closeable.hh>
#include "seastarx.hh"
#include "service/qos/qos_common.hh"
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
@@ -107,6 +109,7 @@ SEASTAR_THREAD_TEST_CASE(subscriber_simple) {
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -180,6 +183,7 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) {
sl_options.workload = service_level_options::workload_type::interactive;
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg1", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -256,6 +260,7 @@ SEASTAR_THREAD_TEST_CASE(add_remove_bad_sequence) {
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg3", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -282,6 +287,7 @@ SEASTAR_THREAD_TEST_CASE(verify_unset_shares_in_cache_when_service_level_created
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();

View File

@@ -52,9 +52,11 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
}
};
auto& stm = e.shared_token_metadata().local();
{
// Ring with minimum token
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
auto tmptr = stm.make_token_metadata_ptr();
const auto host_id = locator::host_id{utils::UUID(0, 1)};
tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);
tmptr->update_normal_tokens(std::unordered_set<dht::token>({dht::minimum_token()}), host_id).get();
@@ -69,7 +71,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
}
{
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
auto tmptr = stm.make_token_metadata_ptr();
const auto id1 = locator::host_id{utils::UUID(0, 1)};
const auto id2 = locator::host_id{utils::UUID(0, 2)};
tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);

View File

@@ -841,6 +841,7 @@ SEASTAR_TEST_CASE(test_get_shard) {
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
auto stop_stm = deferred_stop(stm);
tablet_id tid(0);
tablet_id tid1(0);
@@ -1090,7 +1091,7 @@ SEASTAR_TEST_CASE(test_sharder) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
tokm.get_topology().add_or_update_endpoint(h1);
std::vector<tablet_id> tablet_ids;
@@ -1305,7 +1306,14 @@ SEASTAR_TEST_CASE(test_intranode_sharding) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_host_id = h1;
tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location;
semaphore sem(1);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto tmptr = stm.make_token_metadata_ptr();
auto& tokm = *tmptr;
tokm.get_topology().add_or_update_endpoint(h1);
auto leaving_replica = tablet_replica{h1, 5};
@@ -3779,6 +3787,7 @@ static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id;
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {

View File

@@ -8,6 +8,7 @@
#include <boost/test/unit_test.hpp>
#include <fmt/ranges.h>
#include <seastar/util/closeable.hh>
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
#include "locator/token_metadata.hh"
@@ -31,13 +32,11 @@ namespace {
};
}
mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) {
return make_lw_shared<token_metadata>(token_metadata::config {
topology::config {
.this_host_id = this_host_id,
.local_dc_rack = get_dc_rack(this_host_id)
}
});
token_metadata::config create_token_metadata_config(host_id this_host_id) {
return token_metadata::config{topology::config{
.this_host_id = this_host_id,
.local_dc_rack = get_dc_rack(this_host_id)
}};
}
template <typename Strategy>
@@ -55,7 +54,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy
const auto t1 = dht::token::from_int64(10);
const auto t2 = dht::token::from_int64(20);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_normal_tokens({t1}, e1_id).get();
@@ -75,7 +78,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_normal_tokens({t1}, e1_id).get();
@@ -103,7 +110,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -133,7 +144,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -165,7 +180,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
const auto e3_id = gen_id(3);
const auto e4_id = gen_id(4);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -201,7 +220,11 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1_id);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -254,7 +277,11 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
const auto e1_id1 = gen_id(1);
const auto e1_id2 = gen_id(2);
auto token_metadata = create_token_metadata(e1_id2);
semaphore sem(1);
auto tm_cfg = create_token_metadata_config(e1_id2);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced);
token_metadata->update_normal_tokens({t1}, e1_id1).get();

View File

@@ -376,6 +376,71 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks)
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
# Reproduces #23284
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2):
cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
servers = []
rf = racks
for rack_id in range(0, racks):
rack = f'rack{rack_id+1}'
servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, 200)])
async def check_logs(when):
for server in servers:
log = await manager.server_open_log(server.server_id)
matches = await log.grep("Too long queue accumulated for gossip")
if matches:
pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")
await check_logs("after creating tables")
total_keys = 400
keys = range(total_keys)
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
async def finished_splitting():
# FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
# (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
# Per-table hints (min_tablet_count) can be used to improve this.
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= 16 or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
await check_logs("after split completion")
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
async def finished_merging():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
await check_logs("after merge completion")
# Reproduces use-after-free when migration right after merge, but concurrently to background
# merge completion handler.
# See: https://github.com/scylladb/scylladb/issues/24045