Merge 'replica: allocate storage groups dynamically' from Aleksandra Martyniuk

Allocate storage groups dynamically, i.e.:
- on table creation, allocate only the storage groups that belong to this
  shard;
- allocate a storage group for a tablet that is migrated to this shard;
- deallocate the storage group of a tablet that is migrated out of this
  shard.

A condensed sketch of this lifecycle is shown below.
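
The sketch is a hypothetical, simplified C++ model of that lifecycle, not the actual replica code; `storage_group` and the manager here are stand-ins for the real types, which additionally manage compaction groups, locking, and asynchrony:

```cpp
// Hypothetical, simplified model of the per-shard storage group lifecycle.
#include <cstddef>
#include <cstdio>
#include <map>
#include <memory>
#include <stdexcept>
#include <vector>

struct storage_group {
    size_t tablet_id;
    explicit storage_group(size_t id) : tablet_id(id) {}
};

class storage_group_manager {
    std::map<size_t, std::unique_ptr<storage_group>> _storage_groups;
public:
    // On table creation: allocate groups only for tablets owned by this shard.
    void allocate_initial(const std::vector<size_t>& tablets_on_this_shard) {
        for (auto id : tablets_on_this_shard) {
            _storage_groups[id] = std::make_unique<storage_group>(id);
        }
    }
    // Migration in: allocate the group lazily if this shard lacks it.
    void on_tablet_migrating_in(size_t id) {
        if (!_storage_groups.contains(id)) {
            _storage_groups[id] = std::make_unique<storage_group>(id);
        }
    }
    // Migration out: deallocate the group once the tablet is cleaned up.
    void on_tablet_migrated_out(size_t id) {
        if (_storage_groups.erase(id) == 0) {
            throw std::out_of_range("storage group not found");
        }
    }
};

int main() {
    storage_group_manager mgr;
    mgr.allocate_initial({0, 2});   // shard owns tablets 0 and 2 at creation
    mgr.on_tablet_migrating_in(1);  // tablet 1 migrates onto this shard
    mgr.on_tablet_migrated_out(2);  // tablet 2 migrates away
    std::puts("lifecycle ok");
}
```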

Output of `./build/release/scylla perf-simple-query -c 1 --random-seed=2248493992` before the change:
```
random-seed=2248493992
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
64933.90 tps ( 63.2 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42163 insns/op,        0 errors)
65865.36 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42155 insns/op,        0 errors)
66649.36 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42176 insns/op,        0 errors)
67029.60 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42176 insns/op,        0 errors)
68361.21 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42166 insns/op,        0 errors)

median 66649.36 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42176 insns/op,        0 errors)
median absolute deviation: 784.00
maximum: 68361.21
minimum: 64933.90
```

Output of `./build/release/scylla perf-simple-query -c 1 --random-seed=2248493992` after the change:
```
random-seed=2248493992
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
63744.12 tps ( 63.2 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42153 insns/op,        0 errors)
66613.16 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42153 insns/op,        0 errors)
69667.39 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42184 insns/op,        0 errors)
67824.78 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42180 insns/op,        0 errors)
67244.21 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42174 insns/op,        0 errors)

median 67244.21 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   42174 insns/op,        0 errors)
median absolute deviation: 631.05
maximum: 69667.39
minimum: 63744.12
```
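
The after-change median is about 0.9% higher, within the reported median absolute deviations, so the series is performance-neutral on this benchmark.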

Fixes: #16877.

Closes scylladb/scylladb#17664

* github.com:scylladb/scylladb:
  test: add test for back and forth tablets migration
  replica: allocate storage groups dynamically
  replica: refresh snapshot in compaction_group::cleanup
  replica: add rwlock to storage_group_manager
  replica: handle reads of non-existing tablets gracefully
  service: move to cleanup stage if allow_write_both_read_old fails
  replica: replace table::as_table_state
  compaction: pass compaction group id to reshape_compaction_group
  replica: open code get_compaction_group in perform_cleanup_compaction
  replica: drop single_compaction_group_if_available
Committed by Avi Kivity on 2024-05-12 21:22:02 +03:00.
18 changed files with 220 additions and 99 deletions.

View File

@@ -159,9 +159,9 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
// There is a semaphore inside the compaction manager in run_resharding_jobs. So we
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
auto& t = table.as_table_state();
auto& t = table.try_get_table_state_with_static_sharding();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> {
return table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> {
auto erm = table.get_effective_replication_map(); // keep alive around compaction.
sstables::compaction_descriptor desc(sstlist);
@@ -604,12 +604,12 @@ future<> shard_reshaping_compaction_task_impl::run() {
}
// reshape sstables individually within the compaction groups
for (auto& sstables_in_cg : sstables_grouped_by_compaction_group | boost::adaptors::map_values) {
co_await reshape_compaction_group(sstables_in_cg, table, info);
for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) {
co_await reshape_compaction_group(sstables_in_cg.first, sstables_in_cg.second, table, info);
}
}
future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::unordered_set<sstables::shared_sstable>& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) {
future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(size_t compaction_group_id, std::unordered_set<sstables::shared_sstable>& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) {
while (true) {
auto reshape_candidates = boost::copy_range<std::vector<sstables::shared_sstable>>(sstables_in_cg
@@ -635,8 +635,9 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::uno
desc.creator = _creator;
try {
co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> {
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state(), progress_monitor);
auto& t = table.get_compaction_group(compaction_group_id)->as_table_state();
co_await table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, &t] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> {
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, t, progress_monitor);
// update the sstables_in_cg set with new sstables and remove the reshaped ones
for (auto& sst : sstlist) {
sstables_in_cg.erase(sst);
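
To illustrate the refactoring above: reshaping now runs once per compaction group, with each bucket of sstables keyed by its group id and compacted against that group's own table state. This is a hedged sketch of the loop shape only; the stand-in types and function names are hypothetical, and the real code is coroutine-based:

```cpp
// Sketch: bucket sstables by compaction group id, then reshape each bucket
// against that group's own state.
#include <cstdio>
#include <unordered_map>
#include <unordered_set>

using sstable_id = int;

void reshape_one_group(size_t group_id,
                       std::unordered_set<sstable_id>& sstables_in_cg) {
    // Stand-in for run_custom_job(...) + compact_sstables(...) running on the
    // table_state of compaction group `group_id`.
    std::printf("reshaping %zu sstables in compaction group %zu\n",
                sstables_in_cg.size(), group_id);
}

int main() {
    std::unordered_map<size_t, std::unordered_set<sstable_id>> grouped = {
        {0, {1, 2}},
        {1, {3}},
    };
    for (auto& [group_id, sstables_in_cg] : grouped) {
        reshape_one_group(group_id, sstables_in_cg);
    }
}
```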

View File

@@ -606,7 +606,7 @@ private:
std::function<bool (const sstables::shared_sstable&)> _filter;
uint64_t& _total_shard_size;
future<> reshape_compaction_group(std::unordered_set<sstables::shared_sstable>& sstables_in_cg, replica::column_family& table, const tasks::task_info& info);
future<> reshape_compaction_group(size_t compaction_group_id, std::unordered_set<sstables::shared_sstable>& sstables_in_cg, replica::column_family& table, const tasks::task_info& info);
public:
shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,

View File

@@ -292,7 +292,7 @@ stateDiagram-v2
use_new --> cleanup
cleanup --> end_migration
end_migration --> [*]
allow_write_both_read_old --> revert_migration: error
allow_write_both_read_old --> cleanup_target: error
write_both_read_old --> cleanup_target: error
streaming --> cleanup_target: error
write_both_read_new --> if_state: error

View File

@@ -8,6 +8,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include "database_fwd.hh"
#include "compaction/compaction_descriptor.hh"
@@ -233,10 +234,16 @@ protected:
// The list entries are unlinked automatically when the storage group they belong to is removed.
compaction_group_list _compaction_groups;
storage_group_map _storage_groups;
// Prevents _storage_groups from having its elements inserted or deleted while another
// layer iterates over them (or over _compaction_groups).
seastar::rwlock _lock;
public:
virtual ~storage_group_manager();
seastar::rwlock& get_rwlock() noexcept {
return _lock;
}
const compaction_group_list& compaction_groups() const noexcept {
return _compaction_groups;
}
@@ -244,19 +251,12 @@ public:
return _compaction_groups;
}
const storage_group_map& storage_groups() const noexcept {
return _storage_groups;
}
storage_group_map& storage_groups() noexcept {
return _storage_groups;
}
future<> for_each_storage_group_gently(std::function<future<>(size_t, storage_group&)> f);
void for_each_storage_group(std::function<void(size_t, storage_group&)> f) const;
void remove_storage_group(size_t id);
// FIXME: Cannot return nullptr, signature can be changed to return storage_group&.
storage_group* storage_group_for_id(const schema_ptr&, size_t i) const;
compaction_group* single_compaction_group_if_available() noexcept {
return _compaction_groups.size() == 1 ? &_compaction_groups.front() : nullptr;
}
// Caller must keep the current effective_replication_map_ptr valid
// until the storage_group_manager finishes update_effective_replication_map
virtual future<> update_effective_replication_map(const locator::effective_replication_map& erm) = 0;
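
The comment above states the invariant the new lock protects: iteration over `_storage_groups` (or `_compaction_groups`) must not race with insertion or erasure. Below is a sketch of that discipline with `std::shared_mutex` standing in for `seastar::rwlock` (an illustrative assumption; the Seastar lock suspends fibers rather than blocking threads):

```cpp
// Readers iterate under a shared lock; insert/erase takes the exclusive lock.
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

struct storage_group { size_t id; };

class storage_group_manager_sketch {
    std::map<size_t, std::unique_ptr<storage_group>> _storage_groups;
    mutable std::shared_mutex _lock; // guards iteration vs. insert/erase
public:
    void for_each_storage_group(const std::function<void(size_t, storage_group&)>& f) {
        std::shared_lock read_guard(_lock); // many iterators may coexist
        for (auto& [id, sg] : _storage_groups) {
            f(id, *sg);
        }
    }
    void remove_storage_group(size_t id) {
        std::unique_lock write_guard(_lock); // no iterator may be live now
        _storage_groups.erase(id);
    }
};
```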

View File

@@ -114,6 +114,10 @@ namespace gms {
class gossiper;
}
namespace compaction {
class shard_reshaping_compaction_task_impl;
}
namespace db {
class commitlog;
class config;
@@ -591,8 +595,6 @@ private:
storage_group* storage_group_for_id(size_t i) const;
std::unique_ptr<storage_group_manager> make_storage_group_manager();
// Return compaction group if table owns a single one. Otherwise, null is returned.
compaction_group* single_compaction_group_if_available() const noexcept;
compaction_group* get_compaction_group(size_t id) const noexcept;
// Select a compaction group from a given token.
compaction_group& compaction_group_for_token(dht::token token) const noexcept;
@@ -604,8 +606,6 @@ private:
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept;
// Returns a list of all compaction groups.
compaction_group_list& compaction_groups() const noexcept;
// Returns a list of all storage groups.
const storage_group_map& storage_groups() const noexcept;
// Safely iterate through compaction groups, while performing async operations on them.
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
@@ -838,7 +838,15 @@ public:
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
future<> update_effective_replication_map(locator::effective_replication_map_ptr);
[[gnu::always_inline]] bool uses_tablets() const;
private:
future<> clear_inactive_reads_for_tablet(database& db, storage_group* sg);
future<> stop_compaction_groups(storage_group* sg);
future<> flush_compaction_groups(storage_group* sg);
future<> cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg);
public:
future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
// For tests only.
future<> cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid);
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
shard_id shard_of(const mutation& m) const {
@@ -1198,6 +1206,7 @@ public:
friend class distributed_loader;
friend class table_populator;
friend class compaction::shard_reshaping_compaction_task_impl;
private:
timer<> _off_strategy_trigger;
@@ -1207,8 +1216,7 @@ public:
void update_off_strategy_trigger();
void enable_off_strategy_trigger();
// FIXME: get rid of it once no users.
compaction::table_state& as_table_state() const noexcept;
compaction::table_state& try_get_table_state_with_static_sharding() const;
// Safely iterate through table states, while performing async operations on them.
future<> parallel_foreach_table_state(std::function<future<>(compaction::table_state&)> action);

View File

@@ -548,10 +548,31 @@ void table::enable_off_strategy_trigger() {
storage_group_manager::~storage_group_manager() = default;
future<> storage_group_manager::for_each_storage_group_gently(std::function<future<>(size_t, storage_group&)> f) {
rwlock::holder shared_lock = co_await get_rwlock().hold_read_lock();
for (auto& [id, sg]: _storage_groups) {
co_await f(id, *sg);
}
}
void storage_group_manager::for_each_storage_group(std::function<void(size_t, storage_group&)> f) const {
for (auto& [id, sg]: _storage_groups) {
f(id, *sg);
}
}
void storage_group_manager::remove_storage_group(size_t id) {
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
_storage_groups.erase(it);
} else {
throw std::out_of_range(format("remove_storage_group: storage group with id={} not found", id));
}
}
storage_group* storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const {
auto it = storage_groups().find(i);
if (it == storage_groups().end()) [[unlikely]] {
on_internal_error(tlogger, format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name()));
auto it = _storage_groups.find(i);
if (it == _storage_groups.end()) [[unlikely]] {
throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name()));
}
return it->second.get();
}
@@ -661,10 +682,9 @@ public:
auto shard = tmap.get_shard(tid, _my_host_id);
if (shard && *shard == this_shard_id()) {
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
ret[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
}
// FIXME: don't allocate compaction groups for tablets that aren't present in this shard.
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
ret[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
}
_storage_groups = std::move(ret);
}
@@ -805,7 +825,7 @@ bool tablet_storage_group_manager::all_storage_groups_split() {
return true;
}
auto split_ready = std::ranges::all_of(storage_groups() | boost::adaptors::map_values,
auto split_ready = std::ranges::all_of(_storage_groups | boost::adaptors::map_values,
std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(compaction_groups())));
// The table replica will say to coordinator that its split status is ready by
@@ -834,9 +854,9 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com
future<> tablet_storage_group_manager::split_all_storage_groups() {
sstables::compaction_type_options::split opt = split_compaction_options();
for (auto& storage_group : storage_groups() | boost::adaptors::map_values) {
co_await storage_group->split(compaction_groups(), opt);
}
co_await for_each_storage_group_gently([this, opt] (size_t i, storage_group& storage_group) {
return storage_group.split(compaction_groups(), opt);
});
}
future<> table::split_all_storage_groups() {
@@ -849,7 +869,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
return make_ready_future<>();
}
auto& sg = storage_groups()[idx];
auto& sg = _storage_groups[idx];
if (!sg) {
on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard",
idx, schema()->ks_name(), schema()->cf_name()));
@@ -873,10 +893,6 @@ std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
return ret;
}
compaction_group* table::single_compaction_group_if_available() const noexcept {
return _sg_manager->single_compaction_group_if_available();
}
compaction_group* table::get_compaction_group(size_t id) const noexcept {
return storage_group_for_id(id)->main_compaction_group().get();
}
@@ -912,8 +928,8 @@ utils::chunked_vector<compaction_group*> tablet_storage_group_manager::compactio
size_t candidate_end = tr.end() ? storage_group_id_for_token(tr.end()->value()) : (tablet_count() - 1);
while (candidate_start <= candidate_end) {
auto it = storage_groups().find(candidate_start++);
if (it == storage_groups().end()) {
auto it = _storage_groups.find(candidate_start++);
if (it == _storage_groups.end()) {
continue;
}
auto& sg = it->second;
@@ -965,12 +981,8 @@ compaction_group_list& table::compaction_groups() const noexcept {
return _sg_manager->compaction_groups();
}
const storage_group_map& table::storage_groups() const noexcept {
return _sg_manager->storage_groups();
}
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
// TODO: place a barrier here when we allow dynamic groups.
rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock();
co_await coroutine::parallel_for_each(compaction_groups(), [&] (compaction_group& cg) {
return action(cg);
});
@@ -1009,11 +1021,14 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
compaction_group& cg = compaction_group_for_sstable(sst);
// Hold the gate to make sure the compaction group stays alive.
auto holder = cg.async_gate().hold();
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
// atomically load all opened sstables into column family.
compaction_group& cg = compaction_group_for_sstable(sst);
if (!offstrategy) {
add_sstable(cg, sst);
} else {
@@ -1789,7 +1804,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
co_await flush();
}
auto& cg = *get_compaction_group(0);
auto& cg = *storage_group_for_id(0)->main_compaction_group().get();
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state(), info);
}
@@ -2024,13 +2039,13 @@ locator::table_load_stats table::table_load_stats(std::function<bool(locator::gl
locator::table_load_stats stats;
stats.split_ready_seq_number = _sg_manager->split_ready_seq_number();
for (auto& [id, sg] : storage_groups()) {
_sg_manager->for_each_storage_group([&] (size_t id, storage_group& sg) {
locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) };
if (!tablet_filter(gid)) {
continue;
return;
}
stats.size_in_bytes += sg->live_disk_space_used();
}
stats.size_in_bytes += sg.live_disk_space_used();
});
return stats;
}
@@ -2058,7 +2073,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
// Stop the released main compaction groups asynchronously
future<> stop_fut = make_ready_future<>();
for (auto& [id, sg] : storage_groups()) {
for (auto& [id, sg] : _storage_groups) {
if (!sg->main_compaction_group()->empty()) {
on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \
"therefore groups cannot be remapped with the new tablet count.",
@@ -2103,12 +2118,35 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
if (new_tablet_count > old_tablet_count) {
tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets",
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
return handle_tablet_split_completion(*old_tablet_map, *new_tablet_map);
co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map);
co_return;
}
return make_ready_future();
// Allocate storage group if tablet is migrating in.
auto this_replica = locator::tablet_replica{
.host = erm.get_token_metadata().get_my_id(),
.shard = this_shard_id()
};
auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) {
return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica;
};
for (auto& transition : new_tablet_map->transitions()) {
auto tid = transition.first;
auto transition_info = transition.second;
if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
auto range = new_tablet_map->get_token_range(tid);
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
_storage_groups[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
}
}
co_return;
}
future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
// The exclusive lock is meant to protect storage groups, but we hold it here to prevent
// preemption between the erm update and the storage group updates, as they need to be consistent.
rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock();
auto old_erm = std::exchange(_erm, std::move(erm));
if (uses_tablets()) {
@@ -3330,8 +3368,10 @@ compaction::table_state& compaction_group::as_table_state() const noexcept {
return *_table_state;
}
compaction::table_state& table::as_table_state() const noexcept {
// FIXME: kill it once we're done with all remaining users.
compaction::table_state& table::try_get_table_state_with_static_sharding() const {
if (!uses_static_sharding()) {
throw std::runtime_error("Getting table state is allowed only with static sharding");
}
return get_compaction_group(0)->as_table_state();
}
@@ -3412,25 +3452,30 @@ future<> compaction_group::cleanup() {
tlogger.debug("Invalidating range {} for compaction group {} of table {} during cleanup.",
p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name());
co_await _t._cache.invalidate(std::move(updater), p_range);
_t._cache.refresh_snapshot();
}
future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
auto holder = async_gate().hold();
auto* sg = storage_group_for_id(tid.value());
future<> table::clear_inactive_reads_for_tablet(database& db, storage_group* sg) {
for (auto& cg_ptr : sg->compaction_groups()) {
if (!cg_ptr) {
throw std::runtime_error(format("Cannot cleanup tablet {} of table {}.{} because it is not allocated in this shard",
tid, _schema->ks_name(), _schema->cf_name()));
}
co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range());
}
}
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
// FIXME: to be able to stop group and provide guarantee above, we must first be able to reallocate a new group if tablet is migrated back.
//co_await _cg.stop();
future<> table::stop_compaction_groups(storage_group* sg) {
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
for (auto& cg_ptr : sg->compaction_groups()) {
co_await cg_ptr->stop();
}
}
future<> table::flush_compaction_groups(storage_group* sg) {
for (auto& cg_ptr : sg->compaction_groups()) {
co_await cg_ptr->flush();
}
}
future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg) {
for (auto& cg_ptr : sg->compaction_groups()) {
co_await cg_ptr->cleanup();
// FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation,
// and can cover some mutations which weren't cleaned, causing them to be lost during replay.
@@ -3448,8 +3493,29 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
}
tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
}
// FIXME: Deallocate compaction group in this shard
future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
auto holder = async_gate().hold();
rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock();
auto* sg = storage_group_for_id(tid.value());
co_await clear_inactive_reads_for_tablet(db, sg);
// compaction_group::stop takes care of flushing.
co_await stop_compaction_groups(sg);
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
_sg_manager->remove_storage_group(tid.value());
}
future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
auto holder = async_gate().hold();
// Hold shared lock to keep storage group alive.
rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock();
auto* sg = storage_group_for_id(tid.value());
co_await clear_inactive_reads_for_tablet(db, sg);
co_await flush_compaction_groups(sg);
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
}
} // namespace replica
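
The new cleanup path above follows a strict order: hold the gate and the exclusive lock, drain inactive readers, stop (and thereby flush) the compaction groups, run cleanup, then deallocate the storage group. A condensed sketch of that ordering, using hypothetical stub types rather than the real classes:

```cpp
// Condensed, hypothetical sketch of the cleanup_tablet ordering; each step
// maps to one of the helpers split out of table::cleanup_tablet above.
#include <cstdio>

struct storage_group_stub {
    void clear_inactive_reads() { std::puts("1. drop inactive readers"); }
    void stop()                 { std::puts("2. stop groups (this also flushes)"); }
    void cleanup()              { std::puts("3. clean up sstables and commitlog"); }
};

struct table_stub {
    storage_group_stub sg;
    void cleanup_tablet() {
        // The real code first holds the async gate and the exclusive rwlock,
        // so no reader iterates the storage-group map while it is mutated.
        sg.clear_inactive_reads();
        sg.stop();
        sg.cleanup();
        std::puts("4. remove_storage_group(tid)"); // the new deallocation step
    }
};

int main() { table_stub{}.cleanup_tablet(); }
```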

View File

@@ -367,15 +367,13 @@ public:
: _schema(std::move(s))
, _tablet_map(tmap.tablet_count())
{
for (const auto& [id, sg] : sgm.storage_groups()) {
if (sg) {
auto set = sg->make_sstable_set();
_size += set->size();
_bytes_on_disk += set->bytes_on_disk();
_sstable_sets[id] = std::move(set);
_sstable_set_ids.insert(id);
}
}
sgm.for_each_storage_group([this] (size_t id, storage_group& sg) {
auto set = sg.make_sstable_set();
_size += set->size();
_bytes_on_disk += set->bytes_on_disk();
_sstable_sets[id] = std::move(set);
_sstable_set_ids.insert(id);
});
}
tablet_sstable_set(const tablet_sstable_set& o)
@@ -568,7 +566,7 @@ public:
auto token = pos.token();
if (!_cur_set || pos.token() >= _lowest_next_token) {
auto idx = _tset.group_of(token);
if (!token.is_maximum()) {
if (!token.is_maximum() && _tset._sstable_set_ids.contains(idx)) {
_cur_set = _tset.find_sstable_set(idx);
}
// Set the next token to point to the next engaged storage group.
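
The reader-side guard above is what makes reads of tablets this shard no longer owns graceful: membership is checked before the per-tablet sstable set is consulted, and a missing set simply advances the reader to the next engaged group. A tiny sketch of the idiom with illustrative types:

```cpp
// Checked lookup: consult a per-tablet set only if this shard holds one,
// mirroring the _sstable_set_ids.contains(idx) guard above.
#include <map>
#include <optional>
#include <set>

struct tablet_sstable_sets_sketch {
    std::set<size_t> engaged_ids; // tablets with a set on this shard
    std::map<size_t, int> sets;   // id -> stand-in for an sstable set
    std::optional<int> find(size_t idx) const {
        if (!engaged_ids.contains(idx)) {
            return std::nullopt;  // tablet migrated away: skip ahead
        }
        return sets.at(idx);
    }
};
```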

View File

@@ -1047,7 +1047,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case locator::tablet_transition_stage::allow_write_both_read_old:
if (action_failed(tablet_state.barriers[trinfo.stage])) {
if (check_excluded_replicas()) {
transition_to_with_barrier(locator::tablet_transition_stage::revert_migration);
transition_to_with_barrier(locator::tablet_transition_stage::cleanup_target);
break;
}
}
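
This coordinator change matches the state-diagram update earlier in the series: a failed barrier at allow_write_both_read_old now routes to cleanup_target instead of revert_migration, presumably because with dynamic allocation the target replica may already hold a storage group for the incoming tablet. A reduced, hypothetical sketch of the branch (enum values mirror locator::tablet_transition_stage; everything else is elided):

```cpp
// Reduced sketch of the changed failure branch at allow_write_both_read_old.
enum class stage { allow_write_both_read_old, cleanup_target };

stage next_stage_on_barrier_failure(stage current) {
    if (current == stage::allow_write_both_read_old) {
        // Assumption: the target may already have allocated a storage group
        // for the incoming tablet, so it must be cleaned up, not reverted.
        return stage::cleanup_target;
    }
    return current; // other stages keep their existing error handling
}
```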

View File

@@ -134,7 +134,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanups) {
// Cleanup the tablet.
e.db().invoke_on_all([&] (replica::database& db) {
return db.find_column_family("ks", "cf").cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0));
return db.find_column_family("ks", "cf").cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0));
}).get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 0);
@@ -179,7 +179,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanup_record_gc) {
};
auto cleanup_tablet = [&] (std::string cf) {
auto& db = e.local_db();
db.find_column_family("ks", cf).cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get();
db.find_column_family("ks", cf).cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get();
};
auto get_num_records = [&] {
auto res = e.execute_cql("select * from system.commitlog_cleanups;").get();

View File

@@ -124,7 +124,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
flush(e);
e.db().invoke_on_all([] (replica::database& dbi) {
return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr<replica::table> t) {
return dbi.get_compaction_manager().perform_major_compaction(t->as_table_state());
return dbi.get_compaction_manager().perform_major_compaction(t->try_get_table_state_with_static_sharding());
});
}).get();

View File

@@ -3027,7 +3027,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt
desc.replacer = replacer_fn_no_op();
auto cdata = compaction_manager::create_compaction_data();
compaction_progress_monitor progress_monitor;
sstables::compact_sstables(std::move(desc), cdata, cf->as_table_state(), progress_monitor).get();
sstables::compact_sstables(std::move(desc), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get();
return compacted_sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice());
}

View File

@@ -4991,7 +4991,7 @@ static future<> run_incremental_compaction_test(sstables::offstrategy offstrateg
ssts = {}; // releases references
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
run_compaction(t, std::move(owned_ranges_ptr)).get();
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty());
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty());
testlog.info("Cleanup has finished");
}
@@ -5099,7 +5099,7 @@ SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) {
ssts = {}; // releases references
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty());
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty());
testlog.info("Cleanup has finished");
}

View File

@@ -38,7 +38,7 @@ public:
return _cf->add_sstable_and_update_cache(sstable, offstrategy);
}
auto new_sstables = { sstable };
return _cf->as_table_state().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no);
return _cf->try_get_table_state_with_static_sharding().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no);
}
future<> rebuild_sstable_list(compaction::table_state& table_s, const std::vector<sstables::shared_sstable>& new_sstables,

View File

@@ -842,7 +842,7 @@ void test_commutative_row_deletion(cql_test_env& e, std::function<void()>&& mayb
}});
});
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get();
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get();
}
SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) {
@@ -1078,7 +1078,7 @@ void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::func
}});
});
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get();
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get();
eventually([&] {
auto msg = e.execute_cql("select * from vcf limit 1").get();
assert_that(msg).is_rows().with_rows({{

View File

@@ -70,10 +70,10 @@ public:
return true;
}
const sstables::sstable_set& main_sstable_set() const override {
return table().as_table_state().main_sstable_set();
return table().try_get_table_state_with_static_sharding().main_sstable_set();
}
const sstables::sstable_set& maintenance_sstable_set() const override {
return table().as_table_state().maintenance_sstable_set();
return table().try_get_table_state_with_static_sharding().maintenance_sstable_set();
}
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(*this, sstables, query_time);
@@ -105,7 +105,7 @@ public:
}
bool memtable_has_key(const dht::decorated_key& key) const override { return false; }
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
return table().as_table_state().on_compaction_completion(std::move(desc), offstrategy);
return table().try_get_table_state_with_static_sharding().on_compaction_completion(std::move(desc), offstrategy);
}
bool is_auto_compaction_disabled_by_user() const noexcept override {
return table().is_auto_compaction_disabled_by_user();

View File

@@ -1762,7 +1762,7 @@ void populate(const std::vector<dataset*>& datasets, cql_test_env& env, const ta
output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names());
db.get_compaction_manager().run_with_compaction_disabled(cf.as_table_state(), [&] {
db.get_compaction_manager().run_with_compaction_disabled(cf.try_get_table_state_with_static_sharding(), [&] {
return seastar::async([&] {
auto gen = ds.make_generator(s, cfg);
while (auto mopt = gen()) {
@@ -1869,7 +1869,7 @@ auto make_compaction_disabling_guard(replica::database& db, std::vector<replica:
shared_promise<> pr;
for (auto&& t : tables) {
// FIXME: discarded future.
(void)db.get_compaction_manager().run_with_compaction_disabled(t->as_table_state(), [f = shared_future<>(pr.get_shared_future())] {
(void)db.get_compaction_manager().run_with_compaction_disabled(t->try_get_table_state_with_static_sharding(), [f = shared_future<>(pr.get_shared_future())] {
return f.get_future();
});
}

View File

@@ -240,7 +240,7 @@ public:
descriptor.replacer = sstables::replacer_fn_no_op();
auto cdata = compaction_manager::create_compaction_data();
compaction_progress_monitor progress_monitor;
auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->as_table_state(), progress_monitor).get();
auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get();
auto end = perf_sstable_test_env::now();
auto partitions_per_sstable = _cfg.partitions / _cfg.sstables;

View File

@@ -242,3 +242,51 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
assert len(replicas) == 1
for r in replicas[0].replicas:
assert r[0] != host_ids[failer.fail_idx]
@pytest.mark.asyncio
async def test_tablet_back_and_forth_migration(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'experimental_features': ['tablets', 'consistent-topology-changes']}
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
async def assert_rows(num):
res = await cql.run_async(f"SELECT * FROM test.test")
assert len(res) == num
await make_server()
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await make_server()
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({1}, {1});")
await assert_rows(1)
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
logger.info(f"Tablet is on [{replicas}]")
assert len(replicas) == 1 and len(replicas[0].replicas) == 1
old_replica = replicas[0].replicas[0]
assert old_replica[0] != host_ids[1]
new_replica = (host_ids[1], 0)
logger.info(f"Moving tablet {old_replica} -> {new_replica}")
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
await assert_rows(1)
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({2}, {2});")
await assert_rows(2)
logger.info(f"Moving tablet {new_replica} -> {old_replica}")
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0)
await assert_rows(2)
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({3}, {3});")
await assert_rows(3)