Merge 'replica: remove rwlock for protecting iteration over storage group map' from Raphael "Raph" Carvalho

The rwlock was added to protect iterations against concurrent updates to the map.

The updates can happen when allocating a new tablet replica or removing an old one (tablet cleanup).

The rwlock is very problematic because it can result in topology changes being blocked, as updating token metadata takes the exclusive lock, which is serialized with table-wide ops like split / major / explicit flush (and those can take a long time).

To get rid of the lock, we can copy the storage group map and guard individual groups with a gate (not a problem since the map is expected to have a maximum of ~100 elements). Cleanup can then close that gate (carefully, only after stopping the individual groups, so that migrations aren't blocked by long-running ops like major compaction), and ongoing iterations (e.g. triggered by nodetool flush) can skip a group whose gate was closed, since such a group is being migrated out.

Fixes #18821.

```
WRITE
=====

./build/release/scylla perf-simple-query --smp 1 --memory 2G --initial-tablets 10 --tablets --write

- BEFORE

65559.52 tps ( 59.6 allocs/op,  16.4 logallocs/op,  14.3 tasks/op,   52841 insns/op,   30946 cycles/op,        0 errors)
67408.05 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53018 insns/op,   30874 cycles/op,        0 errors)
67714.72 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53026 insns/op,   30881 cycles/op,        0 errors)
67825.57 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53015 insns/op,   30821 cycles/op,        0 errors)
67810.74 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53009 insns/op,   30828 cycles/op,        0 errors)

         throughput: mean=67263.72 standard-deviation=967.40 median=67714.72 median-absolute-deviation=547.02 maximum=67825.57 minimum=65559.52
instructions_per_op: mean=52981.61 standard-deviation=79.09 median=53014.96 median-absolute-deviation=36.54 maximum=53025.79 minimum=52840.56
  cpu_cycles_per_op: mean=30869.90 standard-deviation=50.23 median=30874.06 median-absolute-deviation=42.11 maximum=30945.94 minimum=30820.89

- AFTER
65448.76 tps ( 59.5 allocs/op,  16.4 logallocs/op,  14.3 tasks/op,   52788 insns/op,   31013 cycles/op,        0 errors)
67290.83 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53025 insns/op,   30950 cycles/op,        0 errors)
67646.81 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53025 insns/op,   30909 cycles/op,        0 errors)
67565.90 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53058 insns/op,   30951 cycles/op,        0 errors)
67537.32 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   52983 insns/op,   30963 cycles/op,        0 errors)

         throughput: mean=67097.93 standard-deviation=931.44 median=67537.32 median-absolute-deviation=467.97 maximum=67646.81 minimum=65448.76
instructions_per_op: mean=52975.85 standard-deviation=108.07 median=53024.55 median-absolute-deviation=49.45 maximum=53057.99 minimum=52788.49
  cpu_cycles_per_op: mean=30957.17 standard-deviation=37.43 median=30951.31 median-absolute-deviation=7.51 maximum=31013.01 minimum=30908.62

READ
====

./build/release/scylla perf-simple-query --smp 1 --memory 2G --initial-tablets 10 --tablets

- BEFORE

79423.36 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41840 insns/op,   26820 cycles/op,        0 errors)
81076.70 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41837 insns/op,   26583 cycles/op,        0 errors)
80927.36 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41829 insns/op,   26629 cycles/op,        0 errors)
80539.44 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41841 insns/op,   26735 cycles/op,        0 errors)
80793.10 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41864 insns/op,   26662 cycles/op,        0 errors)

         throughput: mean=80551.99 standard-deviation=661.12 median=80793.10 median-absolute-deviation=375.37 maximum=81076.70 minimum=79423.36
instructions_per_op: mean=41842.20 standard-deviation=13.26 median=41840.14 median-absolute-deviation=5.68 maximum=41864.50 minimum=41829.29
  cpu_cycles_per_op: mean=26685.88 standard-deviation=93.31 median=26662.18 median-absolute-deviation=56.47 maximum=26820.08 minimum=26582.68

- AFTER
79464.70 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41799 insns/op,   26761 cycles/op,        0 errors)
80954.58 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41803 insns/op,   26605 cycles/op,        0 errors)
81160.90 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41811 insns/op,   26555 cycles/op,        0 errors)
81263.10 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41814 insns/op,   26527 cycles/op,        0 errors)
81162.97 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41806 insns/op,   26549 cycles/op,        0 errors)

         throughput: mean=80801.25 standard-deviation=755.54 median=81160.90 median-absolute-deviation=361.72 maximum=81263.10 minimum=79464.70
instructions_per_op: mean=41806.47 standard-deviation=5.85 median=41806.05 median-absolute-deviation=4.05 maximum=41813.86 minimum=41799.36
  cpu_cycles_per_op: mean=26599.22 standard-deviation=94.84 median=26554.54 median-absolute-deviation=50.51 maximum=26761.06 minimum=26527.05
```

Closes scylladb/scylladb#19469

* github.com:scylladb/scylladb:
  replica: remove rwlock for protecting iteration over storage group map
  replica: get rid of fragile compaction group intrusive list
This commit is contained in:
Botond Dénes
2024-07-12 15:45:36 +03:00
14 changed files with 407 additions and 155 deletions

View File

@@ -366,6 +366,14 @@ ratio_holder filter_recent_false_positive_as_ratio_holder(const sstables::shared
return ratio_holder(f + sst->filter_get_recent_true_positive(), f);
}
uint64_t accumulate_on_active_memtables(replica::table& t, noncopyable_function<uint64_t(replica::memtable& mt)> action) {
uint64_t ret = 0;
t.for_each_active_memtable([&] (replica::memtable& mt) {
ret += action(mt);
});
return ret;
}
void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace>& sys_ks) {
cf::get_column_family_name.set(r, [&ctx] (const_req req){
std::vector<sstring> res;
@@ -401,13 +409,13 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, req->get_path_param("name"), uint64_t{0}, [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed(std::mem_fn(&replica::memtable::partition_count)), uint64_t(0));
return accumulate_on_active_memtables(cf, std::mem_fn(&replica::memtable::partition_count));
}, std::plus<>());
});
cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, uint64_t{0}, [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed(std::mem_fn(&replica::memtable::partition_count)), uint64_t(0));
return accumulate_on_active_memtables(cf, std::mem_fn(&replica::memtable::partition_count));
}, std::plus<>());
});
@@ -421,33 +429,33 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
cf::get_memtable_off_heap_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, req->get_path_param("name"), int64_t(0), [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed([] (replica::memtable* active_memtable) {
return active_memtable->region().occupancy().total_space();
}), uint64_t(0));
return accumulate_on_active_memtables(cf, [] (replica::memtable& active_memtable) {
return active_memtable.region().occupancy().total_space();
});
}, std::plus<int64_t>());
});
cf::get_all_memtable_off_heap_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, int64_t(0), [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed([] (replica::memtable* active_memtable) {
return active_memtable->region().occupancy().total_space();
}), uint64_t(0));
return accumulate_on_active_memtables(cf, [] (replica::memtable& active_memtable) {
return active_memtable.region().occupancy().total_space();
});
}, std::plus<int64_t>());
});
cf::get_memtable_live_data_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, req->get_path_param("name"), int64_t(0), [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed([] (replica::memtable* active_memtable) {
return active_memtable->region().occupancy().used_space();
}), uint64_t(0));
return accumulate_on_active_memtables(cf, [] (replica::memtable& active_memtable) {
return active_memtable.region().occupancy().used_space();
});
}, std::plus<int64_t>());
});
cf::get_all_memtable_live_data_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, int64_t(0), [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed([] (replica::memtable* active_memtable) {
return active_memtable->region().occupancy().used_space();
}), uint64_t(0));
return accumulate_on_active_memtables(cf, [] (replica::memtable& active_memtable) {
return active_memtable.region().occupancy().used_space();
});
}, std::plus<int64_t>());
});
@@ -485,9 +493,9 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
cf::get_all_cf_all_memtables_live_data_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
warn(unimplemented::cause::INDEXES);
return map_reduce_cf(ctx, int64_t(0), [](replica::column_family& cf) {
return boost::accumulate(cf.active_memtables() | boost::adaptors::transformed([] (replica::memtable* active_memtable) {
return active_memtable->region().occupancy().used_space();
}), uint64_t(0));
return accumulate_on_active_memtables(cf, [] (replica::memtable& active_memtable) {
return active_memtable.region().occupancy().used_space();
});
}, std::plus<int64_t>());
});

View File

@@ -551,6 +551,14 @@ protected:
// the exclusive lock can be freed to let regular compaction run in parallel to major
lock_holder.return_all();
co_await utils::get_local_injector().inject("major_compaction_wait", [this] (auto& handler) -> future<> {
cmlog.info("major_compaction_wait: waiting");
while (!handler.poll_for_message() && !_compaction_data.is_stop_requested()) {
co_await sleep(std::chrono::milliseconds(5));
}
cmlog.info("major_compaction_wait: released");
});
co_await compact_sstables_and_update_history(std::move(descriptor), _compaction_data, on_replace);
finish_compaction();

View File

@@ -551,10 +551,10 @@ public:
return map_reduce_tables<stats>([] (replica::table& t) {
logalloc::occupancy_stats s;
uint64_t partition_count = 0;
for (replica::memtable* active_memtable : t.active_memtables()) {
s += active_memtable->region().occupancy();
partition_count += active_memtable->partition_count();
}
t.for_each_active_memtable([&] (replica::memtable& active_memtable) {
s += active_memtable.region().occupancy();
partition_count += active_memtable.partition_count();
});
return stats{s.total_space(), s.free_space(), partition_count};
}, stats::reduce).then([] (stats s) {
return std::vector<std::pair<sstring, sstring>>{

View File

@@ -61,8 +61,6 @@ class compaction_group {
seastar::condition_variable _staging_done_condition;
// Gates async operations confined to a single group.
seastar::gate _async_gate;
using list_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
list_hook_t _list_hook;
private:
// Adds new sstable to the set of sstables
// Doesn't update the cache. The cache must be synchronized in order for reads to see
@@ -82,10 +80,6 @@ private:
// it to be moved from its original sstable set (e.g. maintenance) into a new one (e.g. main).
future<> delete_unused_sstables(sstables::compaction_completion_desc desc);
public:
using list_t = boost::intrusive::list<compaction_group,
boost::intrusive::member_hook<compaction_group, compaction_group::list_hook_t, &compaction_group::_list_hook>,
boost::intrusive::constant_time_size<false>>;
compaction_group(table& t, size_t gid, dht::token_range token_range);
~compaction_group();
@@ -175,13 +169,14 @@ public:
}
compaction_manager& get_compaction_manager() noexcept;
const compaction_manager& get_compaction_manager() const noexcept;
friend class storage_group;
};
using compaction_group_ptr = std::unique_ptr<compaction_group>;
using compaction_group_ptr = lw_shared_ptr<compaction_group>;
using const_compaction_group_ptr = lw_shared_ptr<const compaction_group>;
using compaction_group_vector = utils::chunked_vector<compaction_group_ptr>;
using compaction_group_list = compaction_group::list_t;
// Storage group is responsible for storage that belongs to a single tablet.
// A storage group can manage 1 or more compaction groups, each of which can be compacted independently.
@@ -191,74 +186,119 @@ using compaction_group_list = compaction_group::list_t;
class storage_group {
compaction_group_ptr _main_cg;
std::vector<compaction_group_ptr> _split_ready_groups;
seastar::gate _async_gate;
private:
bool splitting_mode() const {
return !_split_ready_groups.empty();
}
size_t to_idx(locator::tablet_range_side) const;
public:
storage_group(compaction_group_ptr cg, compaction_group_list* list);
storage_group(compaction_group_ptr cg);
seastar::gate& async_gate() {
return _async_gate;
}
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const noexcept;
compaction_group_ptr& main_compaction_group() noexcept;
std::vector<compaction_group_ptr> split_ready_compaction_groups() &&;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;
uint64_t live_disk_space_used() const noexcept;
utils::small_vector<compaction_group*, 3> compaction_groups() noexcept;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept;
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
// Puts the storage group in split mode, in which it internally segregates data
// into two sstable sets and two memtable sets corresponding to the two adjacent
// tablets post-split.
// Preexisting sstables and memtables are not split yet.
// Returns true if post-conditions for split() are met.
bool set_split_mode(compaction_group_list&);
bool set_split_mode();
// Like set_split_mode() but triggers splitting for old sstables and memtables and waits
// for it:
// 1) Flushes all memtables which were created in non-split mode, and waits for that to complete.
// 2) Compacts all sstables which overlap with the split point
// Returns a future which resolves when this process is complete.
future<> split(compaction_group_list&, sstables::compaction_type_options::split opt);
future<> split(sstables::compaction_type_options::split opt);
// Make an sstable set spanning all sstables in the storage_group
lw_shared_ptr<const sstables::sstable_set> make_sstable_set() const;
// Flush all memtables.
future<> flush() noexcept;
bool can_flush() const;
api::timestamp_type min_memtable_timestamp() const;
bool compaction_disabled() const;
// Returns true when all compacted sstables were already deleted.
bool no_compacted_sstable_undeleted() const;
future<> stop() noexcept;
};
using storage_group_map = absl::flat_hash_map<size_t, std::unique_ptr<storage_group>, absl::Hash<size_t>>;
using storage_group_ptr = lw_shared_ptr<storage_group>;
using storage_group_map = absl::flat_hash_map<size_t, storage_group_ptr, absl::Hash<size_t>>;
class storage_group_manager {
protected:
// The compaction group list is only a helper for accessing the groups managed by the storage groups.
// The list entries are unlinked automatically when the storage group, they belong to, is removed.
compaction_group_list _compaction_groups;
storage_group_map _storage_groups;
// Prevents _storage_groups from having its elements inserted or deleted while other layer iterates
// over them (or over _compaction_groups).
seastar::rwlock _lock;
public:
virtual ~storage_group_manager();
seastar::rwlock& get_rwlock() noexcept {
return _lock;
}
// How concurrent loop and updates on the group map works without a lock:
//
// Firstly, all yielding loops will work on a copy of map, to prevent a
// concurrent update to the map from interfering with it.
//
// scenario 1:
// T
// 1 loop on the map
// 2 storage group X is stopped on cleanup
// 3 loop reaches X
//
// Here, X is stopped before it is reached. This is handled by teaching
// iteration to skip groups that were stopped by cleanup (implemented
// using gate).
// X survives its removal from the map since it is a lw_shared_ptr.
//
//
// scenario 2:
// T
// 1 loop on the map
// 2 loop reaches X
// 3 storage group X is stopped on cleanup
//
// Here, X is stopped while being used, but that also happens during shutdown.
// When X is stopped, flush happens and compactions are all stopped (exception
// is not propagated upwards) and new ones cannot start afterward.
//
//
// scenario 3:
// T
// 1 loop on the map
// 2 storage groups are split
// 3 loop reaches old groups
//
// Here, the loop continues post storage group split, which rebuilds the old
// map into a new one. This is handled by allowing the old map to still access
// the compaction groups that were reassigned according to the new tablet count.
// We don't move the compaction groups, but rather they're still visible by old
// and new storage groups.
const compaction_group_list& compaction_groups() const noexcept {
return _compaction_groups;
}
compaction_group_list& compaction_groups() noexcept {
return _compaction_groups;
}
future<> for_each_storage_group_gently(std::function<future<>(size_t, storage_group&)> f);
// Important to not return storage_group_id in yielding variants, since ids can be
// invalidated when storage group count changes (e.g. split or merge).
future<> parallel_foreach_storage_group(std::function<future<>(storage_group&)> f);
future<> for_each_storage_group_gently(std::function<future<>(storage_group&)> f);
void for_each_storage_group(std::function<void(size_t, storage_group&)> f) const;
const storage_group_map& storage_groups() const;
future<> stop_storage_groups() noexcept;
void remove_storage_group(size_t id);
storage_group& storage_group_for_id(const schema_ptr&, size_t i) const;

View File

@@ -603,10 +603,12 @@ private:
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept;
// Select a compaction group from a given sstable based on its token range.
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept;
// Returns a list of all compaction groups.
compaction_group_list& compaction_groups() const noexcept;
// Safely iterate through compaction groups, while performing async operations on them.
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
void for_each_compaction_group(std::function<void(compaction_group&)> action);
void for_each_const_compaction_group(std::function<void(const compaction_group&)> action) const;
// Unsafe reference to all storage groups. Don't use it across preemption points.
const storage_group_map& storage_groups() const;
// Safely iterate through SSTables, with deletion guard taken to make sure they're not
// removed during iteration.
@@ -795,9 +797,8 @@ public:
// FIXME: in case a query is satisfied from a single memtable, avoid a copy
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
using const_row_ptr = std::unique_ptr<const row>;
// Return all active memtables, where there will be one per compaction group
// TODO: expose stats, whatever, instead of exposing active memtables themselves.
std::vector<memtable*> active_memtables();
// Allow an action to be performed on each active memtable, each of which belongs to a different compaction group.
void for_each_active_memtable(noncopyable_function<void(memtable&)> action);
api::timestamp_type min_memtable_timestamp() const;
const row_cache& get_row_cache() const {
return _cache;

View File

@@ -192,9 +192,9 @@ table::add_memtables_to_reader_list(std::vector<mutation_reader>& readers,
const dht::ring_position& pos = range.start()->value();
auto& sg = storage_group_for_token(pos.token());
reserve_fn(sg.memtable_count());
for (auto& cg : sg.compaction_groups()) {
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
add_memtables_from_cg(*cg);
}
});
return;
}
auto token_range = range.transform(std::mem_fn(&dht::ring_position::token));
@@ -372,10 +372,10 @@ future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db
return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
}
std::vector<memtable*> table::active_memtables() {
return boost::copy_range<std::vector<memtable*>>(compaction_groups() | boost::adaptors::transformed([] (compaction_group& cg) {
return &cg.memtables()->active_memtable();
}));
void table::for_each_active_memtable(noncopyable_function<void(memtable&)> action) {
for_each_compaction_group([&] (compaction_group& cg) {
action(cg.memtables()->active_memtable());
});
}
api::timestamp_type compaction_group::min_memtable_timestamp() const {
@@ -399,10 +399,15 @@ bool compaction_group::memtable_has_key(const dht::decorated_key& key) const {
std::bind(&memtable::contains_partition, std::placeholders::_1, std::ref(key)));
}
api::timestamp_type table::min_memtable_timestamp() const {
api::timestamp_type storage_group::min_memtable_timestamp() const {
return *boost::range::min_element(compaction_groups() | boost::adaptors::transformed(std::mem_fn(&compaction_group::min_memtable_timestamp)));
}
api::timestamp_type table::min_memtable_timestamp() const {
return *boost::range::min_element(storage_groups() | boost::adaptors::map_values
| boost::adaptors::transformed(std::mem_fn(&storage_group::min_memtable_timestamp)));
}
// Not performance critical. Currently used for testing only.
future<bool>
table::for_all_partitions_slow(schema_ptr s, reader_permit permit, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
@@ -555,10 +560,28 @@ void table::enable_off_strategy_trigger() {
storage_group_manager::~storage_group_manager() = default;
future<> storage_group_manager::for_each_storage_group_gently(std::function<future<>(size_t, storage_group&)> f) {
rwlock::holder shared_lock = co_await get_rwlock().hold_read_lock();
for (auto& [id, sg]: _storage_groups) {
co_await f(id, *sg);
// exception-less attempt to hold gate.
// TODO: move it to seastar.
static std::optional<gate::holder> try_hold_gate(gate& g) noexcept {
return g.is_closed() ? std::nullopt : std::make_optional(g.hold());
}
future<> storage_group_manager::parallel_foreach_storage_group(std::function<future<>(storage_group&)> f) {
co_await coroutine::parallel_for_each(_storage_groups | boost::adaptors::map_values, [&] (const storage_group_ptr sg) -> future<> {
// Table-wide ops, like 'nodetool compact', are inherently racy with migrations, so it's okay to skip
// storage of tablets being migrated away.
if (auto holder = try_hold_gate(sg->async_gate())) {
co_await f(*sg.get());
}
});
}
future<> storage_group_manager::for_each_storage_group_gently(std::function<future<>(storage_group&)> f) {
auto storage_groups = boost::copy_range<std::vector<storage_group_ptr>>(_storage_groups | boost::adaptors::map_values);
for (auto& sg: storage_groups) {
if (auto holder = try_hold_gate(sg->async_gate())) {
co_await f(*sg.get());
}
}
}
@@ -568,6 +591,14 @@ void storage_group_manager::for_each_storage_group(std::function<void(size_t, st
}
}
const storage_group_map& storage_group_manager::storage_groups() const {
return _storage_groups;
}
future<> storage_group_manager::stop_storage_groups() noexcept {
return parallel_for_each(_storage_groups | boost::adaptors::map_values, std::mem_fn(&storage_group::stop));
}
void storage_group_manager::remove_storage_group(size_t id) {
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
_storage_groups.erase(it);
@@ -587,17 +618,19 @@ storage_group& storage_group_manager::storage_group_for_id(const schema_ptr& s,
class single_storage_group_manager final : public storage_group_manager {
replica::table& _t;
storage_group* _single_sg;
compaction_group* _single_cg;
compaction_group& get_compaction_group() const noexcept {
return const_cast<compaction_group&>(*_compaction_groups.begin());
return *_single_cg;
}
public:
single_storage_group_manager(replica::table& t)
: _t(t)
{
storage_group_map r;
auto cg = std::make_unique<compaction_group>(_t, size_t(0), dht::token_range::make_open_ended_both_sides());
auto sg = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
auto cg = make_lw_shared<compaction_group>(_t, size_t(0), dht::token_range::make_open_ended_both_sides());
_single_cg = cg.get();
auto sg = make_lw_shared<storage_group>(std::move(cg));
_single_sg = sg.get();
r[0] = std::move(sg);
_storage_groups = std::move(r);
@@ -637,7 +670,7 @@ public:
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
return _compaction_groups.begin()->make_sstable_set();
return get_compaction_group().make_sstable_set();
}
};
@@ -710,8 +743,8 @@ public:
if (tmap.has_replica(tid, local_replica)) {
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
ret[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range));
ret[tid.value()] = make_lw_shared<storage_group>(std::move(cg));
}
}
_storage_groups = std::move(ret);
@@ -746,24 +779,20 @@ bool table::uses_tablets() const {
return _erm && _erm->get_replication_strategy().uses_tablets();
}
storage_group::storage_group(compaction_group_ptr cg, compaction_group_list* list)
storage_group::storage_group(compaction_group_ptr cg)
: _main_cg(std::move(cg)) {
// FIXME: get rid of compaction group list.
if (list) {
list->push_back(*_main_cg);
}
}
const dht::token_range& storage_group::token_range() const noexcept {
return _main_cg->token_range();
}
compaction_group_ptr& storage_group::main_compaction_group() noexcept {
const compaction_group_ptr& storage_group::main_compaction_group() const noexcept {
return _main_cg;
}
std::vector<compaction_group_ptr> storage_group::split_ready_compaction_groups() && {
return std::exchange(_split_ready_groups, {});
const std::vector<compaction_group_ptr>& storage_group::split_ready_compaction_groups() const {
return _split_ready_groups;
}
size_t storage_group::to_idx(locator::tablet_range_side side) const {
@@ -777,21 +806,34 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
return _main_cg;
}
utils::small_vector<compaction_group*, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group*, 3> cgs = {_main_cg.get()};
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
action(_main_cg);
for (auto& cg : _split_ready_groups) {
cgs.push_back(cg.get());
action(cg);
}
}
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
});
return cgs;
}
bool storage_group::set_split_mode(compaction_group_list& list) {
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const noexcept {
utils::small_vector<const_compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
});
return cgs;
}
bool storage_group::set_split_mode() {
if (!splitting_mode()) {
auto create_cg = [this, &list] () -> compaction_group_ptr {
auto create_cg = [this] () -> compaction_group_ptr {
// TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
auto cg = std::make_unique<compaction_group>(_main_cg->_t, _main_cg->group_id(), _main_cg->token_range());
list.push_back(*cg);
return cg;
return make_lw_shared<compaction_group>(_main_cg->_t, _main_cg->group_id(), _main_cg->token_range());
};
std::vector<compaction_group_ptr> split_ready_groups(2);
split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg();
@@ -803,8 +845,8 @@ bool storage_group::set_split_mode(compaction_group_list& list) {
return _main_cg->empty();
}
future<> storage_group::split(compaction_group_list& list, sstables::compaction_type_options::split opt) {
if (set_split_mode(list)) {
future<> storage_group::split(sstables::compaction_type_options::split opt) {
if (set_split_mode()) {
co_return;
}
@@ -833,7 +875,7 @@ bool tablet_storage_group_manager::all_storage_groups_split() {
}
auto split_ready = std::ranges::all_of(_storage_groups | boost::adaptors::map_values,
std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(compaction_groups())));
std::mem_fn(&storage_group::set_split_mode));
// The table replica will say to coordinator that its split status is ready by
// mirroring the sequence number from tablet metadata into its local state,
@@ -861,8 +903,8 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com
future<> tablet_storage_group_manager::split_all_storage_groups() {
sstables::compaction_type_options::split opt = split_compaction_options();
co_await for_each_storage_group_gently([this, opt] (size_t i, storage_group& storage_group) {
return storage_group.split(compaction_groups(), opt);
co_await for_each_storage_group_gently([opt] (storage_group& storage_group) {
return storage_group.split(opt);
});
}
@@ -882,7 +924,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
idx, schema()->ks_name(), schema()->cf_name()));
}
return sg->split(compaction_groups(), split_compaction_options());
return sg->split(split_compaction_options());
}
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
@@ -937,7 +979,7 @@ utils::chunked_vector<compaction_group*> tablet_storage_group_manager::compactio
auto& sg = it->second;
for (auto& cg : sg->compaction_groups()) {
if (cg && tr.overlaps(cg->token_range(), cmp)) {
ret.push_back(cg);
ret.push_back(cg.get());
}
}
}
@@ -979,17 +1021,44 @@ compaction_group& table::compaction_group_for_sstable(const sstables::shared_sst
return _sg_manager->compaction_group_for_sstable(sst);
}
compaction_group_list& table::compaction_groups() const noexcept {
return _sg_manager->compaction_groups();
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> {
co_await utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& handler) -> future<> {
tlogger.info("foreach_compaction_group_wait: waiting");
while (!handler.poll_for_message() && !_async_gate.is_closed() && !sg.async_gate().is_closed()) {
co_await sleep(std::chrono::milliseconds(5));
}
tlogger.info("foreach_compaction_group_wait: released");
});
co_await coroutine::parallel_for_each(sg.compaction_groups(), [&] (compaction_group_ptr cg) -> future<> {
if (auto holder = try_hold_gate(cg->async_gate())) {
co_await action(*cg);
}
});
});
}
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock();
co_await coroutine::parallel_for_each(compaction_groups(), [&] (compaction_group& cg) {
return action(cg);
void table::for_each_compaction_group(std::function<void(compaction_group&)> action) {
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
action(*cg);
});
});
}
void table::for_each_const_compaction_group(std::function<void(const compaction_group&)> action) const {
_sg_manager->for_each_storage_group([&] (size_t, storage_group& sg) {
sg.for_each_compaction_group([&] (const compaction_group_ptr& cg) {
action(*cg);
});
});
}
const storage_group_map& table::storage_groups() const {
return _sg_manager->storage_groups();
}
future<> table::safe_foreach_sstable(const sstables::sstable_set& set, noncopyable_function<future<>(const sstables::shared_sstable&)> action) {
auto deletion_guard = co_await get_units(_sstable_deletion_sem, 1);
@@ -1027,9 +1096,9 @@ future<utils::chunked_vector<sstables::entry_descriptor>>
table::clone_tablet_storage(locator::tablet_id tid) {
utils::chunked_vector<sstables::entry_descriptor> ret;
auto holder = async_gate().hold();
// FIXME: guard storage group with shared lock.
auto& sg = storage_group_for_id(tid.value());
auto sg_holder = sg.async_gate().hold();
co_await sg.flush();
auto set = sg.make_sstable_set();
co_await safe_foreach_sstable(*set, [&] (const sstables::shared_sstable& sst) -> future<> {
@@ -1423,13 +1492,13 @@ table::stop() {
// while they may still hold the table _async_gate
auto gate_closed_fut = _async_gate.close();
co_await await_pending_ops();
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::stop));
co_await _sg_manager->stop_storage_groups();
co_await _sstable_deletion_gate.close();
co_await std::move(gate_closed_fut);
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
for (compaction_group& cg : compaction_groups()) {
for_each_compaction_group([] (compaction_group& cg) {
cg.clear_sstables();
}
});
_sstables = make_compound_sstable_set();
}));
_cache.refresh_snapshot();
@@ -1561,11 +1630,11 @@ void table::rebuild_statistics() {
_stats.live_sstable_count = 0;
_stats.total_disk_space_used = 0;
for (const compaction_group& cg : compaction_groups()) {
for_each_const_compaction_group([this] (const compaction_group& cg) {
_stats.live_disk_space_used += cg.live_disk_space_used();
_stats.total_disk_space_used += cg.total_disk_space_used();
_stats.live_sstable_count += cg.live_sstable_count();
}
});
}
void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept {
@@ -1781,9 +1850,9 @@ void table::start_compaction() {
}
void table::trigger_compaction() {
for (compaction_group& cg : compaction_groups()) {
for_each_compaction_group([] (compaction_group& cg) {
cg.trigger_compaction();
}
});
}
void table::try_trigger_compaction(compaction_group& cg) noexcept {
@@ -1840,9 +1909,11 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
}
unsigned table::estimate_pending_compactions() const {
return boost::accumulate(compaction_groups() | boost::adaptors::transformed([this] (const compaction_group& cg) {
return _compaction_strategy.estimated_pending_compactions(cg.as_table_state());
}), unsigned(0));
unsigned ret = 0;
for_each_const_compaction_group([this, &ret] (const compaction_group& cg) {
ret += _compaction_strategy.estimated_pending_compactions(cg.as_table_state());
});
return ret;
}
void compaction_group::set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept {
@@ -1889,11 +1960,11 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
};
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
for (compaction_group& cg : compaction_groups()) {
for_each_compaction_group([&] (compaction_group& cg) {
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
updater.prepare(new_cs);
cg_sstable_set_updaters.push_back(std::move(updater));
}
});
// now exception safe:
_compaction_strategy = std::move(new_cs);
for (auto& updater : cg_sstable_set_updaters) {
@@ -1955,6 +2026,12 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
return _sstables->select(range);
}
bool storage_group::no_compacted_sstable_undeleted() const {
return std::ranges::all_of(compaction_groups(), [] (const_compaction_group_ptr& cg) {
return cg->compacted_undeleted_sstables().empty();
});
}
// Gets the list of all sstables in the column family, including ones that are
// not used for active queries because they have already been compacted, but are
// waiting for delete_atomically() to return.
@@ -1963,18 +2040,17 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
// garbage-collect a tombstone that covers data in an sstable that may not be
// successfully deleted.
lw_shared_ptr<const sstable_list> table::get_sstables_including_compacted_undeleted() const {
bool no_compacted_undeleted_sstable = std::ranges::all_of(compaction_groups(), [] (const compaction_group& cg) {
return cg.compacted_undeleted_sstables().empty();
});
bool no_compacted_undeleted_sstable = std::ranges::all_of(storage_groups() | boost::adaptors::map_values,
std::mem_fn(&storage_group::no_compacted_sstable_undeleted));
if (no_compacted_undeleted_sstable) {
return get_sstables();
}
auto ret = make_lw_shared<sstable_list>(*_sstables->all());
for (const compaction_group& cg : compaction_groups()) {
for_each_const_compaction_group([&ret] (const compaction_group& cg) {
for (auto&& s: cg.compacted_undeleted_sstables()) {
ret->insert(s);
}
}
});
return ret;
}
@@ -2106,7 +2182,7 @@ future<> compaction_group::stop() noexcept {
if (_async_gate.is_closed()) {
co_return;
}
co_await _async_gate.close();
auto closed_gate_fut = _async_gate.close();
auto flush_future = co_await seastar::coroutine::as_future(flush());
co_await _t._compaction_manager.remove(as_table_state());
@@ -2114,6 +2190,7 @@ future<> compaction_group::stop() noexcept {
if (flush_future.failed()) {
co_await seastar::coroutine::return_exception_ptr(flush_future.get_exception());
}
co_await std::move(closed_gate_fut);
}
bool compaction_group::empty() const noexcept {
@@ -2206,7 +2283,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
id, table_id));
}
// Remove old main groups, they're unused, but they need to be deregistered properly
auto cg_ptr = std::move(sg->main_compaction_group());
auto cg_ptr = sg->main_compaction_group();
auto f = cg_ptr->stop();
if (!f.available() || f.failed()) [[unlikely]] {
stop_fut = stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
@@ -2216,14 +2293,14 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
});
}
unsigned first_new_id = id << growth_factor;
auto split_ready_groups = std::move(*sg).split_ready_compaction_groups();
auto split_ready_groups = sg->split_ready_compaction_groups();
if (split_ready_groups.size() != split_size) {
on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.", split_ready_groups.size(), split_size));
}
for (unsigned i = 0; i < split_size; i++) {
auto group_id = first_new_id + i;
split_ready_groups[i]->update_id_and_range(group_id, new_tmap.get_token_range(locator::tablet_id(group_id)));
new_storage_groups[group_id] = std::make_unique<storage_group>(std::move(split_ready_groups[i]), nullptr);
new_storage_groups[group_id] = make_lw_shared<storage_group>(std::move(split_ready_groups[i]));
}
tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].",
@@ -2262,8 +2339,8 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
auto transition_info = transition.second;
if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
auto range = new_tablet_map->get_token_range(tid);
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
_storage_groups[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range));
_storage_groups[tid.value()] = make_lw_shared<storage_group>(std::move(cg));
tablet_migrating_in = true;
}
}
@@ -2279,10 +2356,6 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
}
future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
// Exclusive lock is meant to protect storage groups, but we hold it here to prevent preemption
// between erm and storage groups updates as they need to be consistent.
rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock();
auto old_erm = std::exchange(_erm, std::move(erm));
auto refresh_mutation_source = [this] {
@@ -2374,11 +2447,11 @@ table::~table() {
logalloc::occupancy_stats table::occupancy() const {
logalloc::occupancy_stats res;
for (compaction_group& cg : compaction_groups()) {
for (auto& m : *cg.memtables()) {
for_each_const_compaction_group([&] (const compaction_group& cg) {
for (auto& m : *const_cast<compaction_group&>(cg).memtables()) {
res += m->region().occupancy();
}
}
});
return res;
}
@@ -2630,10 +2703,14 @@ future<> table::flush(std::optional<db::replay_position> pos) {
_flush_rp = std::max(_flush_rp, fp);
}
bool table::can_flush() const {
bool storage_group::can_flush() const {
return std::ranges::any_of(compaction_groups(), std::mem_fn(&compaction_group::can_flush));
}
bool table::can_flush() const {
return std::ranges::any_of(storage_groups() | boost::adaptors::map_values, std::mem_fn(&storage_group::can_flush));
}
future<> compaction_group::clear_memtables() {
if (_t.commitlog()) {
for (auto& t : *_memtables) {
@@ -2654,6 +2731,11 @@ future<> table::clear() {
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
}
bool storage_group::compaction_disabled() const {
return std::ranges::all_of(compaction_groups(), [] (const_compaction_group_ptr& cg) {
return cg->get_compaction_manager().compaction_disabled(cg->as_table_state()); });
}
// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
@@ -2663,8 +2745,9 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
// materialized view was created right after truncation started, and it
// would not have compaction disabled when this function is called on it.
if (!schema()->is_view()) {
if (!std::ranges::all_of(compaction_groups(), [this] (const compaction_group& cg) {
return _compaction_manager.compaction_disabled(cg.as_table_state()); })) {
auto compaction_disabled = std::ranges::all_of(storage_groups() | boost::adaptors::map_values,
std::mem_fn(&storage_group::compaction_disabled));
if (!compaction_disabled) {
utils::on_internal_error(fmt::format("compaction not disabled on table {}.{} during TRUNCATE",
schema()->ks_name(), schema()->cf_name()));
}
@@ -2680,7 +2763,7 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
co_await _cache.invalidate(row_cache::external_updater([this, &rp, &remove, truncated_at] {
// FIXME: the following isn't exception safe.
for (compaction_group& cg : compaction_groups()) {
for_each_compaction_group([&] (compaction_group& cg) {
auto gc_trunc = to_gc_clock(truncated_at);
auto pruned = make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema));
@@ -2705,7 +2788,7 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
cg.set_main_sstables(std::move(pruned));
cg.set_maintenance_sstables(std::move(maintenance_pruned));
}
});
refresh_compound_sstable_set();
tlogger.debug("cleaning out row cache");
}));
@@ -2747,11 +2830,11 @@ void table::set_schema(schema_ptr s) {
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
_schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version());
for (compaction_group& cg : compaction_groups()) {
for_each_compaction_group([&] (compaction_group& cg) {
for (auto& m: *cg.memtables()) {
m->set_schema(s);
}
}
});
_cache.set_schema(s);
if (_counter_cell_locks) {
@@ -3426,6 +3509,10 @@ compaction_manager& compaction_group::get_compaction_manager() noexcept {
return _t.get_compaction_manager();
}
const compaction_manager& compaction_group::get_compaction_manager() const noexcept {
return _t.get_compaction_manager();
}
compaction::table_state& compaction_group::as_table_state() const noexcept {
return *_table_state;
}
@@ -3531,11 +3618,24 @@ future<> table::clear_inactive_reads_for_tablet(database& db, storage_group& sg)
}
}
future<> table::stop_compaction_groups(storage_group& sg) {
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
for (auto& cg_ptr : sg.compaction_groups()) {
co_await cg_ptr->stop();
future<> storage_group::stop() noexcept {
if (_async_gate.is_closed()) {
co_return;
}
// Carefully waits for close of gate after stopping compaction groups, since we don't want
// to wait on an ongoing compaction, *but* start it earlier to prevent iterations from
// picking this group that is being stopped.
auto closed_gate_fut = _async_gate.close();
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
co_await coroutine::parallel_for_each(compaction_groups(), [] (const compaction_group_ptr& cg_ptr) {
return cg_ptr->stop();
});
co_await std::move(closed_gate_fut);
}
future<> table::stop_compaction_groups(storage_group& sg) {
return sg.stop();
}
future<> table::flush_compaction_groups(storage_group& sg) {
@@ -3567,7 +3667,6 @@ future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys
future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
auto holder = async_gate().hold();
rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock();
auto& sg = storage_group_for_id(tid.value());
co_await clear_inactive_reads_for_tablet(db, sg);
@@ -3579,8 +3678,6 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
auto holder = async_gate().hold();
// Hold shared lock to keep storage group alive.
rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock();
auto& sg = storage_group_for_id(tid.value());
co_await clear_inactive_reads_for_tablet(db, sg);

View File

@@ -618,7 +618,10 @@ class absl_container:
break
if ctrl_t >= 0:
# NOTE: this only works for flat_hash_map
yield slots[i]['key'], slots[i]['value']
if slots[i]['value'].type.name.find("::map_slot_type") != -1:
yield slots[i]['key'], slots[i]['value']['second']
else:
yield slots[i]['key'], slots[i]['value']
def __nonzero__(self):
return self.size > 0
@@ -4504,6 +4507,17 @@ class scylla_memtables(gdb.Command):
def invoke(self, arg, from_tty):
for table in for_each_table():
gdb.write('table %s:\n' % schema_ptr(table['_schema']).table_name())
try:
sg_manager = std_unique_ptr(table["_sg_manager"]).get().dereference()
for (sg_id, sg_ptr) in absl_container(sg_manager["_storage_groups"]):
sg = seastar_lw_shared_ptr(sg_ptr).get()
scylla_memtables.dump_compaction_group_memtables(seastar_lw_shared_ptr(sg["_main_cg"]).get())
for cg_ptr in std_vector(sg["_split_ready_groups"]):
scylla_memtables.dump_compaction_group_memtables(seastar_lw_shared_ptr(cg_ptr).get())
return
except gdb.error:
pass
try:
sg_manager = std_unique_ptr(table["_sg_manager"]).get().dereference()
for cg in intrusive_list(sg_manager["_compaction_groups"], link='_list_hook'):

View File

@@ -926,7 +926,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
auto& cl = *table.commitlog();
auto s = table.schema();
auto& sharder = table.get_effective_replication_map()->get_sharder(*table.schema());
auto memtables = table.active_memtables();
auto memtables = active_memtables(table);
auto add_entry = [&cl, s, &sharder] (const partition_key& key) mutable {
auto md = tests::data_model::mutation_description(key.explode());

View File

@@ -1053,7 +1053,7 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();
replica::table& t = db.find_column_family("ks", "cf");
auto memtables = t.active_memtables();
auto memtables = active_memtables(t);
// Insert something so that we have data in memtable to flush
// it has to be somewhat large, as automatic flushing picks the

View File

@@ -48,6 +48,7 @@
#include "test/lib/mutation_assertions.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/log.hh"
#include "types/map.hh"
@@ -598,7 +599,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
assert_that_scanner3.produces(mutations[i]);
}
auto ms = cf.active_memtables(); // held by scanners
auto ms = active_memtables(cf); // held by scanners
auto flushed = cf.flush();

View File

@@ -41,6 +41,14 @@ lw_shared_ptr<replica::memtable> make_memtable(schema_ptr s, const std::vector<m
return mt;
}
std::vector<replica::memtable*> active_memtables(replica::table& t) {
std::vector<replica::memtable*> active_memtables;
t.for_each_active_memtable([&] (replica::memtable& mt) {
active_memtables.push_back(&mt);
});
return active_memtables;
}
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, lw_shared_ptr<replica::memtable> mt) {
return make_sstable_containing(sst_factory(), std::move(mt));
}

View File

@@ -274,3 +274,4 @@ inline shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr<replica::me
}
lw_shared_ptr<replica::memtable> make_memtable(schema_ptr s, const std::vector<mutation>& muts);
std::vector<replica::memtable*> active_memtables(replica::table& t);

View File

@@ -1744,6 +1744,14 @@ replica::table& find_table(replica::database& db, dataset& ds) {
return db.find_column_family("ks", ds.table_name());
}
static std::vector<replica::memtable*> active_memtables(replica::table& t) {
std::vector<replica::memtable*> active_memtables;
t.for_each_active_memtable([&] (replica::memtable& mt) {
active_memtables.push_back(&mt);
});
return active_memtables;
}
static
void populate(const std::vector<dataset*>& datasets, cql_test_env& env, const table_config& cfg, size_t flush_threshold) {
drop_keyspace_if_exists(env, "ks");
@@ -1775,7 +1783,7 @@ void populate(const std::vector<dataset*>& datasets, cql_test_env& env, const ta
auto gen = ds.make_generator(s, cfg);
while (auto mopt = gen()) {
++fragments;
replica::memtable& active_memtable = *cf.active_memtables().front();
replica::memtable& active_memtable = *active_memtables(cf).front();
active_memtable.apply(*mopt);
if (active_memtable.region().occupancy().used_space() > flush_threshold) {
metrics_snapshot before;

View File

@@ -714,9 +714,10 @@ async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_
f"table_id = {table_id}", host=host)
return rows[0].tablet_count
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split(manager: ManagerClient):
async def test_tablet_split(manager: ManagerClient, injection_error: str):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
@@ -762,6 +763,10 @@ async def test_tablet_split(manager: ManagerClient):
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, "test"))
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
@@ -775,6 +780,67 @@ async def test_tablet_split(manager: ManagerClient):
tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test')
assert tablet_count > 1
await manager.api.message_injection(servers[0].ip_addr, injection_error)
await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
await compaction_task
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, injection_error):
logger.info("Bootstrapping cluster")
cmdline = []
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async("SELECT * FROM test.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, "test")
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
s1_host_id = await manager.get_host_id(servers[1].server_id)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
logger.info("Started major compaction")
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, "test"))
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
t = tablet_replicas[0]
logger.info("Migrating tablet")
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", *t.replicas[0], *(s1_host_id, 0), t.last_token)
await manager.api.message_injection(servers[0].ip_addr, injection_error)
await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
await compaction_task
if injection_error == "major_compaction_wait":
logger.info("Check that major was successfully aborted on migration")
await s1_log.wait_for(f"Compaction for test/test was stopped due to: table removal", from_mark=s1_mark)
await check()
async def assert_tablet_count_metric_value_for_shards(manager: ManagerClient, server: ServerInfo, expected_count_per_shard: list[int]):
tablet_count_metric_name = "scylla_tablets_count"
metrics = await manager.metrics.query(server.ip_addr)