Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
c06760cf15 Fix multiple issues in test_out_of_space_prevention.py
- Fix variable name error: host[0] → hosts[0] on line 98
- Add missing await keywords for async operations on lines 209 and 385
- Rename class random_content_file to RandomContentFile (PascalCase)
- Fix function name typo: test_autotoogle_compaction → test_autotoggle_compaction

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-23 09:25:16 +00:00
copilot-swe-agent[bot]
c684456eba Initial plan 2025-12-23 09:21:06 +00:00
64 changed files with 269 additions and 2952 deletions

View File

@@ -1,13 +0,0 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
vp.insert(b.second);
}
}
std::vector<sstring> res;
replica::database& db = vb.local().get_db();
auto uuid = validate_table(db, ks, cf_name);
replica::column_family& cf = db.find_column_family(uuid);
co_return cf.get_index_manager().list_indexes()
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
| std::ranges::to<std::vector>();
res.reserve(cf.get_index_manager().list_indexes().size());
for (auto&& i : cf.get_index_manager().list_indexes()) {
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
res.emplace_back(i.metadata().name());
}
}
co_return res;
});
}

View File

@@ -12,7 +12,6 @@
#include <seastar/core/condition-variable.hh>
#include "schema/schema_fwd.hh"
#include "sstables/open_info.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -45,7 +44,7 @@ public:
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;

View File

@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
}
descriptor.creator = [&t] (shard_id) {
// All compaction types going through this path will work on normal input sstables only.
// Off-strategy, for example, waits until the sstables move out of staging state.
return t.make_sstable(sstables::sstable_state::normal);
return t.make_sstable();
};
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
@@ -1849,10 +1847,6 @@ protected:
throw make_compaction_stopped_exception();
}
}, false);
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
throw make_compaction_stopped_exception();
}
co_return co_await do_rewrite_sstable(std::move(sst));
}
};
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
// Throw an error if split cannot be performed due to e.g. out of space prevention.
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
// which is uneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
if (is_disabled()) {
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
"reason might be out of space prevention", sst->get_filename()))));
if (!can_proceed(&t)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
std::vector<sstables::shared_sstable> ret;
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
compaction_progress_monitor monitor;
compaction_data info = create_compaction_data();
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t, sst] (shard_id _) {
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
// For example, if base table has views, it's important that sstable produced by repair will be
// in the staging state.
return t.make_sstable(sst->state());
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
};
desc.replacer = [&] (compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));

View File

@@ -376,8 +376,7 @@ public:
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately

View File

@@ -1698,18 +1698,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
# We need to link these files to all Boost tests to make sure that
# we can execute `--list_json_content` on them. That will produce
# a similar result as calling `--list_content={HRF,DOT}`.
# Unfortunately, to be able to do that, we're forced to link the
# relevant code by hand.
for key in deps.keys():
for prefix in boost_tests_prefixes:
if key.startswith(prefix):
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
wasm_deps = {}
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'

View File

@@ -200,7 +200,6 @@ public:
static constexpr auto DICTS = "dicts";
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
static constexpr auto CLIENT_ROUTES = "client_routes";
static constexpr auto VERSIONS = "versions";
// auth
static constexpr auto ROLES = "roles";

View File

@@ -605,8 +605,8 @@ public:
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("version", utf8_type)
.with_column("build_mode", utf8_type)

View File

@@ -2,11 +2,8 @@
## What is ScyllaDB?
ScyllaDB is a high-performance NoSQL database optimized for speed and scalability.
It is designed to efficiently handle large volumes of data with minimal latency,
making it ideal for data-intensive applications.
ScyllaDB is distributed under the [ScyllaDB Source Available License](https://github.com/scylladb/scylladb/blob/master/LICENSE-ScyllaDB-Source-Available.md).
ScyllaDB is a high-performance NoSQL database system, fully compatible with Apache Cassandra.
ScyllaDB is released under the GNU Affero General Public License version 3 and the Apache License, ScyllaDB is free and open-source software.
> [ScyllaDB](http://www.scylladb.com/)

View File

@@ -45,9 +45,7 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
: _tombstone(x._tombstone)
, _static_row(s, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _rows()
, _row_tombstones(x._row_tombstones)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
@@ -56,30 +54,10 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
if (use_single_row_storage(s)) {
// Copy single row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(s, *x_row);
}
} else if (!x.get_rows_storage().empty()) {
// Converting from multi-row to single-row - take the first row
// This shouldn't normally happen as schema doesn't change this way
on_internal_error(mplog, "mutation_partition: cannot convert multi-row partition to single-row");
}
} else {
// Multi-row storage
if (x.uses_single_row_storage()) {
// Converting from single-row to multi-row - this shouldn't normally happen
on_internal_error(mplog, "mutation_partition: cannot convert single-row partition to multi-row");
} else {
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
get_rows_storage().clone_from(x.get_rows_storage(), cloner, current_deleter<rows_entry>());
}
}
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
}
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
@@ -87,9 +65,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
: _tombstone(x._tombstone)
, _static_row(schema, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows(use_single_row_storage(schema) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _rows()
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(schema.version())
@@ -98,37 +74,19 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
if (use_single_row_storage(schema)) {
// Single-row storage: just copy the row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(schema, *x_row);
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
_rows.insert_before_hint(_rows.end(), std::move(ce), rows_entry::tri_compare(schema));
}
} else {
// Filtering from multi-row - shouldn't happen with consistent schema
on_internal_error(mplog, "mutation_partition: filtering from multi-row to single-row storage");
}
} else {
// Multi-row storage with filtering
if (x.uses_single_row_storage()) {
on_internal_error(mplog, "mutation_partition: filtering from single-row to multi-row storage");
} else {
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(ce), rows_entry::tri_compare(schema));
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
}
}
} catch (...) {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
throw;
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
}
}
} catch (...) {
_rows.clear_and_dispose(current_deleter<rows_entry>());
throw;
}
}
@@ -146,20 +104,14 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
if (use_single_row_storage(schema)) {
// Single-row storage: no filtering needed, row either exists or doesn't
// The move constructor has already moved the row if it exists
} else {
// Multi-row storage: filter the rows
if (!uses_single_row_storage()) {
auto deleter = current_deleter<rows_entry>();
auto it = get_rows_storage().begin();
for (auto&& range : ck_ranges.ranges()) {
get_rows_storage().erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
}
get_rows_storage().erase_and_dispose(it, get_rows_storage().end(), deleter);
{
auto deleter = current_deleter<rows_entry>();
auto it = _rows.begin();
for (auto&& range : ck_ranges.ranges()) {
_rows.erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
}
_rows.erase_and_dispose(it, _rows.end(), deleter);
}
{
for (auto&& range : ck_ranges.ranges()) {
@@ -175,11 +127,7 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
}
mutation_partition::~mutation_partition() {
if (uses_single_row_storage()) {
// Single-row storage: optional destructor handles cleanup
} else {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
}
_rows.clear_and_dispose(current_deleter<rows_entry>());
}
mutation_partition&
@@ -193,14 +141,10 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
void mutation_partition::ensure_last_dummy(const schema& s) {
check_schema(s);
if (uses_single_row_storage()) {
// Single-row storage doesn't use dummy entries
return;
}
if (get_rows_storage().empty() || !get_rows_storage().rbegin()->is_last_dummy()) {
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
_rows.insert_before(_rows.end(), std::move(e));
}
}
@@ -475,18 +419,9 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
check_schema(schema);
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
if (use_single_row_storage(schema)) {
// Single-row storage: check if the single row exists and has tombstone
const auto& row_opt = get_single_row_storage();
if (row_opt) {
t.apply(row_opt->deleted_at(), row_opt->marker());
}
} else {
// Multi-row storage: search in B-tree
auto j = get_rows_storage().find(key, rows_entry::tri_compare(schema));
if (j != get_rows_storage().end()) {
t.apply(j->row().deleted_at(), j->row().marker());
}
auto j = _rows.find(key, rows_entry::tri_compare(schema));
if (j != _rows.end()) {
t.apply(j->row().deleted_at(), j->row().marker());
}
return t;
@@ -569,178 +504,97 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
clustered_row(s, key).apply(row_marker(created_at, ttl, expiry));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
if (use_single_row_storage(s)) {
// Single-row storage: just set the row
get_single_row_storage() = std::move(row);
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
check_schema(s);
if (use_single_row_storage(s)) {
// Single-row storage: just copy the row
get_single_row_storage() = row;
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
}
const row*
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
check_schema(s);
if (use_single_row_storage(s)) {
// Single-row storage: return the single row's cells if it exists
const auto& row_opt = get_single_row_storage();
if (row_opt) {
return &row_opt->cells();
}
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
return nullptr;
} else {
// Multi-row storage: search in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
return nullptr;
}
return &i->row().cells();
}
return &i->row().cells();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
rows_entry&
mutation_partition::clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
if (use_single_row_storage(s)) {
// Single-row storage doesn't use rows_entry - this shouldn't be called
on_internal_error(mplog, "mutation_partition::clustered_rows_entry() called with single-row storage");
}
auto i = get_rows_storage().find(pos, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto i = _rows.find(pos, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return *i;
}
deletable_row&
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
if (use_single_row_storage(s)) {
// Single-row storage: ignore dummy/continuous flags, just get/create the row
check_row_key(s, pos, dummy);
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
return clustered_rows_entry(s, pos, dummy, continuous).row();
}
return clustered_rows_entry(s, pos, dummy, continuous).row();
}
deletable_row&
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
if (use_single_row_storage(s)) {
// Single-row storage: just create/get the row
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
}
const auto cmp = rows_entry::tri_compare(s);
auto i = get_rows_storage().end();
if (!get_rows_storage().empty() && (cmp(*std::prev(i), pos) >= 0)) {
auto i = _rows.end();
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
on_internal_error(mplog, format("mutation_partition::append_clustered_row(): cannot append clustering row with key {} to the partition"
", last clustering row is equal or greater: {}", pos, std::prev(i)->position()));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = get_rows_storage().insert_before_hint(i, std::move(e), cmp).first;
i = _rows.insert_before_hint(i, std::move(e), cmp).first;
return i->row();
}
@@ -748,33 +602,19 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
mutation_partition::rows_type::const_iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
if (!r.start()) {
return std::cbegin(get_rows_storage());
return std::cbegin(_rows);
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
}
mutation_partition::rows_type::const_iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
if (!r.end()) {
return std::cend(get_rows_storage());
return std::cend(_rows);
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
}
std::ranges::subrange<mutation_partition::rows_type::const_iterator>
@@ -785,32 +625,17 @@ mutation_partition::range(const schema& schema, const query::clustering_range& r
std::ranges::subrange<mutation_partition::rows_type::iterator>
mutation_partition::range(const schema& schema, const query::clustering_range& r) {
if (use_single_row_storage(schema)) {
// Single-row storage: return empty range (rows_entry iteration not applicable)
static rows_type empty_rows;
return std::ranges::subrange(empty_rows.begin(), empty_rows.end());
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->range(schema, r));
return unconst(_rows, static_cast<const mutation_partition*>(this)->range(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) {
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
return unconst(_rows, static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) {
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
return unconst(_rows, static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
}
template<typename Func>
@@ -1552,15 +1377,7 @@ bool mutation_partition::empty() const
if (_tombstone.timestamp != api::missing_timestamp) {
return false;
}
if (_static_row.size() || !_row_tombstones.empty()) {
return false;
}
if (uses_single_row_storage()) {
return !get_single_row_storage().has_value();
} else {
return get_rows_storage().empty();
}
return !_static_row.size() && _rows.empty() && _row_tombstones.empty();
}
bool
@@ -1605,11 +1422,7 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
uint64_t
mutation_partition::row_count() const {
if (uses_single_row_storage()) {
return get_single_row_storage().has_value() ? 1 : 0;
} else {
return get_rows_storage().calculate_size();
}
return _rows.calculate_size();
}
rows_entry::rows_entry(rows_entry&& o) noexcept
@@ -2406,22 +2219,15 @@ public:
mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const schema& s, tombstone t)
: _tombstone(t)
, _static_row_continuous(!s.has_static_columns())
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _rows()
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{
if (use_single_row_storage(s)) {
// Single-row storage: no dummy entries needed, leave row as empty optional
} else {
// Multi-row storage: add last dummy entry for discontinuous partition
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
_rows.insert_before(_rows.end(), std::move(e));
}
bool mutation_partition::is_fully_continuous() const {

View File

@@ -9,7 +9,6 @@
#pragma once
#include <iosfwd>
#include <variant>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/util/optimized_optional.hh>
@@ -1189,12 +1188,6 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
}
}
// Returns true if the schema has no clustering keys, meaning partitions can have at most one row.
// When true, mutation_partition uses std::optional<deletable_row> instead of full rows_type container.
inline bool use_single_row_storage(const schema& s) {
return s.clustering_key_size() == 0;
}
// Represents a set of writes made to a single partition.
//
// The object is schema-dependent. Each instance is governed by some
@@ -1235,45 +1228,20 @@ inline bool use_single_row_storage(const schema& s) {
class mutation_partition final {
public:
using rows_type = rows_entry::container_type;
using rows_storage_type = std::variant<rows_type, std::optional<deletable_row>>;
friend class size_calculator;
private:
tombstone _tombstone;
lazy_row _static_row;
bool _static_row_continuous = true;
rows_storage_type _rows;
rows_type _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
// Note: empty when using single-row storage (std::optional<deletable_row> variant)
range_tombstone_list _row_tombstones;
#ifdef SEASTAR_DEBUG
table_schema_version _schema_version;
#endif
friend class converting_mutation_partition_applier;
// Returns true if this partition uses single-row storage
bool uses_single_row_storage() const {
return std::holds_alternative<std::optional<deletable_row>>(_rows);
}
// Get reference to rows container (multi-row storage)
rows_type& get_rows_storage() {
return std::get<rows_type>(_rows);
}
const rows_type& get_rows_storage() const {
return std::get<rows_type>(_rows);
}
// Get reference to single row storage
std::optional<deletable_row>& get_single_row_storage() {
return std::get<std::optional<deletable_row>>(_rows);
}
const std::optional<deletable_row>& get_single_row_storage() const {
return std::get<std::optional<deletable_row>>(_rows);
}
public:
struct copy_comparators_only {};
struct incomplete_tag {};
@@ -1283,14 +1251,14 @@ public:
return mutation_partition(incomplete_tag(), s, t);
}
mutation_partition(const schema& s)
: _rows(use_single_row_storage(s) ? rows_storage_type(std::optional<deletable_row>{}) : rows_storage_type(rows_type{}))
: _rows()
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
: _rows(other._rows.index() == 0 ? rows_storage_type(rows_type{}) : rows_storage_type(std::optional<deletable_row>{}))
: _rows()
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(other._schema_version)
@@ -1301,8 +1269,6 @@ public:
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
~mutation_partition();
// Returns the mutation_partition containing the given rows_type.
// Can only be used when the mutation_partition uses multi-row storage.
static mutation_partition& container_of(rows_type&);
mutation_partition& operator=(mutation_partition&& x) noexcept;
bool equal(const schema&, const mutation_partition&) const;
@@ -1496,31 +1462,9 @@ public:
const lazy_row& static_row() const { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
// For single-row storage (clustering_key_size() == 0), returns an empty container.
// Callers should check uses_single_row_storage() and use get_single_row() for single-row case.
const rows_type& clustered_rows() const noexcept {
if (uses_single_row_storage()) {
static const rows_type empty_rows;
return empty_rows;
}
return get_rows_storage();
}
utils::immutable_collection<rows_type> clustered_rows() noexcept {
return const_cast<const mutation_partition*>(this)->clustered_rows();
}
rows_type& mutable_clustered_rows() noexcept {
// Should only be called when NOT using single-row storage
return get_rows_storage();
}
// Access the single row when using single-row storage (clustering_key_size() == 0)
const std::optional<deletable_row>& get_single_row() const {
return get_single_row_storage();
}
std::optional<deletable_row>& get_single_row() {
return get_single_row_storage();
}
const rows_type& clustered_rows() const noexcept { return _rows; }
utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
rows_type& mutable_clustered_rows() noexcept { return _rows; }
const range_tombstone_list& row_tombstones() const noexcept { return _row_tombstones; }
utils::immutable_collection<range_tombstone_list> row_tombstones() noexcept { return _row_tombstones; }
@@ -1538,14 +1482,8 @@ public:
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
std::ranges::subrange<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
// Returns an iterator range of rows_entry, with only non-dummy entries.
// For single-row storage, returns an empty range.
auto non_dummy_rows() const {
if (uses_single_row_storage()) {
static const rows_type empty_rows;
return std::ranges::subrange(empty_rows.begin(), empty_rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
return std::ranges::subrange(get_rows_storage().begin(), get_rows_storage().end())
return std::ranges::subrange(_rows.begin(), _rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
void accept(const schema&, mutation_partition_visitor&) const;
@@ -1579,21 +1517,7 @@ private:
inline
mutation_partition& mutation_partition::container_of(rows_type& rows) {
// This method can only be called when using multi-row storage (rows_type variant alternative).
// With std::variant, when rows_type is the active alternative (index 0), it's stored at the beginning of the variant.
// We can use pointer arithmetic to get back to the mutation_partition.
// Calculate offset from rows_type to the containing variant
// The rows reference should be the active rows_type inside the variant
static_assert(std::is_same_v<std::variant_alternative_t<0, rows_storage_type>, rows_type>,
"rows_type must be the first alternative in rows_storage_type");
// Get address of the variant containing this rows_type
// When rows_type is active (index 0), it's at offset 0 in the variant's storage
rows_storage_type* variant_ptr = reinterpret_cast<rows_storage_type*>(&rows);
// Now get the mutation_partition from the variant
return *boost::intrusive::get_parent_from_member(variant_ptr, &mutation_partition::_rows);
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition::_rows);
}
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),

View File

@@ -1195,8 +1195,6 @@ private:
rlogger.info("{}", msg);
throw std::runtime_error(msg);
}
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
_rs._repair_compaction_locks[_frozen_topology_guard].push_back(std::move(lock_holder));

View File

@@ -84,10 +84,6 @@ class compaction_group {
seastar::named_gate _async_gate;
// Gates flushes.
seastar::named_gate _flush_gate;
// Gates sstable being added to the group.
// This prevents the group from being considered empty when sstables are being added.
// Crucial for tablet split which ACKs split for a table when all pre-split groups are empty.
seastar::named_gate _sstable_add_gate;
bool _tombstone_gc_enabled = true;
std::optional<compaction::compaction_backlog_tracker> _backlog_tracker;
repair_classifier_func _repair_sstable_classifier;
@@ -252,10 +248,6 @@ public:
return _flush_gate;
}
seastar::named_gate& sstable_add_gate() noexcept {
return _sstable_add_gate;
}
compaction::compaction_manager& get_compaction_manager() noexcept;
const compaction::compaction_manager& get_compaction_manager() const noexcept;
@@ -442,7 +434,7 @@ public:
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;

View File

@@ -604,28 +604,9 @@ public:
data_dictionary::table as_data_dictionary() const;
// The usage of these functions are restricted to preexisting sstables that aren't being
// moved anywhere, so should never be used in the context of file streaming and intra
// node migration. The only user today is distributed loader, which populates the
// sstables for each column family on boot.
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
// Restricted to new sstables produced by external processes such as repair.
// The sstable might undergo split if table is in split mode.
// If no need for split, the input sstable will only be attached to the sstable set.
// If split happens, the output sstables will be attached and the input sstable unlinked.
// On failure, the input sstable is unlinked and exception propagated to the caller.
// The on_add callback will be called on all sstables to be added into the set.
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
std::function<future<>(sstables::shared_sstable)> on_add,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
std::function<future<>(sstables::shared_sstable)> on_add);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable();
void set_truncation_time(db_clock::time_point truncated_at) noexcept {
@@ -743,9 +724,7 @@ private:
return _config.enable_cache && _schema->caching_options().enabled();
}
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
// This function can throw even if the sstable was added into the set. When the sstable was successfully
// added, the sstable ptr @sst will be set to nullptr. Allowing caller to optionally discard the sstable.
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy, bool trigger_compaction);
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction);
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);
@@ -1379,8 +1358,7 @@ public:
// Clones storage of a given tablet. Memtable is flushed first to guarantee that the
// snapshot (list of sstables) will include all the data written up to the time it was taken.
// If leave_unsealead is set, all the destination sstables will be left unsealed.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid);
friend class compaction_group;
friend class compaction::compaction_task_impl;

View File

@@ -721,7 +721,7 @@ public:
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override {
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
}
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
@@ -879,7 +879,7 @@ public:
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override;
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
return tablet_map().get_token_range_after_split(token);
}
@@ -1130,8 +1130,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
}
future<std::vector<sstables::shared_sstable>>
tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
co_await utils::get_local_injector().inject("maybe_split_new_sstable_wait", utils::wait_for_message(120s));
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
if (!tablet_map().needs_split()) {
co_return std::vector<sstables::shared_sstable>{sst};
}
@@ -1139,7 +1138,8 @@ tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sst
auto& cg = compaction_group_for_sstable(sst);
auto holder = cg.async_gate().hold();
auto& view = cg.view_for_sstable(sst);
co_return co_await _t.get_compaction_manager().maybe_split_new_sstable(sst, view, co_await split_compaction_options());
auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, co_await split_compaction_options());
}
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
@@ -1149,7 +1149,7 @@ future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
auto holder = async_gate().hold();
co_return co_await _sg_manager->maybe_split_new_sstable(sst);
co_return co_await _sg_manager->maybe_split_sstable(sst);
}
dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
@@ -1330,7 +1330,7 @@ future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_
}
future<utils::chunked_vector<sstables::entry_descriptor>>
table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
table::clone_tablet_storage(locator::tablet_id tid) {
utils::chunked_vector<sstables::entry_descriptor> ret;
auto holder = async_gate().hold();
@@ -1342,7 +1342,7 @@ table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
// by compaction while we are waiting for the lock.
auto deletion_guard = co_await get_sstable_list_permit();
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed));
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
});
co_return ret;
}
@@ -1354,10 +1354,10 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
}
future<>
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy offstrategy,
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
bool trigger_compaction) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () mutable noexcept {
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
// atomically load all opened sstables into column family.
if (!offstrategy) {
@@ -1369,8 +1369,6 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
if (trigger_compaction) {
try_trigger_compaction(cg);
}
// Reseting sstable ptr to inform the caller the sstable has been loaded successfully.
sst = nullptr;
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
});
@@ -1378,10 +1376,12 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
auto& cg = compaction_group_for_sstable(new_sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, new_sst, offstrategy, trigger_compaction);
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
}
}
future<>
@@ -1399,85 +1399,6 @@ table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>
trigger_compaction();
}
future<std::vector<sstables::shared_sstable>>
table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
std::function<future<>(sstables::shared_sstable)> on_add,
sstables::offstrategy offstrategy) {
std::vector<sstables::shared_sstable> ret, ssts;
std::exception_ptr ex;
try {
bool trigger_compaction = offstrategy == sstables::offstrategy::no;
auto& cg = compaction_group_for_sstable(new_sst);
// This prevents compaction group from being considered empty until the holder is released.
// Helpful for tablet split, where split is acked for a table when all pre-split groups are empty.
auto sstable_add_holder = cg.sstable_add_gate().hold();
ret = ssts = co_await maybe_split_new_sstable(new_sst);
// on sucessful split, input sstable is unlinked.
new_sst = nullptr;
for (auto& sst : ssts) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make sure compaction group is alive.
auto holder = cg.async_gate().hold();
co_await on_add(sst);
// If do_add_sstable_and_update_cache() throws after sstable has been loaded, the pointer
// sst passed by reference will be set to nullptr, so it won't be unlinked in the exception
// handler below.
co_await do_add_sstable_and_update_cache(cg, sst, offstrategy, trigger_compaction);
sst = nullptr;
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
// on failed split, input sstable is unlinked here.
if (new_sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
co_await new_sst->unlink();
}
// on failure after sucessful split, sstables not attached yet will be unlinked
co_await coroutine::parallel_for_each(ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
if (sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
co_await sst->unlink();
}
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(ret);
}
future<std::vector<sstables::shared_sstable>>
table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
std::function<future<>(sstables::shared_sstable)> on_add) {
std::exception_ptr ex;
std::vector<sstables::shared_sstable> ret;
// We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it,
// so the exception handling below will only have to unlink sstables not processed yet.
try {
for (auto& sst: new_ssts) {
auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add);
std::ranges::move(ssts, std::back_inserter(ret));
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
if (sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
co_await sst->unlink();
}
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(ret);
}
future<>
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
@@ -2691,8 +2612,8 @@ public:
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _t.get_sstables_manager();
}
sstables::shared_sstable make_sstable(sstables::sstable_state state) const override {
return _t.make_sstable(state);
sstables::shared_sstable make_sstable() const override {
return _t.make_sstable();
}
sstables::sstable_writer_config configure_writer(sstring origin) const override {
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
@@ -2810,7 +2731,6 @@ future<> compaction_group::stop(sstring reason) noexcept {
auto flush_future = co_await seastar::coroutine::as_future(flush());
co_await _flush_gate.close();
co_await _sstable_add_gate.close();
// FIXME: indentation
_compaction_disabler_for_views.clear();
co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
@@ -2824,7 +2744,7 @@ future<> compaction_group::stop(sstring reason) noexcept {
}
bool compaction_group::empty() const noexcept {
return _memtables->empty() && live_sstable_count() == 0 && _sstable_add_gate.get_count() == 0;
return _memtables->empty() && live_sstable_count() == 0;
}
const schema_ptr& compaction_group::schema() const {
@@ -3280,7 +3200,7 @@ db::replay_position table::highest_flushed_replay_position() const {
}
struct manifest_json : public json::json_base {
json::json_chunked_list<std::string_view> files;
json::json_chunked_list<sstring> files;
manifest_json() {
register_params();
@@ -3304,7 +3224,7 @@ table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets)
manifest_json manifest;
for (const auto& fsp : file_sets) {
for (auto& rf : *fsp) {
manifest.files.push(std::string_view(rf));
manifest.files.push(std::move(rf));
}
}
auto streamer = json::stream_object(std::move(manifest));

View File

@@ -390,11 +390,9 @@ dark_green = (195, 215, 195)
light_red = (255, 200, 200)
light_green = (200, 255, 200)
light_gray = (240, 240, 240)
scylla_blue = (87, 209, 229)
tablet_colors = {
(Tablet.STATE_NORMAL, None): GRAY,
(Tablet.STATE_NORMAL, 'repair'): scylla_blue,
(Tablet.STATE_JOINING, 'allow_write_both_read_old'): dark_green,
(Tablet.STATE_LEAVING, 'allow_write_both_read_old'): dark_red,
(Tablet.STATE_JOINING, 'write_both_read_old'): dark_green,
@@ -534,8 +532,6 @@ def update_from_cql(initial=False):
state = (Tablet.STATE_JOINING, tablet.stage)
elif replica in leaving:
state = (Tablet.STATE_LEAVING, tablet.stage)
elif tablet.stage == 'repair':
state = (Tablet.STATE_NORMAL, tablet.stage)
else:
state = (Tablet.STATE_NORMAL, None)

View File

@@ -224,13 +224,7 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
}
static const std::unordered_set<auth::resource> vector_search_system_resources = {
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY),
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::VERSIONS),
};
if ((cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) ||
(cmd.permission == auth::permission::SELECT && vector_search_system_resources.contains(cmd.resource))) {
if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});

View File

@@ -6526,19 +6526,14 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
leaving.host, pending.host));
}
// All sstables cloned locally will be left unsealed, until they're loaded into the table.
// This is to guarantee no unsplit sstables will be left sealed on disk, which could
// cause problems if unsplit sstables are found after split was ACKed to coordinator.
bool leave_unsealed = true;
auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
auto& table = _db.local().find_column_family(tablet.table);
auto op = table.stream_in_progress();
co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
co_return co_await table.clone_tablet_storage(tablet.tablet);
});
rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto& mng = t.get_sstables_manager();
auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
d.version, d.format, db_clock::now(), default_io_error_handler_gen());
@@ -6546,8 +6541,7 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
// will still point to leaving replica at this stage in migration. If node goes down,
// SSTables will be loaded at pending replica and migration is retried, so correctness
// wise, we're good.
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
.unsealed_sstable = leave_unsealed };
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true };
co_await sst->load(sharder, cfg);
co_return sst;
};
@@ -6555,23 +6549,16 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
// Loads cloned sstables from leaving replica into pending one.
auto& table = _db.local().find_column_family(tablet.table);
auto& sstm = table.get_sstables_manager();
auto op = table.stream_in_progress();
dht::auto_refreshing_sharder sharder(table.shared_from_this());
std::unordered_set<sstables::shared_sstable> ssts;
std::vector<sstables::shared_sstable> ssts;
ssts.reserve(d.size());
for (auto&& sst_desc : d) {
ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc)));
}
auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (ssts.contains(loading_sst)) {
auto cfg = sstm.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
_view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
co_await table.add_sstables_and_update_cache(ssts);
_view_building_worker.local().load_sstables(tablet.table, ssts);
});
rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
}

View File

@@ -1931,10 +1931,6 @@ public:
const auto& table_groups = _tm->tablets().all_table_groups();
auto finalize_decision = [&] {
if (utils::get_local_injector().enter("tablet_resize_finalization_postpone")) {
return;
}
_stats.for_cluster().resizes_finalized++;
resize_plan.finalize_resize.insert(table);
};

View File

@@ -2623,10 +2623,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await _voter_handler.on_node_removed(replaced_node_id, _as);
}
}
utils::get_local_injector().inject("crash_coordinator_before_stream", [] {
rtlogger.info("crash_coordinator_before_stream: aborting");
abort();
});
utils::get_local_injector().inject("crash_coordinator_before_stream", [] { abort(); });
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
auto state = node.rs->state;
try {

View File

@@ -1696,9 +1696,7 @@ void writer::consume_end_of_stream() {
.map = _collector.get_ext_timestamp_stats()
});
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats));
if (!_cfg.leave_unsealed) {
_sst.seal_sstable(_cfg.backup).get();
}
_sst.seal_sstable(_cfg.backup).get();
}
uint64_t writer::data_file_position_for_tests() const {

View File

@@ -83,8 +83,6 @@ struct sstable_open_config {
bool current_shard_as_sstable_owner = false;
// Do not move the sharding metadata to the sharder, keeping it in the scylla metadata..
bool keep_sharding_metadata = false;
// Allows unsealed sstable to be loaded, since it must read components from temporary TOC instead.
bool unsealed_sstable = false;
};
}

View File

@@ -836,14 +836,13 @@ future<std::vector<sstring>> sstable::read_and_parse_toc(file f) {
// This is small enough, and well-defined. Easier to just read it all
// at once
future<> sstable::read_toc(sstable_open_config cfg) noexcept {
future<> sstable::read_toc() noexcept {
if (_recognized_components.size()) {
co_return;
}
try {
auto toc_type = cfg.unsealed_sstable ? component_type::TemporaryTOC : component_type::TOC;
co_await do_read_simple(toc_type, [&] (version_types v, file f) -> future<> {
co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> {
auto comps = co_await read_and_parse_toc(f);
for (auto& c: comps) {
// accept trailing newlines
@@ -901,8 +900,8 @@ future<std::unordered_map<component_type, file>> sstable::readable_file_for_all_
co_return std::move(files);
}
future<entry_descriptor> sstable::clone(generation_type new_generation, bool leave_unsealed) const {
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation, storage::leave_unsealed(leave_unsealed));
future<entry_descriptor> sstable::clone(generation_type new_generation) const {
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation);
co_return entry_descriptor(new_generation, _version, _format, component_type::TOC, _state);
}
@@ -1726,7 +1725,7 @@ void sstable::disable_component_memory_reload() {
}
future<> sstable::load_metadata(sstable_open_config cfg) noexcept {
co_await read_toc(cfg);
co_await read_toc();
// read scylla-meta after toc. Might need it to parse
// rest (hint extensions)
co_await read_scylla_metadata();
@@ -3961,13 +3960,11 @@ class sstable_stream_sink_impl : public sstable_stream_sink {
shared_sstable _sst;
component_type _type;
bool _last_component;
bool _leave_unsealed;
public:
sstable_stream_sink_impl(shared_sstable sst, component_type type, sstable_stream_sink_cfg cfg)
sstable_stream_sink_impl(shared_sstable sst, component_type type, bool last_component)
: _sst(std::move(sst))
, _type(type)
, _last_component(cfg.last_component)
, _leave_unsealed(cfg.leave_unsealed)
, _last_component(last_component)
{}
private:
future<> load_metadata() const {
@@ -4014,12 +4011,10 @@ public:
co_return co_await make_file_output_stream(std::move(f), stream_options);
}
future<shared_sstable> close() override {
future<shared_sstable> close_and_seal() override {
if (_last_component) {
// If we are the last component in a sequence, we can seal the table.
if (!_leave_unsealed) {
co_await _sst->_storage->seal(*_sst);
}
co_await _sst->_storage->seal(*_sst);
co_return std::move(_sst);
}
_sst = {};
@@ -4036,7 +4031,7 @@ public:
}
};
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, sstable_stream_sink_cfg cfg) {
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, bool last_component) {
auto desc = parse_path(component_filename, schema->ks_name(), schema->cf_name());
auto sst = sstm.make_sstable(schema, s_opts, desc.generation, state, desc.version, desc.format);
@@ -4047,7 +4042,7 @@ std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstab
type = component_type::TemporaryTOC;
}
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, cfg);
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, last_component);
}
generation_type

View File

@@ -109,7 +109,6 @@ struct sstable_writer_config {
size_t promoted_index_auto_scale_threshold;
uint64_t max_sstable_size = std::numeric_limits<uint64_t>::max();
bool backup = false;
bool leave_unsealed = false;
mutation_fragment_stream_validation_level validation_level;
std::optional<db::replay_position> replay_position;
std::optional<int> sstable_level;
@@ -418,8 +417,8 @@ public:
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
component_name get_filename(component_type f = component_type::Data) const {
return component_name(*this, f);
component_name get_filename() const {
return component_name(*this, component_type::Data);
}
component_name toc_filename() const {
@@ -694,7 +693,7 @@ private:
future<> update_info_for_opened_data(sstable_open_config cfg = {});
future<> read_toc(sstable_open_config cfg = {}) noexcept;
future<> read_toc() noexcept;
future<> read_summary() noexcept;
void write_summary() {
@@ -1070,9 +1069,8 @@ public:
future<std::unordered_map<component_type, file>> readable_file_for_all_components() const;
// Clones this sstable with a new generation, under the same location as the original one.
// If leave_unsealed is true, the destination sstable is left unsealed.
// Implementation is underlying storage specific.
future<entry_descriptor> clone(generation_type new_generation, bool leave_unsealed = false) const;
future<entry_descriptor> clone(generation_type new_generation) const;
struct lesser_reclaimed_memory {
// comparator class to be used by the _reclaimed set in sstables manager
@@ -1246,18 +1244,13 @@ public:
// closes this component. If this is the last component in a set (see "last_component" in creating method below)
// the table on disk will be sealed.
// Returns sealed sstable if last, or nullptr otherwise.
virtual future<shared_sstable> close() = 0;
virtual future<shared_sstable> close_and_seal() = 0;
virtual future<> abort() = 0;
};
struct sstable_stream_sink_cfg {
bool last_component = false;
bool leave_unsealed = false;
};
// Creates a sink object which can receive a component file sourced from above source object data.
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, sstable_stream_sink_cfg cfg);
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, bool last_component);
} // namespace sstables

View File

@@ -50,14 +50,7 @@ class filesystem_storage final : public sstables::storage {
std::optional<std::filesystem::path> _temp_dir; // Valid while the sstable is being created, until sealed
private:
struct mark_for_removal_tag {};
struct leave_unsealed_tag {};
enum class link_mode {
default_mode,
mark_for_removal,
leave_unsealed,
};
using mark_for_removal = bool_class<class mark_for_removal_tag>;
template <typename Comp>
requires std::is_same_v<Comp, component_type> || std::is_same_v<Comp, sstring>
@@ -68,9 +61,7 @@ private:
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> remove_temp_dir();
virtual future<> create_links(const sstable& sst, const std::filesystem::path& dir) const override;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, link_mode mode) const;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const;
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> dst_gen) const;
future<> touch_temp_dir(const sstable& sst);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay) override;
@@ -92,7 +83,7 @@ public:
{}
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
@@ -365,13 +356,8 @@ future<> filesystem_storage::check_create_links_replay(const sstable& sst, const
/// \param sst - the sstable to work on
/// \param dst_dir - the destination directory.
/// \param generation - the generation of the destination sstable
/// \param mode - what will be done after all components were linked
/// mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
/// leave_unsealed - leaves the destination sstable unsealed
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, link_mode mode) const {
// They're mutually exclusive, so we can assume only one is set.
bool mark_for_removal = mode == link_mode::mark_for_removal;
bool leave_unsealed = mode == link_mode::leave_unsealed;
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
auto comps = sst.all_components();
co_await check_create_links_replay(sst, dst_dir, generation, comps);
@@ -380,11 +366,7 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
co_await sst.sstable_write_io_check(idempotent_link_file, fmt::to_string(sst.filename(component_type::TOC)), std::move(dst));
auto dir = opened_directory(dst_dir);
co_await dir.sync(sst._write_error_handler);
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation, leave_unsealed] (auto p) {
// Skips the linking of TOC file if the destination will be left unsealed.
if (leave_unsealed && p.first == component_type::TOC) {
return make_ready_future<>();
}
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
auto src = filename(sst, _dir.native(), sst._generation, p.second);
auto dst = filename(sst, dst_dir, generation, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
@@ -397,10 +379,9 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
auto src_temp_toc = filename(sst, _dir.native(), sst._generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await _dir.sync(sst._write_error_handler);
} else if (!leave_unsealed) {
} else {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
// This is bypassed if destination will be left unsealed.
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
co_await dir.sync(sst._write_error_handler);
@@ -408,23 +389,15 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
}
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const {
return create_links_common(sst, dst_dir, dst_gen, link_mode::mark_for_removal);
}
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const {
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::leave_unsealed);
}
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen) const {
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::default_mode);
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), mark_for_removal::no);
}
future<> filesystem_storage::create_links(const sstable& sst, const std::filesystem::path& dir) const {
return create_links_common(sst, dir.native(), sst._generation, link_mode::default_mode);
return create_links_common(sst, dir.native(), sst._generation, mark_for_removal::no);
}
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed leave_unsealed) const {
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
std::filesystem::path snapshot_dir;
if (abs) {
snapshot_dir = dir;
@@ -432,11 +405,7 @@ future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_
snapshot_dir = _dir.path() / dir;
}
co_await sst.sstable_touch_directory_io_check(snapshot_dir);
if (leave_unsealed) {
co_await create_links_common(sst, snapshot_dir, std::move(gen), leave_unsealed_tag{});
} else {
co_await create_links_common(sst, snapshot_dir, std::move(gen));
}
co_await create_links_common(sst, snapshot_dir, std::move(gen));
}
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
@@ -444,7 +413,7 @@ future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generatio
sstring old_dir = _dir.native();
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal_tag{});
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
co_await change_dir(new_dir);
generation_type old_generation = sst._generation;
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
@@ -629,7 +598,7 @@ public:
{}
future<> seal(const sstable& sst) override;
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>, storage::leave_unsealed) const override;
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>) const override;
future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
void open(sstable& sst) override;
@@ -846,7 +815,7 @@ future<> object_storage_base::unlink_component(const sstable& sst, component_typ
}
}
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const {
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
co_return;
}

View File

@@ -97,10 +97,9 @@ public:
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage
using leave_unsealed = bool_class<struct leave_unsealed_tag>;
virtual future<> seal(const sstable& sst) = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}, leave_unsealed lu = leave_unsealed::no) const = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}) const = 0;
virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0;
// runs in async context
virtual void open(sstable& sst) = 0;

View File

@@ -63,45 +63,30 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
}
schema_ptr s = reader.schema();
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed on the table directory.
auto cfg = cf->get_sstables_manager().configure_writer(origin);
cfg.leave_unsealed = true;
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cfg, encoding_stats{}).then([sst] {
return sst->open_data();
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] -> future<std::vector<sstables::shared_sstable>> {
auto on_add = [sst, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] (sstables::shared_sstable loading_sst) -> future<> {
if (repaired_at && sstables::repair_origin == origin) {
loading_sst->being_repaired = frozen_guard;
if (sstable_list_to_mark_as_repaired) {
sstable_list_to_mark_as_repaired->insert(loading_sst);
}
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] -> future<> {
if (repaired_at && sstables::repair_origin == origin) {
sst->being_repaired = frozen_guard;
if (sstable_list_to_mark_as_repaired) {
sstable_list_to_mark_as_repaired->insert(sst);
}
if (loading_sst == sst) {
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
}
if (offstrategy && sstables::repair_origin == origin) {
sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name());
cf->enable_off_strategy_trigger();
}
co_return co_await cf->add_new_sstable_and_update_cache(sst, on_add, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vb, &vbw] (std::vector<sstables::shared_sstable> new_sstables) mutable -> future<> {
auto& vb_ = vb;
auto new_sstables_ = std::move(new_sstables);
auto table = cf;
co_await cf->add_sstable_and_update_cache(sst, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vb, &vbw]() mutable -> future<> {
if (use_view_update_path == db::view::sstable_destination_decision::staging_managed_by_vbc) {
co_return co_await vbw.local().register_staging_sstable_tasks(new_sstables_, cf->schema()->id());
return vbw.local().register_staging_sstable_tasks({sst}, cf->schema()->id());
} else if (use_view_update_path == db::view::sstable_destination_decision::staging_directly_to_generator) {
co_await coroutine::parallel_for_each(new_sstables_, [&vb_, &table] (sstables::shared_sstable sst) -> future<> {
return vb_.local().register_staging_sstable(sst, table);
});
return vb.local().register_staging_sstable(sst, std::move(cf));
}
co_return;
return make_ready_future<>();
});
};
if (!offstrategy) {

View File

@@ -52,16 +52,8 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
auto erm = t.get_effective_replication_map();
auto& sstm = t.get_sstables_manager();
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
sstables::sstable_open_config cfg { .unsealed_sstable = true };
co_await sst->load(erm->get_sharder(*t.schema()), cfg);
auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto cfg = sstm.configure_writer(sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
co_await sst->load(erm->get_sharder(*t.schema()));
co_await t.add_sstable_and_update_cache(sst);
blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
if (state == sstables::sstable_state::staging) {
@@ -72,7 +64,7 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
// so then, the view building coordinator can decide to process it once the migration
// is finished.
// (Instead of registering the sstable to view update generator which may process it immediately.)
co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
co_await sharded_vbw.local().register_staging_sstable_tasks({sst}, t.schema()->id());
}
});
}
@@ -351,11 +343,7 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
auto& table = db.find_column_family(meta.table);
auto& sstm = table.get_sstables_manager();
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed on the table directory.
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
.leave_unsealed = true };
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, meta.fops == file_ops::load_sstables);
auto out = co_await sstable_sink->output(foptions, stream_options);
co_return output_result{
[sstable_sink = std::move(sstable_sink), &meta, &db, &vbw](store_result res) -> future<> {
@@ -363,7 +351,7 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
co_await sstable_sink->abort();
co_return;
}
auto sst = co_await sstable_sink->close();
auto sst = co_await sstable_sink->close_and_seal();
if (sst) {
blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
auto desc = sst->get_descriptor(sstables::component_type::TOC);

View File

@@ -110,7 +110,7 @@ public:
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); }
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); }
virtual sstables::shared_sstable make_sstable() const override { return _sstable_factory(); }
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); }
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }

View File

@@ -387,27 +387,4 @@ SEASTAR_TEST_CASE(select_from_vector_indexed_table) {
enable_tablets(db_config_with_auth()));
}
SEASTAR_TEST_CASE(select_from_vector_search_system_table) {
return do_with_cql_env_thread(
[](auto&& env) {
create_user_if_not_exists(env, bob);
with_user(env, bob, [&env] {
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.group0_history").get(), exceptions::unauthorized_exception,
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
});
with_user(env, bob, [&env] {
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.versions").get(), exceptions::unauthorized_exception,
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
});
cquery_nofail(env, "GRANT VECTOR_SEARCH_INDEXING ON ALL KEYSPACES TO bob");
with_user(env, bob, [&env] {
cquery_nofail(env, "SELECT * FROM system.group0_history");
});
with_user(env, bob, [&env] {
cquery_nofail(env, "SELECT * FROM system.versions");
});
},
db_config_with_auth());
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -6275,11 +6275,11 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
auto& cm = t->get_compaction_manager();
auto split_opt = compaction::compaction_type_options::split{classify_fn};
auto new_ssts = cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), split_opt).get();
auto new_ssts = cm.maybe_split_sstable(input, t.as_compaction_group_view(), split_opt).get();
BOOST_REQUIRE(new_ssts.size() == expected_output_size);
for (auto& sst : new_ssts) {
// split sstables don't require further split.
auto ssts = cm.maybe_split_new_sstable(sst, t.as_compaction_group_view(), split_opt).get();
auto ssts = cm.maybe_split_sstable(sst, t.as_compaction_group_view(), split_opt).get();
BOOST_REQUIRE(ssts.size() == 1);
BOOST_REQUIRE(ssts.front() == sst);
}
@@ -6291,97 +6291,9 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
}
return classify_fn(t);
};
BOOST_REQUIRE_THROW(cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
BOOST_REQUIRE_THROW(cm.maybe_split_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
std::runtime_error);
});
}
SEASTAR_TEST_CASE(unsealed_sstable_compaction_test) {
BOOST_REQUIRE(smp::count == 1);
return test_env::do_with_async([] (test_env& env) {
auto s = schema_builder("tests", "unsealed_sstable_compaction_test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
auto t = env.make_table_for_tests(s);
auto close_t = deferred_stop(t);
t->start();
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
sstable_writer_config sst_cfg = env.manager().configure_writer();
sst_cfg.leave_unsealed = true;
auto unsealed_sstable = make_sstable_easy(env, make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut)), sst_cfg);
BOOST_REQUIRE(file_exists(unsealed_sstable->get_filename(sstables::component_type::TemporaryTOC).format()).get());
auto sst_gen = env.make_sst_factory(s);
auto info = compact_sstables(env, compaction::compaction_descriptor({ unsealed_sstable }), t, sst_gen).get();
BOOST_REQUIRE(info.new_sstables.size() == 1);
});
}
SEASTAR_TEST_CASE(sstable_clone_leaving_unsealed_dest_sstable) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pk = ss.make_pkey();
auto mut1 = mutation(s, pk);
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
auto sst = make_sstable_containing(env.make_sstable(s), {std::move(mut1)});
auto table = env.make_table_for_tests(s);
auto close_table = deferred_stop(table);
sstables::sstable_generation_generator gen_generator;
bool leave_unsealed = true;
auto d = sst->clone(gen_generator(), leave_unsealed).get();
auto sst2 = env.make_sstable(s, d.generation, d.version, d.format);
sst2->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::TOC).format()).get());
BOOST_REQUIRE(file_exists(sst2->get_filename(sstables::component_type::TemporaryTOC).format()).get());
leave_unsealed = false;
d = sst->clone(gen_generator(), leave_unsealed).get();
auto sst3 = env.make_sstable(s, d.generation, d.version, d.format);
sst3->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
BOOST_REQUIRE(file_exists(sst3->get_filename(sstables::component_type::TOC).format()).get());
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::TemporaryTOC).format()).get());
});
}
SEASTAR_TEST_CASE(failure_when_adding_new_sstable_test) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pk = ss.make_pkey();
auto mut1 = mutation(s, pk);
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
auto sst = make_sstable_containing(env.make_sstable(s), {mut1});
auto table = env.make_table_for_tests(s);
auto close_table = deferred_stop(table);
auto on_add = [] (sstables::shared_sstable) { throw std::runtime_error("fail to seal"); return make_ready_future<>(); };
BOOST_REQUIRE_THROW(table->add_new_sstable_and_update_cache(sst, on_add).get(), std::runtime_error);
// Verify new sstable was unlinked on failure.
BOOST_REQUIRE(!file_exists(sst->get_filename(sstables::component_type::Data).format()).get());
auto sst2 = make_sstable_containing(env.make_sstable(s), {mut1});
auto sst3 = make_sstable_containing(env.make_sstable(s), {mut1});
BOOST_REQUIRE_THROW(table->add_new_sstables_and_update_cache({sst2, sst3}, on_add).get(), std::runtime_error);
// Verify both sstables are unlinked on failure.
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::Data).format()).get());
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::Data).format()).get());
});
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -17,7 +17,6 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, path_to
from test.pylib.runner import testpy_test_fixture_scope
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
@@ -59,20 +58,6 @@ logger = logging.getLogger(__name__)
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
async def decode_backtrace(build_mode: str, input: str):
executable = Path(path_to(build_mode, "scylla"))
proc = await asyncio.create_subprocess_exec(
(TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute(),
"-e",
executable.absolute(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(input=input.encode())
return f"{stdout.decode()}\n{stderr.decode()}"
def pytest_addoption(parser):
parser.addoption('--manager-api', action='store',
help='Manager unix socket path')
@@ -258,11 +243,6 @@ async def manager(request: pytest.FixtureRequest,
# test failure.
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
failed = failed or found_errors
if failed:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
@@ -286,44 +266,10 @@ async def manager(request: pytest.FixtureRequest,
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status["server_broken"]:
pytest.fail(
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
f"test case {test_case_name} leave unfinished tasks on Scylla server. Server marked as broken,"
f" server_broken_reason: {cluster_status["message"]}"
)
if found_errors:
full_message = []
for server, data in found_errors.items():
summary = []
detailed = []
if criticals := data.get("critical", []):
summary.append(f"{len(criticals)} critical error(s)")
detailed.extend(map(str.rstrip, criticals))
if backtraces := data.get("backtraces", []):
summary.append(f"{len(backtraces)} backtrace(s)")
with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
for backtrace in backtraces:
bt_file.write(backtrace + "\n\n")
decoded_bt = await decode_backtrace(build_mode, backtrace)
bt_file.write(decoded_bt + "\n\n")
detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")
if errors := data.get("error", []):
summary.append(f"{len(errors)} error(s)")
detailed.extend(map(str.rstrip, errors))
if cores := data.get("cores", []):
summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
if summary:
summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: { data['log']})"
detailed = [f" {line}" for line in detailed]
full_message.append(summary_line)
full_message.extend(detailed)
with open(failed_test_dir_path / "found_errors.txt", "w") as f:
f.write("\n".join(full_message))
pytest.fail(f"\n{'\n'.join(full_message)}")
# "cql" fixture: set up client object for communicating with the CQL API.
# Since connection is managed by manager just return that object

View File

@@ -110,9 +110,6 @@ def fixture_dtest_setup(request: FixtureRequest,
except Exception as e: # noqa: BLE001
logger.error("Error stopping cluster: %s", str(e))
manager.ignore_log_patterns.extend(dtest_setup.ignore_log_patterns)
manager.ignore_cores_log_patterns.extend(dtest_setup.ignore_cores_log_patterns)
try:
if not dtest_setup.allow_log_errors:
exclude_errors = []

View File

@@ -14,7 +14,7 @@ import time
from collections import defaultdict
from functools import cached_property
from functools import wraps
from typing import List, Dict, Callable, Optional, Tuple
from typing import List, Dict, Callable
from cassandra import ConsistencyLevel
from cassandra import WriteTimeout, ReadTimeout, OperationTimedOut
@@ -64,7 +64,6 @@ class Worker:
"""
A single worker increments its dedicated column `s{i}` via LWT:
UPDATE .. SET s{i}=? WHERE pk=? IF <guards on other cols> AND s{i}=?
bump global phase-ops counter via on_applied()
It checks for applied state and retries on "uncertainty" timeouts.
"""
def __init__(
@@ -77,10 +76,8 @@ class Worker:
other_columns: List[int],
get_lower_bound: Callable[[int, int], int],
on_applied: Callable[[int, int, int], None],
stop_event: asyncio.Event,
counter_update_statement: Optional[PreparedStatement] = None,
counters_random_delta: bool = False,
counters_max_delta: int = 5,
stop_event: asyncio.Event
):
self.stop_event = stop_event
self.success_counts: Dict[int, int] = {pk: 0 for pk in pks}
@@ -94,11 +91,7 @@ class Worker:
self.cql = cql
self.get_lower_bound = get_lower_bound
self.on_applied = on_applied
# counters
self.counter_update_statement = counter_update_statement
self.counters_random_delta = counters_random_delta
self.counters_max_delta = max(1, counters_max_delta)
self.counter_deltas: Dict[int, int] = {pk: 0 for pk in pks}
async def verify_update_through_select(self, pk, new_val, prev_val):
"""
@@ -113,24 +106,6 @@ class Worker:
assert current_val == new_val or current_val == prev_val
return current_val == new_val
def _next_counter_delta(self) -> int:
"""
Compute the next delta to apply to the counter table.
If random mode is disabled -> always +1.
If random mode is enabled -> random value from
[-max_delta..-1] U [1..max_delta].
"""
if not self.counters_random_delta:
return 1
# Avoid 0 by choosing magnitude in [1, max] and random sign.
mag = self.rng.randint(1, self.counters_max_delta)
sign = -1 if self.rng.random() < 0.5 else 1
return sign * mag
async def _inc_counter(self, pk: int, delta: int) -> None:
stmt = self.counter_update_statement.bind([delta, pk])
stmt.consistency_level = ConsistencyLevel.LOCAL_QUORUM
await self.cql.run_async(stmt)
def stop(self) -> None:
self.stop_event.set()
@@ -195,11 +170,6 @@ class Worker:
self.on_applied(pk, self.worker_id, new_val)
self.success_counts[pk] += 1
if self.counter_update_statement:
delta = self._next_counter_delta()
self.counter_deltas[pk] += delta
await self._inc_counter(pk, delta)
await asyncio.sleep(0.1)
except Exception:
@@ -217,8 +187,7 @@ class BaseLWTTester:
def __init__(
self, manager: ManagerClient, ks: str, tbl: str,
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS, use_counters: bool = False,
counters_random_delta: bool = False, counters_max_delta: int = 5, counter_tbl: Optional[str] = None
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS
):
self.ks = ks
self.tbl = tbl
@@ -233,12 +202,6 @@ class BaseLWTTester:
self.migrations = 0
self.phase = "warmup" # "warmup" -> "migrating" -> "post"
self.phase_ops = defaultdict(int)
# counters config
self.use_counters = use_counters
self.counters_random_delta = counters_random_delta
self.counters_max_delta = counters_max_delta
self.counter_tbl = counter_tbl or (f"{tbl}_ctr" if use_counters else None)
def _get_lower_bound(self, pk: int, col_idx: int) -> int:
return self.lb_counts[pk][col_idx]
@@ -270,14 +233,6 @@ class BaseLWTTester:
def create_workers(self, stop_event) -> List[Worker]:
workers: List[Worker] = []
counter_stmt: Optional[PreparedStatement] = None
if self.use_counters:
counter_stmt = self.cql.prepare(
f"UPDATE {self.ks}.{self.counter_tbl} "
f"SET c = c + ? WHERE pk = ?"
)
for i in range(self.num_workers):
other_columns = [j for j in range(self.num_workers) if j != i]
cond = " AND ".join([*(f"s{j} >= ?" for j in other_columns), f"s{i} = ?"])
@@ -292,9 +247,6 @@ class BaseLWTTester:
other_columns=other_columns,
get_lower_bound=self._get_lower_bound,
on_applied=self._on_applied,
counter_update_statement=counter_stmt,
counters_random_delta=self.counters_random_delta,
counters_max_delta=self.counters_max_delta,
)
workers.append(worker)
return workers
@@ -306,11 +258,6 @@ class BaseLWTTester:
f"CREATE TABLE {self.ks}.{self.tbl} (pk int PRIMARY KEY, {cols_def})"
)
logger.info("Created table %s.%s with %d columns", self.ks, self.tbl, self.num_workers)
if self.use_counters:
await self.cql.run_async(
f"CREATE TABLE {self.ks}.{self.counter_tbl} (pk int PRIMARY KEY, c counter)"
)
logger.info("Created counter table %s.%s", self.ks, self.counter_tbl)
async def initialize_rows(self):
"""
@@ -349,7 +296,7 @@ class BaseLWTTester:
assert not errs, f"worker errors: {errs}"
logger.info("All workers stopped")
async def _verify_base_table(self):
async def verify_consistency(self):
"""Ensure every (pk, column) reflects the number of successful CAS writes."""
# Run SELECTs for all PKs in parallel using prepared statement
tasks = []
@@ -373,35 +320,6 @@ class BaseLWTTester:
total_ops = sum(sum(w.success_counts.values()) for w in self.workers)
logger.info("Consistency verified %d total successful CAS operations", total_ops)
async def _verify_counters(self):
if not self.use_counters:
return
stmt = SimpleStatement(
f"SELECT pk, c FROM {self.ks}.{self.counter_tbl}",
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
)
rows = await self.cql.run_async(stmt)
db_values: Dict[int, int] = {row.pk: row.c for row in rows}
mismatches = []
for pk in self.pks:
actual = db_values.get(pk, 0)
expected = sum(worker.counter_deltas.get(pk, 0) for worker in self.workers)
if actual != expected:
mismatches.append(
f"counter mismatch pk={pk} c={actual}, expected={expected}"
)
assert not mismatches, "Counter consistency violations: " + "; ".join(mismatches)
total_delta = sum(sum(worker.counter_deltas.values()) for worker in self.workers)
logger.info("Counter table consistency verified total delta=%d", total_delta)
async def verify_consistency(self):
await self._verify_base_table()
await self._verify_counters()
async def get_token_for_pk(cql, ks: str, tbl: str, pk: int) -> int:
"""Get the token for a given primary key"""

View File

@@ -1,412 +0,0 @@
# Copyright (C) 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import asyncio
import logging
import random
import re
import time
from typing import Dict
import pytest
from test.cluster.conftest import skip_mode
from test.cluster.lwt.lwt_common import (
BaseLWTTester,
DEFAULT_WORKERS,
DEFAULT_NUM_KEYS,
wait_for_tablet_count
)
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError
from test.pylib.tablets import get_tablet_count
from test.pylib.tablets import get_tablet_replicas
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
TARGET_RESIZE_COUNT = 20
NUM_MIGRATIONS = 20
WARMUP_LWT_CNT = 100
POST_LWT_CNT = 100
PHASE_WARMUP = "warmup"
PHASE_RESIZE = "resize"
PHASE_POST = "post"
MIN_TABLETS = 1
MAX_TABLETS = 20
RESIZE_TIMEOUT = 240
MIGRATE_ONE_TIMEOUT_S = 60
NO_REPLICA_RE = re.compile(r"has no replica on", re.IGNORECASE)
DST_REPLICA_RE = re.compile(r"has replica on", re.IGNORECASE)
def _err_code(e: Exception):
return getattr(e, "code", None)
def _err_text(e: Exception):
return getattr(e, "text", "") or str(e)
def _is_tablet_in_transition_http_error(e: Exception) -> bool:
    """True for an HTTP 500 whose body reports the tablet is 'in transition'."""
    if not isinstance(e, HTTPError) or _err_code(e) != 500:
        return False
    return "in transition" in _err_text(e).lower()
def _is_no_replica_on_src_error(e: Exception) -> bool:
    """True for an HTTP 500 whose body matches 'has no replica on' (source gone)."""
    if not isinstance(e, HTTPError) or _err_code(e) != 500:
        return False
    return NO_REPLICA_RE.search(_err_text(e)) is not None
def _is_dst_already_replica_error(e: Exception) -> bool:
    """True for an HTTP 500 whose body matches 'has replica on' (dst already holds one)."""
    if not isinstance(e, HTTPError) or _err_code(e) != 500:
        return False
    return DST_REPLICA_RE.search(_err_text(e)) is not None
async def _move_tablet_with_retry(manager, src_server, ks, tbl,
src_host_id, src_shard, dst_host_id, dst_shard, token,
*, timeout_s=MIGRATE_ONE_TIMEOUT_S, base_sleep=0.1, max_sleep=2.0):
# Move a single tablet (identified by its token) from src to dst via the
# REST API, retrying while the server reports the tablet as "in transition".
#
# Retries use exponential backoff (factor 1.7, capped at max_sleep) with
# random jitter added on top of the base delay. A retry is attempted only
# if it would still begin before the deadline; any other error, or running
# out of time, propagates to the caller.
deadline = time.time() + timeout_s
sleep = base_sleep
while True:
try:
await manager.api.move_tablet(
src_server.ip_addr, ks, tbl,
src_host_id, src_shard, dst_host_id, dst_shard, token
)
return
except Exception as e:
# Only transient "in transition" HTTP 500s are retried, and only while
# the next attempt would still start before the deadline.
if _is_tablet_in_transition_http_error(e) and time.time() + sleep < deadline:
logger.info("Token %s in transition, retry in %.2fs", token, sleep)
await asyncio.sleep(sleep + random.uniform(0, sleep))
sleep = min(sleep * 1.7, max_sleep)
continue
raise
async def tablet_migration_ops(
stop_event: asyncio.Event,
manager: ManagerClient,
servers,
tester: BaseLWTTester,
table: str,
num_ops: int,
pause_range=(0.5, 2.0),
*,
server_properties,
) -> None:
# Perform num_ops random tablet migrations on tester.ks.table while LWT
# workers are running. About 30% of the migrations are intra-node
# (shard-to-shard on the same host); the rest move the tablet to a
# non-replica node in the same rack. Transient errors cause a re-pick of
# the token; any other error is raised. The loop also stops early if
# stop_event is set, in which case the final assert fails.
logger.info("Starting tablet migration ops for %s.%s: target=%d", tester.ks, table, num_ops)
migration_count = 0
intranode_ratio = 0.3
# server_id -> rack
server_id_to_rack: Dict[str, str] = {
s.server_id: prop["rack"] for s, prop in zip(servers, server_properties)
}
host_ids = await asyncio.gather(
*(manager.get_host_id(s.server_id) for s in servers)
)
# server_id -> host_id and host_id -> server
server_id_to_host_id: Dict[str, str] = {
s.server_id: hid for s, hid in zip(servers, host_ids)
}
host_id_to_server = {
hid: s for s, hid in zip(servers, host_ids)
}
attempt = 0
while not stop_event.is_set() and migration_count < num_ops:
attempt += 1
# Pick a random primary key and migrate the tablet owning its token.
sample_pk = random.choice(tester.pks)
token = tester.pk_to_token[sample_pk]
replicas = await get_tablet_replicas(
manager, servers[0], tester.ks, table, token
)
src_host_id, src_shard = random.choice(replicas)
src_server = host_id_to_server.get(src_host_id)
assert src_server is not None, (
f"Source host_id {src_host_id} for token {token} not found in host_id_to_server (attempt {attempt})"
)
if random.random() < intranode_ratio:
# Intra-node migration: same host, the other shard (assumes --smp=2,
# so shards are only 0 and 1).
dst_host_id = src_host_id
dst_server = src_server
dst_shard = 0 if src_shard != 0 else 1
else:
# Inter-node migration: pick a same-rack server that currently holds
# no replica of this tablet.
replica_hids = {h for (h, _sh) in replicas}
src_rack = server_id_to_rack[src_server.server_id]
same_rack_candidates = [
s for s in servers if server_id_to_rack[s.server_id] == src_rack
and server_id_to_host_id[s.server_id] not in replica_hids
]
assert same_rack_candidates, (
f"No same-rack non-replica candidate for token {token} (attempt {attempt})"
)
dst_server = random.choice(same_rack_candidates)
dst_host_id = server_id_to_host_id[dst_server.server_id]
dst_shard = 0
try:
await _move_tablet_with_retry(
manager, src_server, tester.ks, table,
src_host_id, src_shard, dst_host_id, dst_shard, token,
timeout_s=60,
)
migration_count += 1
logger.info(
"Completed migration #%d (token=%s -> %s:%d) for %s.%s",
migration_count, token, dst_server.ip_addr, dst_shard, tester.ks, table,
)
await asyncio.sleep(random.uniform(*pause_range))
continue
except Exception as e:
# Races with concurrent resizes/balancing are expected: just pick a
# new token and try again rather than failing the test.
if _is_tablet_in_transition_http_error(e):
logger.info("Token %s in transition, switching token (attempt %d)",
token, attempt)
continue
if _is_no_replica_on_src_error(e) or _is_dst_already_replica_error(e):
logger.info("Src replica vanished for token %s, re-pick (attempt %d)",
token, attempt)
continue
raise
assert migration_count == num_ops, f"Only completed {migration_count}/{num_ops} migrations for {tester.ks}.{table}"
logger.info("Completed tablet migration ops for %s.%s: %d/%d", tester.ks, table, migration_count, num_ops)
def powers_of_two_in_range(lo: int, hi: int):
    """Return every power of two p with lo <= p <= hi, in ascending order.

    Returns an empty list when the range is empty or lies entirely below 1
    (the smallest power of two). Values of ``lo`` below 1 are treated as 1.
    """
    if hi < 1 or lo > hi:
        return []
    power = 1
    # Advance to the first power of two that is >= lo.
    while power < lo:
        power <<= 1
    powers = []
    while power <= hi:
        powers.append(power)
        power <<= 1
    return powers
async def run_random_resizes(
stop_event_: asyncio.Event,
manager: ManagerClient,
servers,
tester: BaseLWTTester,
ks: str,
table: str,
counter_table: str,
target_steps: int = TARGET_RESIZE_COUNT,
pause_range=(0.5, 2.0),
):
"""
Perform randomized tablet count changes (splits/merges) on the main LWT table
and its counter table. Runs until target resize count is reached or stop_event_
is set. Returns a dict with simple stats.
"""
split_count = 0
merge_count = 0
current_resize_count = 0
# All candidate tablet counts: powers of two within [MIN_TABLETS, MAX_TABLETS].
pow2_targets = powers_of_two_in_range(MIN_TABLETS, MAX_TABLETS)
while not stop_event_.is_set() and current_resize_count < target_steps:
# Drive resize direction from the main table.
current_main = await get_tablet_count(manager, servers[0], ks, table)
candidates = [t for t in pow2_targets if t != current_main]
target_cnt = random.choice(candidates)
direction = "split" if target_cnt > current_main else "merge"
logger.info(
"[%s] starting: %s.%s=%d, %s.%s -> target %d",
direction.upper(), ks, table, current_main, ks,
counter_table, target_cnt
)
tables = [table, counter_table]
# Apply ALTER TABLE to both tables.
for tbl in tables:
await tester.cql.run_async(
f"ALTER TABLE {ks}.{tbl} "
f"WITH tablets = {{'min_tablet_count': {target_cnt}}}"
)
# The tgt=target_cnt default argument freezes the current value in the
# lambda, avoiding Python's late-binding-closure pitfall.
if direction == "split":
predicate = lambda c, tgt=target_cnt: c >= tgt
else:
predicate = lambda c, tgt=target_cnt: c <= tgt
# Wait for both tables to converge.
main_after, counter_after = await asyncio.gather(
wait_for_tablet_count(
manager,
servers[0],
tester.ks,
table,
predicate=predicate,
target=target_cnt,
timeout_s=RESIZE_TIMEOUT,
),
wait_for_tablet_count(
manager,
servers[0],
tester.ks,
counter_table,
predicate=predicate,
target=target_cnt,
timeout_s=RESIZE_TIMEOUT,
),
)
# Sanity: both tables should end up with the same tablet count.
assert main_after == counter_after, (
f"Tablet counts diverged: {ks}.{table}={main_after}, "
f"{ks}.{counter_table}={counter_after}"
)
if direction == "split":
logger.info(
"[SPLIT] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
ks, table, current_main, main_after, ks, counter_table,
counter_after, target_cnt
)
assert main_after >= current_main, (
f"Tablet count expected to increase during split "
f"(was {current_main}, now {main_after})"
)
split_count += 1
else:
logger.info(
"[MERGE] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
ks, table, current_main, main_after, ks, counter_table,
counter_after, target_cnt
)
assert main_after <= current_main, (
f"Tablet count expected to decrease during merge "
f"(was {current_main}, now {main_after})"
)
merge_count += 1
current_resize_count += 1
await asyncio.sleep(random.uniform(*pause_range))
# Summary stats for the caller's assertions/logging.
return {
"steps_done": current_resize_count,
"seen_split": split_count,
"seen_merge": merge_count,
}
@pytest.mark.asyncio
@skip_mode("debug", "debug mode is too slow for this test")
async def test_multi_column_lwt_migrate_and_random_resizes(manager: ManagerClient):
"""Run LWT (plus counter) workers continuously through three phases —
warmup, concurrent random resizes + tablet migrations, and post — and
verify that no CAS or counter update is lost or double-applied."""
cfg = {
"enable_tablets": True,
"tablet_load_stats_refresh_interval_in_seconds": 1,
"target-tablet-size-in-bytes": 1024 * 16,
}
# Two nodes per rack, three racks, so same-rack inter-node migration
# always has a candidate with RF=3.
properties = [
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"},
]
cmdline = [
'--logger-log-level', 'paxos=trace', '--smp=2',
]
servers = await manager.servers_add(6, config=cfg, property_file=properties, cmdline=cmdline)
async with new_test_keyspace(
manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} "
"AND tablets = {'initial': 1}",
) as ks:
stop_event_ = asyncio.Event()
table = "lwt_split_merge_table"
cnt_table = "lwt_split_merge_counters"
tester = BaseLWTTester(
manager,
ks,
table,
num_workers=DEFAULT_WORKERS,
num_keys=DEFAULT_NUM_KEYS,
use_counters=True,
counters_random_delta=True,
counters_max_delta=5,
counter_tbl=cnt_table,
)
await tester.create_schema()
await tester.initialize_rows()
await tester.start_workers(stop_event_)
try:
# PHASE: warmup
tester.set_phase(PHASE_WARMUP)
logger.info("LWT warmup: waiting for %d applied CAS", WARMUP_LWT_CNT)
await tester.wait_for_phase_ops(stop_event_, PHASE_WARMUP, WARMUP_LWT_CNT, timeout=180, poll=0.2)
logger.info("LWT warmup complete: %d ops", tester.get_phase_ops(PHASE_WARMUP))
# PHASE: resize + migrate
tester.set_phase(PHASE_RESIZE)
logger.info("Starting RESIZE (random powers-of-two) + %d migrations per table", NUM_MIGRATIONS)
# Resizes and migrations (on both tables) run concurrently with the
# LWT workers to exercise races between them.
resize_task = asyncio.create_task(
run_random_resizes(
stop_event_=stop_event_,
manager=manager,
servers=servers,
tester=tester,
ks=ks,
table=table,
target_steps=TARGET_RESIZE_COUNT,
counter_table=cnt_table,
)
)
migrate_task = asyncio.create_task(
tablet_migration_ops(
stop_event_,
manager, servers, tester,
num_ops=NUM_MIGRATIONS,
pause_range=(0.3, 1.0),
server_properties=properties,
table=table,
)
)
migrate_cnt_task = asyncio.create_task(
tablet_migration_ops(
stop_event_,
manager, servers, tester,
num_ops=NUM_MIGRATIONS,
pause_range=(0.3, 1.0),
server_properties=properties,
table=cnt_table
)
)
resize_stats = await resize_task
await asyncio.gather(migrate_task, migrate_cnt_task)
logger.info(
"Randomized resize stats: steps_done=%d, split=%d, merge=%d; LWT ops during resize=%d",
resize_stats["steps_done"], resize_stats["seen_split"], resize_stats["seen_merge"],
tester.get_phase_ops(PHASE_RESIZE),
)
assert resize_stats["steps_done"] >= 1, "Resize phase performed 0 steps"
assert tester.get_phase_ops(PHASE_RESIZE) > 0, "Expected LWT ops during RESIZE phase"
# PHASE: post
tester.set_phase(PHASE_POST)
logger.info("LWT post resize: waiting for %d applied CAS", POST_LWT_CNT)
await tester.wait_for_phase_ops(stop_event_, PHASE_POST, POST_LWT_CNT, timeout=180, poll=0.2)
logger.info("LWT post resize complete: %d ops", tester.get_phase_ops(PHASE_POST))
total_ops = sum(tester.phase_ops.values())
assert total_ops >= (WARMUP_LWT_CNT + POST_LWT_CNT), f"Too few total LWT ops: {total_ops}"
finally:
# Stop workers before verification so counts are final.
await tester.stop_workers()
await tester.verify_consistency()
logger.info("Combined LWT during random split/merge + migrations test completed successfully")

View File

@@ -121,7 +121,7 @@ async def test_change_two(manager, random_tables, build_mode):
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_start(servers[1].server_id)
servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
if build_mode != 'release':
s0_logs = await manager.server_open_log(servers[0].server_id)
await s0_logs.wait_for('crash-before-prev-ip-removed hit, killing the node')
@@ -132,7 +132,7 @@ async def test_change_two(manager, random_tables, build_mode):
await wait_proper_ips([servers[0], servers[1]])
await manager.server_start(servers[2].server_id)
servers[2] = servers[2]._replace(ip_addr=s2_new_ip, rpc_address=s2_new_ip)
servers[2] = ServerInfo(servers[2].server_id, s2_new_ip, s2_new_ip, servers[2].datacenter, servers[2].rack)
await reconnect_driver(manager)
await wait_proper_ips([servers[0], servers[1], servers[2]])

View File

@@ -51,9 +51,6 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
coordinators_ids = await get_coordinator_host_ids(manager)
assert len(coordinators_ids) == 1, "At least 1 coordinator id should be found"
# Configure manager to ignore crashes caused by crash_coordinator_before_stream injection
manager.ignore_cores_log_patterns.append("crash_coordinator_before_stream: aborting")
# kill coordinator during decommission
logger.debug("Kill coordinator during decommission")
coordinator_host = await get_coordinator_host(manager)

View File

@@ -48,7 +48,7 @@ async def test_no_removed_node_event_on_ip_change(manager: ManagerClient, caplog
with test_cluster.connect() as test_cql:
logger.info(f"starting the follower node {servers[1]}")
await manager.server_start(servers[1].server_id)
servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
logger.info("waiting for cql and hosts")
await wait_for_cql_and_get_hosts(test_cql, servers, time.time() + 30)

View File

@@ -10,7 +10,7 @@ from cassandra.policies import FallthroughRetryPolicy
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
from test.cluster.conftest import skip_mode
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
@@ -1981,171 +1981,3 @@ async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
@pytest.mark.asyncio
@pytest.mark.parametrize("repair_before_split", [False, True])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
"""Exercise a tablet split racing with an incremental tablet repair.

With repair_before_split=True the repair is started first and paused via
the incremental_repair_prepare_wait injection while the split prepares;
otherwise the split prepares first and repair runs afterwards. In both
orders the node must restart cleanly once the split finalizes."""
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
]
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 2
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
expected_tablet_count = 4 # expected tablet count post split
# Trigger a split but keep its finalization postponed via injection.
async def run_split_prepare():
await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
# force split on the test table
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)
# Create replica divergence (writes at CL=ONE while node 0 drops applies)
# so the subsequent repair has actual work to do.
async def generate_repair_work():
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
token = 'all'
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
if repair_before_split:
await generate_repair_work()
# Pause the repair in its prepare phase on both nodes, run the split
# prepare in the middle, then release the repair.
for server in servers:
await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)
await run_split_prepare()
for server in servers:
await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
await repair_task
else:
await run_split_prepare()
await generate_repair_work()
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Release the postponed finalization and wait for the split to land.
await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
# Restart node 0 to verify it comes back cleanly after the split.
await manager.server_stop(servers[0].server_id)
await manager.server_start(servers[0].server_id)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_intranode_synchronization(manager: ManagerClient):
"""Exercise a tablet split racing with an intra-node (shard-to-shard)
migration on a single node: the replica is moved back to a shard that
already ACKed the split while split compaction is forced to fail, and the
split must still converge afterwards."""
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
'--smp', '2',
]
servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
server = servers[0]
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
# NOTE(review): host_id is unused below — replica[0] is used instead.
host_id = await manager.get_host_id(server.server_id)
src_shard = replica[1]
# if tablet replica is at shard 0, move it to shard 1.
if src_shard == 0:
dst_shard = 1
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
await manager.api.enable_tablet_balancing(server.ip_addr)
# Postpone split finalization and force split compaction to throw.
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)
# force split on the test table
expected_tablet_count = initial_tablets * 2
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
# Check that shard 0 ACKed split.
mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)
# Move tablet replica back to shard 0, where split was already ACKed.
src_shard = 1
dst_shard = 0
migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))
mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)
# Abort the (failing) split compaction so the migration can complete.
await manager.api.stop_compaction(server.ip_addr, "SPLIT")
await migration_task
await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, server, ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)

View File

@@ -1,8 +1,5 @@
add_library(test-lib STATIC)
target_sources(test-lib
PUBLIC
boost_test_tree_lister.cc
boost_tree_lister_injector.cc
PRIVATE
cql_assertions.cc
dummy_sharder.cc

View File

@@ -1,422 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/boost_test_tree_lister.hh"
#include <boost/algorithm/string/replace.hpp>
#include <fmt/ranges.h>
#include <memory>
#include <ranges>
namespace {
using label_info = internal::label_info;
using test_case_info = internal::test_case_info;
using test_suite_info = internal::test_suite_info;
using test_file_info = internal::test_file_info;
} // anonymous namespace
/// --------------------
///
/// Implementation notes
///
/// --------------------
///
/// The structure of the Boost.Test's test tree consists solely
/// of nodes representing test suites and test cases. It ignores
/// information like, for instance, the name of the file those
/// entities reside [1].
///
/// What's more, a test suite can span multiple files as long
/// as it has the same name [2].
///
/// We'd like to re-visualize the tree in a different manner:
/// have a forest, where each tree represents the internal structure
/// of a specific file. The non-leaf nodes represent test suites,
/// and the leaves -- test cases.
///
/// This type achieves that very goal (albeit in a bit ugly manner).
///
/// ---
///
/// Note that the implementation suffers from the same problems
/// Boost.Test itself does. For instance, when parametrizing tests
/// with `boost::unit_test::data`, the test will appear as a test suite,
/// while cases for each of the data instances -- as test cases.
/// There's no way to overcome that, so we're stuck with it.
///
/// -----------
///
/// Assumptions
///
/// -----------
///
/// We rely on the following assumptions:
///
/// 1. The tree traversal is performed pre-order. That's the case for
/// Boost.Test 1.89.0.
/// 2. If a test case TC belong to a test suite TS (directly or indirectly),
/// the following execution order holds:
/// i. `test_suite_start(TC)`,
/// ii. `visit(TC)`,
/// iii. `test_suite_finish(TC)`.
/// 3. If test suite TS1 is nested within test suite TS2, the following
/// execution order holds:
/// i. `test_suite_start(TS1)`,
/// ii. `test_suite_start(TS2)`,
/// iii. `test_suite_finish(TS2)`,
/// iv. `test_suite_finish(TS1)`.
///
/// ----------
///
/// References
///
/// ----------
///
/// [1] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree.html
/// [2] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html
///
/// ----------------------------
///
/// Example of high-level output
///
/// ----------------------------
///
/// Let's consider the following organization of tests.
///
/// TestFile1.cc:
/// - Suite A:
/// - Suite A1:
/// - Test A1.1 (labels: L1)
/// - Test A1.2
/// - Suite A2:
/// - Test A2.1
/// - Test A.1
/// - Suite B:
/// - Test B1
/// - Test B2 (labels: L2, L3)
/// - Test 1
///
/// TestFile2.cc:
/// - Suite A:
/// - Suite A3
/// - Test A3.1
/// - Test A.2
/// - Suite C:
/// - Test C.1
/// - Test 2
///
/// This structure will be translated into the following JSON (we're
/// omitting some details to make it cleaner and easier to read):
///
/// [
/// {
/// "file": "TestFile1.cc",
/// "content": {
/// "suites": [
/// {
/// "name": "A",
/// "suites": [
/// {
/// "name": "A1",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test A1.1",
/// "labels": "L1"
/// },
/// {
/// "name": "Test A1.2",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test1",
/// "labels": ""
/// }
/// ]
/// },
/// {
/// "name": "B",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test B1",
/// "labels": ""
/// },
/// {
/// "name": "Test B2",
/// "labels": "L2,L3"
/// },
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test 1",
/// "labels": ""
/// }
/// ]
/// }
/// },
/// {
/// "file": "TestFile2.cc",
/// "content": {
/// "suites": [
/// {
/// "name": "A",
/// "suites": [
/// {
/// "name": "A3",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test A3.1",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test A.2",
/// "labels": ""
/// }
/// ]
/// },
/// {
/// "name": "C",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test C.1",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test 2",
/// "labels": ""
/// }
/// ]
/// }
/// }
/// ]
///
/// Note that although Boost.Test treats Suite A in TestFile1.cc
/// and Suite A in TestFile2.cc as the SAME suite, we consider it
/// separately for each of the files it resides in.
// Pimpl body of boost_test_tree_lister: accumulates a per-file forest of
// suites/tests while Boost.Test traverses its tree pre-order (see the
// implementation notes above for the traversal assumptions).
struct boost_test_tree_lister::impl {
public:
/// The final result we're building while traversing the test tree.
test_file_forest file_forest;
/// The path from the root to the current suite.
std::vector<std::string> active_suites;
public:
// Record one test case under the suite path currently on the stack,
// inside the per-file tree for the case's source file.
void process_test_case(const boost::unit_test::test_case& tc) {
const std::string_view filename = {tc.p_file_name.begin(), tc.p_file_name.end()};
test_file_info& test_file = get_file_info(filename);
std::string test_name = tc.p_name;
std::vector<label_info> labels = tc.p_labels.get();
test_case_info test_info {.name = std::move(test_name), .labels = std::move(labels)};
if (active_suites.empty()) {
// Test case declared at file scope, outside any suite.
test_file.free_tests.push_back(std::move(test_info));
} else {
test_suite_info& suite_info = get_active_suite(filename);
suite_info.tests.push_back(std::move(test_info));
}
}
// Push the suite onto the active path. Returns true so traversal descends.
bool test_suite_start(const boost::unit_test::test_suite& ts) {
// The suite is the master test suite, so let's ignore it
// because it doesn't represent any actual test suite.
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
assert(active_suites.empty());
return true;
}
std::string suite_name = ts.p_name.value;
add_active_suite(std::move(suite_name));
return true;
}
// Pop the suite from the active path (mirror of test_suite_start).
void test_suite_finish(const boost::unit_test::test_suite& ts) {
// The suite is the master test suite, so let's ignore it
// because it doesn't represent any actual test suite.
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
assert(active_suites.empty());
return;
}
drop_active_suite();
}
private:
// Find-or-create the per-file entry in the forest.
test_file_info& get_file_info(std::string_view filename) {
auto& test_files = file_forest.test_files;
auto it = test_files.find(filename);
if (it == test_files.end()) {
std::tie(it, std::ignore) = test_files.emplace(filename, std::vector<test_suite_info>{});
}
return it->second;
}
void add_active_suite(std::string suite_name) {
active_suites.push_back(std::move(suite_name));
}
void drop_active_suite() {
assert(!active_suites.empty());
active_suites.pop_back();
}
// Walk (and lazily materialize) the suite chain named by active_suites
// within the given file's tree, returning the innermost suite.
test_suite_info& get_active_suite(std::string_view filename) {
assert(!active_suites.empty());
test_file_info& file_info = get_file_info(filename);
test_suite_info* last = &get_root_suite(file_info, active_suites[0]);
for (const auto& suite_name : active_suites | std::views::drop(1)) {
last = &get_subsuite(*last, suite_name);
}
return *last;
}
// Find-or-create a top-level suite of the given file.
test_suite_info& get_root_suite(test_file_info& file_info, std::string_view suite_name) {
auto suite_it = std::ranges::find(file_info.suites, suite_name, &test_suite_info::name);
if (suite_it != file_info.suites.end()) {
return *suite_it;
}
test_suite_info suite_info {.name = std::string(suite_name)};
file_info.suites.push_back(std::move(suite_info));
return *file_info.suites.rbegin();
}
// Find-or-create a nested suite under `parent`. Subsuites are held by
// unique_ptr, so returned references stay stable across later insertions.
test_suite_info& get_subsuite(test_suite_info& parent, std::string_view suite_name) {
auto suite_it = std::ranges::find(parent.subsuites, suite_name, [] (auto&& suite_ptr) -> std::string_view {
return suite_ptr->name;
});
if (suite_it != parent.subsuites.end()) {
return **suite_it;
}
auto suite = std::make_unique<test_suite_info>(std::string(suite_name));
parent.subsuites.push_back(std::move(suite));
return **parent.subsuites.rbegin();
}
};
// Out-of-line pimpl members: construction/destruction and thin forwarders
// into boost_test_tree_lister::impl.
boost_test_tree_lister::boost_test_tree_lister() : _impl(std::make_unique<impl>()) {}
// Defined here (not in the header) so ~unique_ptr<impl> sees the complete type.
boost_test_tree_lister::~boost_test_tree_lister() noexcept = default;
// Expose the forest accumulated so far; valid only after traversal ran.
const test_file_forest& boost_test_tree_lister::get_result() const {
return _impl->file_forest;
}
// Boost.Test visitor callbacks — forwarded verbatim to the impl.
void boost_test_tree_lister::visit(const boost::unit_test::test_case& tc) {
return _impl->process_test_case(tc);
}
bool boost_test_tree_lister::test_suite_start(const boost::unit_test::test_suite& ts) {
return _impl->test_suite_start(ts);
}
void boost_test_tree_lister::test_suite_finish(const boost::unit_test::test_suite& ts) {
return _impl->test_suite_finish(ts);
}
// Replace every occurrence of a double quotation mark (`"`) with the
// two-character sequence `\"`, leaving all other characters untouched.
static std::string escape_quotation_marks(std::string_view str) {
    std::string escaped;
    escaped.reserve(str.size());
    for (const char c : str) {
        if (c == '"') {
            escaped.push_back('\\');
        }
        escaped.push_back(c);
    }
    return escaped;
}
auto fmt::formatter<internal::test_case_info>::format(
    const internal::test_case_info& test_info,
    fmt::format_context& ctx) const -> decltype(ctx.out())
{
    // Test names are expected to consist of alphanumeric characters only,
    // so they never need escaping. Labels, however, might contain quotes.
    assert(std::ranges::count(test_info.name, '"') == 0);
    auto escaped_labels = test_info.labels | std::views::transform(escape_quotation_marks);
    return fmt::format_to(ctx.out(), R"({{"name":"{}","labels":"{}"}})",
            test_info.name, fmt::join(escaped_labels, ","));
}
auto fmt::formatter<internal::test_suite_info>::format(
    const internal::test_suite_info& suite_info,
    fmt::format_context& ctx) const -> decltype(ctx.out())
{
    // Dereference the owning pointers so that fmt::join formats the suites
    // themselves rather than the unique_ptrs that hold them.
    auto subsuite_refs = suite_info.subsuites | std::views::transform([] (auto&& owned) -> const test_suite_info& {
        return *owned;
    });
    return fmt::format_to(ctx.out(), R"({{"name":"{}","suites":[{}],"tests":[{}]}})",
            suite_info.name, fmt::join(subsuite_refs, ","), fmt::join(suite_info.tests, ","));
}
auto fmt::formatter<internal::test_file_info>::format(
    const internal::test_file_info& file_info,
    fmt::format_context& ctx) const -> decltype(ctx.out())
{
    // A file is rendered as its suites plus the test cases that live
    // outside of any suite ("free" tests).
    return fmt::format_to(ctx.out(), R"({{"suites":[{}],"tests":[{}]}})",
            fmt::join(file_info.suites, ","), fmt::join(file_info.free_tests, ","));
}
auto fmt::formatter<internal::test_file_forest>::format(
    const internal::test_file_forest& forest_info,
    fmt::format_context& ctx) const -> decltype(ctx.out())
{
    // Emit a JSON array of {"file": ..., "content": ...} objects,
    // inserting the comma separator by hand before every entry but the first.
    fmt::format_to(ctx.out(), "[");
    bool first_entry = true;
    for (const auto& [file, content] : forest_info.test_files) {
        if (!first_entry) {
            fmt::format_to(ctx.out(), ",");
        }
        first_entry = false;
        fmt::format_to(ctx.out(), R"({{"file":"{}","content":{}}})",
                file, content);
    }
    return fmt::format_to(ctx.out(), "]");
}

View File

@@ -1,117 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <boost/test/tree/visitor.hpp>
#include <fmt/base.h>
#include <memory>
namespace internal {
using label_info = std::string;
/// Type representing a single Boost test case.
struct test_case_info {
/// The name of the test case.
std::string name;
/// The labels the test was marked with.
std::vector<label_info> labels;
};
/// Type representing a single Boost test suite within a single file.
///
/// Note that a single suite can span multiple files (as of Boost.Test 1.89.0); see:
/// https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html.
///
/// We turn away from that convention and list suites from different files separately.
/// However, that doesn't change the fact that it's still the same suite from the
/// perspective of Boost.Test. In particular, if a suite is marked with a label,
/// it's applied to it globally.
struct test_suite_info {
std::string name;
std::vector<std::unique_ptr<test_suite_info>> subsuites;
/// The tests belonging directly to this suite.
std::vector<test_case_info> tests;
};
struct test_file_info {
std::vector<test_suite_info> suites;
std::vector<test_case_info> free_tests;
};
struct test_file_forest {
std::map<std::string, test_file_info, std::less<>> test_files;
};
} // namespace internal
using test_file_forest = internal::test_file_forest;
/// Implementation of the `boost::unit_test::test_tree_visitor` that
/// produces a similar result to running a Boost.Test executable with
/// `--list_content=HRF` or `--list_content=DOT`. This type results
/// in the JSON format of the output.
///
/// The crucial difference between this implementation and the built-in
/// HRF and DOT ones is that the result obtained by a call to `get_result()`
/// (after the traversal has finished) is going to have a different structure.
///
/// The type `boost_test_tree_lister` will treat the same suite from different
/// files as separate ones, even if they share the name. Boost.Test would treat
/// them as the same one and group the results by suites. In other words,
/// this type groups results by (in order):
///
/// 1. File
/// 2. Suite(s)
/// 3. Test cases
class boost_test_tree_lister : public boost::unit_test::test_tree_visitor {
private:
struct impl;
private:
std::unique_ptr<impl> _impl;
public:
boost_test_tree_lister();
~boost_test_tree_lister() noexcept;
public:
const test_file_forest& get_result() const;
private:
virtual void visit(const boost::unit_test::test_case&) override;
virtual bool test_suite_start(const boost::unit_test::test_suite&) override;
virtual void test_suite_finish(const boost::unit_test::test_suite&) override;
};
template <>
struct fmt::formatter<internal::test_case_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_case_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_suite_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_suite_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_file_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_file_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_file_forest> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_file_forest&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -1,115 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/boost_test_tree_lister.hh"
#include <boost/test/framework.hpp>
#include <boost/test/tree/traverse.hpp>
#include <boost/test/unit_test_suite.hpp>
#include <fmt/core.h>
namespace {
/// Traverse the test tree and collect information about
/// its structure and the tests.
///
/// The output is going to be in the JSON format.
/// For more details, see the implementation of
/// `boost_test_tree_lister`.
void print_boost_tests() {
namespace but = boost::unit_test;
but::framework::finalize_setup_phase();
boost_test_tree_lister traverser;
but::traverse_test_tree(but::framework::master_test_suite().p_id, traverser, true);
fmt::print("{}", traverser.get_result());
}
/// --------
/// Examples
/// --------
///
/// # This will NOT list the tests because Boost.Test
/// # will interpret it as an argument to the framework.
/// $ ./path/to/my/test/exec --list_json_content
///
/// # This will NOT list the tests because Boost.Test requires
/// # that all non-Boost.Test arguments be provided AFTER
/// # a `--` sequence (cf. example below).
/// $ ./path/to/my/test/exec list_json_content
///
/// # This will NOT list the tests because the option
/// # simply doesn't match the expected one.
/// $ ./path/to/my/test/exec list_json_content
///
/// # This DOES work and DOES what we expect, i.e. it lists the tests.
/// $ ./path/to/my/test/exec -- --list_json_content
bool list_tests(int argc, char** argv) {
for (int i = 1; i < argc; ++i) {
std::string_view option = argv[i];
if (option == "--list_json_content") {
return true;
}
}
return false;
}
struct boost_tree_lister_injector {
boost_tree_lister_injector() {
const auto& master_suite = boost::unit_test::framework::master_test_suite();
/// The arguments here don't include Boost.Test-specific arguments.
/// Those present correspond to the path to the binary and options
/// specified for the "end-code".
///
/// --------
/// Examples
/// --------
/// $ ./path/to/my/test/exec my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec -- my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec --auto_start_dbg=0 -- my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec --auto_start_dbg=0 my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// ------------------------------------------
/// Interaction with some Boost.Test arguments
/// ------------------------------------------
///
/// Note, however, that some Boost.Test options may prevent us
/// from accessing this code. For instance, if the user runs
///
/// $ ./path/to/my/test/exec --list_content -- my_custom_arg
///
/// then Boost.Test will immediately move to its own code and not
/// execute this one (because it's only called by a global fixture).
auto&& [argc, argv] = std::make_pair(master_suite.argc, master_suite.argv);
if (list_tests(argc, argv)) {
print_boost_tests();
// At this point, it's impossible to prevent Boost.Test
// from executing the tests it collected. This is all
// we can do (at least without writing a lot more code.
// I don't know if it would still be possible to avoid it).
std::exit(0);
}
}
};
} // anonymous namespace
BOOST_GLOBAL_FIXTURE(boost_tree_lister_injector);

View File

@@ -91,7 +91,7 @@ public:
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _sstables_manager;
}
sstables::shared_sstable make_sstable(sstables::sstable_state) const override {
sstables::shared_sstable make_sstable() const override {
return table().make_sstable();
}
sstables::sstable_writer_config configure_writer(sstring origin) const override {

View File

@@ -385,7 +385,7 @@ def test_repair_options_hosts_and_dcs_tablets(nodetool, datacenter, hosts):
[("--tablet-tokens", "1")],
[("--tablet-tokens", "-1,2")],
[("--tablet-tokens", "-1"), ("--tablet-tokens", "2")]])
def test_repair_options_tokens_tablets(nodetool, tokens):
def test_repair_options_hosts_tablets(nodetool, tokens):
_do_test_repair_options_tablets(nodetool, tokens=tokens)
def test_repair_all_with_vnode_keyspace(nodetool):

View File

@@ -623,7 +623,7 @@ Repair session 1
Repair session 1 finished
"""
def test_repair_keyspace_failure(nodetool):
def test_repair_keyspace(nodetool):
check_nodetool_fails_with(
nodetool,
("repair", "ks"),

View File

@@ -29,9 +29,9 @@ class KeyProvider(Enum):
class KeyProviderFactory:
"""Base class for provider factories"""
def __init__(self, key_provider : KeyProvider, tmpdir):
def __init__(self, key_provider : KeyProvider):
self.key_provider = key_provider
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
self.system_keyfile = None
async def __aenter__(self):
return self
@@ -50,7 +50,7 @@ class KeyProviderFactory:
def configuration_parameters(self) -> dict[str, str]:
"""scylla.conf entries for provider"""
return {"system_key_directory": self.system_key_location}
return {}
def additional_cf_options(self) -> dict[str, str]:
# pylint: disable=unused-argument
@@ -62,7 +62,7 @@ class KeyProviderFactory:
class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
"""LocalFileSystemKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(LocalFileSystemKeyProviderFactory, self).__init__(KeyProvider.local, tmpdir)
super(LocalFileSystemKeyProviderFactory, self).__init__( KeyProvider.local)
self.secret_file = os.path.join(tmpdir, "test/node1/conf/data_encryption_keys")
def additional_cf_options(self) -> dict[str, str]:
@@ -72,7 +72,8 @@ class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
class ReplicatedKeyProviderFactory(KeyProviderFactory):
"""ReplicatedKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(ReplicatedKeyProviderFactory, self).__init__(KeyProvider.replicated, tmpdir)
super(ReplicatedKeyProviderFactory, self).__init__( KeyProvider.replicated)
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
self.system_key_file_name = "system_key"
async def __aenter__(self):
@@ -87,13 +88,17 @@ class ReplicatedKeyProviderFactory(KeyProviderFactory):
raise RuntimeError(f'Could not generate system key: {stderr.decode()}')
return self
def configuration_parameters(self) -> dict[str, str]:
"""scylla.conf entries for provider"""
return super().configuration_parameters() | {"system_key_directory": self.system_key_location}
def additional_cf_options(self):
return super().additional_cf_options() | {"system_key": self.system_key_file_name}
class KmipKeyProviderFactory(KeyProviderFactory):
"""KmipKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(KmipKeyProviderFactory, self).__init__(KeyProvider.kmip, tmpdir)
super(KmipKeyProviderFactory, self).__init__( KeyProvider.kmip)
self.tmpdir = tmpdir
self.kmip_server_wrapper = None
self.kmip_host = "kmip_test"
@@ -173,7 +178,7 @@ class KmipKeyProviderFactory(KeyProviderFactory):
class KMSKeyProviderFactory(KeyProviderFactory):
"""KMSKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(KMSKeyProviderFactory, self).__init__(KeyProvider.kms, tmpdir)
super(KMSKeyProviderFactory, self).__init__( KeyProvider.kms)
self.tmpdir = tmpdir
self.master_key = "alias/Scylla-test"
self.kms_host = "kms_test"
@@ -255,7 +260,7 @@ class KMSKeyProviderFactory(KeyProviderFactory):
class AzureKeyProviderFactory(KeyProviderFactory):
"""AzureKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(AzureKeyProviderFactory, self).__init__(KeyProvider.azure, tmpdir)
super(AzureKeyProviderFactory, self).__init__( KeyProvider.azure)
self.tmpdir = tmpdir
self.azure_host = "azure_test"
self.azure_server = None

View File

@@ -22,13 +22,12 @@ class ServerInfo(NamedTuple):
rpc_address: IPAddress
datacenter: str
rack: str
pid: int
def __str__(self):
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack}, {self.pid})"
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack})"
def as_dict(self) -> dict[str, object]:
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack, "pid": self.pid}
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack}
def property_file(self) -> dict[str, str]:
return {"dc": self.datacenter, "rack": self.rack}

View File

@@ -167,48 +167,3 @@ class ScyllaLogFile:
line = await self._run_in_executor(log_file.readline, loop=loop)
return matches
async def find_backtraces(self, from_mark: int | None = None) -> list[str]:
"""
Find and extract all backtraces from the log file.
Each backtrace starts with a line "Backtrace:" followed by lines that start with exactly 2 spaces.
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
Return a list of strings, where each string is a complete backtrace (all lines joined together).
"""
loop = asyncio.get_running_loop()
backtraces = []
with self.file.open(encoding="utf-8") as log_file:
if from_mark:
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
line = await self._run_in_executor(log_file.readline, loop=loop)
while line:
if line.strip() == "Backtrace:":
# Found a backtrace, collect all lines that start with exactly 2 spaces
backtrace_lines = [line]
while True:
next_line = await self._run_in_executor(log_file.readline, loop=loop)
if not next_line:
# End of file
break
if next_line.startswith(" ") and not next_line.startswith(" "):
# Line starts with exactly 2 spaces (backtrace entry)
backtrace_lines.append(next_line)
else:
# End of backtrace
line = next_line
break
if backtrace_lines:
# Join all backtrace lines into a single string
backtraces.append(''.join(backtrace_lines))
# Continue from current line (already read in the inner loop)
continue
line = await self._run_in_executor(log_file.readline, loop=loop)
return backtraces

View File

@@ -8,9 +8,7 @@
Provides helper methods to test cases.
Manages driver refresh when cluster is cycled.
"""
from collections import defaultdict
import pathlib
import re
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
@@ -66,8 +64,6 @@ class ManagerClient:
self.metrics = ScyllaMetricsClient()
self.thread_pool = ThreadPoolExecutor()
self.test_finished_event = asyncio.Event()
self.ignore_log_patterns = [] # patterns to ignore in server logs when checking for errors
self.ignore_cores_log_patterns = [] # patterns to ignore in server logs when checking for core files
@property
def client(self):
@@ -183,82 +179,6 @@ class ManagerClient:
logger.info("Cluster after test %s: %s", test_case_name, cluster_status)
return cluster_status
async def check_all_errors(self, check_all_errors=False) -> dict[ServerInfo, dict[str, Union[list[str], list[str], Path, list[str]]]]:
errors = defaultdict(dict)
# find errors in logs
for server in await self.all_servers():
log_file = await self.server_open_log(server_id=server.server_id)
# check if we should ignore cores on this server
ignore_cores = []
if self.ignore_cores_log_patterns:
if matches := log_file.grep("|".join(f"({p})" for p in set(self.ignore_cores_log_patterns))):
logger.debug(f"Will ignore cores on {server}. Found the following log messages: {matches}")
ignore_cores.append(server)
critical_error_pattern = r"Assertion.*failed|AddressSanitizer"
if server not in ignore_cores:
critical_error_pattern += "|Aborting on shard"
if found_critical := await log_file.grep(critical_error_pattern):
errors[server]["critical"] = [e[0] for e in found_critical]
# Find the backtraces for the critical errors
if found_backtraces := await log_file.find_backtraces():
errors[server]["backtraces"] = found_backtraces
if check_all_errors:
if found_errors := await log_file.grep_for_errors(distinct_errors=True):
if filtered_errors := await self.filter_errors(found_errors):
errors[server]["error"] = filtered_errors
# find core files
for server, cores in (await self.find_cores()).items():
errors[server]["cores"] = cores
# add log file path to the report for servers that had errors or cores
for server in await self.all_servers():
log_file = await self.server_open_log(server_id=server.server_id)
if server in errors:
errors[server]["log"] = log_file.file.name
return errors
async def filter_errors(self, errors: list[str]):
exclude_errors_pattern = re.compile("|".join(f"{p}" for p in {
*self.ignore_log_patterns,
*self.ignore_cores_log_patterns,
r"Compaction for .* deliberately stopped",
r"update compaction history failed:.*ignored",
# We may stop nodes that have not finished starting yet.
r"(Startup|start) failed:.*(seastar::sleep_aborted|raft::request_aborted)",
r"Timer callback failed: seastar::gate_closed_exception",
# Ignore expected RPC errors when nodes are stopped.
r"rpc - client .*(connection dropped|fail to connect)",
# We see benign RPC errors when nodes start/stop.
# If they cause system malfunction, it should be detected using higher-level tests.
r"rpc::unknown_verb_error",
r"raft_rpc - Failed to send",
r"raft_topology.*(seastar::broken_promise|rpc::closed_error)",
# Expected tablet migration stream failure where a node is stopped.
# Refs: https://github.com/scylladb/scylladb/issues/19640
r"Failed to handle STREAM_MUTATION_FRAGMENTS.*rpc::stream_closed",
# Expected Raft errors on decommission-abort or node restart with MV.
r"raft_topology - raft_topology_cmd.*failed with: raft::request_aborted",
}))
return [e for e in errors if not exclude_errors_pattern.search(e)]
async def find_cores(self) -> dict[ServerInfo, list[str]]:
"""Find core files on all servers"""
# find *.core files in current dir
cores = [str(core_file.absolute()) for core_file in pathlib.Path('.').glob('*.core')]
server_cores = dict()
# match core files to servers by pid
for server in await self.all_servers():
if found_cores := [core for core in cores if f".{server.pid}." in core]:
server_cores[server] = found_cores
return server_cores
async def gather_related_logs(self, failed_test_path_dir: Path, logs: Dict[str, Path]) -> None:
for server in await self.all_servers():
@@ -292,7 +212,8 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of running servers") from exc
assert isinstance(server_info_list, list), "running_servers got unknown data type"
return [ServerInfo(*info) for info in server_info_list]
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
async def all_servers(self) -> list[ServerInfo]:
"""Get List of server info (id and IP address) of all servers"""
@@ -301,7 +222,8 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of servers") from exc
assert isinstance(server_info_list, list), "all_servers got unknown data type"
return [ServerInfo(*info) for info in server_info_list]
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
async def starting_servers(self) -> list[ServerInfo]:
"""Get List of server info (id and IP address) of servers currently
@@ -314,7 +236,8 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of starting servers") from exc
assert isinstance(server_info_list, list), "starting_servers got unknown data type"
return [ServerInfo(*info) for info in server_info_list]
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
async def mark_dirty(self) -> None:
"""Manually mark current cluster dirty.
@@ -353,9 +276,6 @@ class ManagerClient:
Replace CLI options and environment variables with `cmdline_options_override` and `append_env_override`
if provided.
"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient starting %s", server_id)
data = {
"expected_error": expected_error,
@@ -492,9 +412,6 @@ class ManagerClient:
expected_server_up_state: Optional[ServerUpState] = None,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
try:
data = self._create_server_add_data(
replace_cfg,
@@ -523,7 +440,11 @@ class ManagerClient:
except Exception as exc:
raise Exception("Failed to add server") from exc
try:
s_info = ServerInfo(**server_info)
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]),
server_info["datacenter"],
server_info["rack"])
except Exception as exc:
raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
logger.debug("ManagerClient added %s", s_info)
@@ -552,9 +473,6 @@ class ManagerClient:
assert servers_num > 0, f"servers_add: cannot add {servers_num} servers, servers_num must be positive"
assert not (property_file and auto_rack_dc), f"Either property_file or auto_rack_dc can be provided, but not both"
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
if auto_rack_dc:
property_file = [{"dc":auto_rack_dc, "rack":f"rack{i+1}"} for i in range(servers_num)]
@@ -571,7 +489,11 @@ class ManagerClient:
s_infos = list[ServerInfo]()
for server_info in server_infos:
try:
s_info = ServerInfo(**server_info)
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]),
server_info["datacenter"],
server_info["rack"])
s_infos.append(s_info)
except Exception as exc:
raise RuntimeError(f"servers_add got invalid server data {server_info}") from exc
@@ -590,9 +512,6 @@ class ManagerClient:
wait_removed_dead: bool = True,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
"""Invoke remove node Scylla REST API for a specified server"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient remove node %s on initiator %s", server_id, initiator_id)
# If we remove a node, we should wait until other nodes see it as dead
@@ -613,9 +532,6 @@ class ManagerClient:
expected_error: str | None = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
"""Tell a node to decommission with Scylla REST API"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient decommission %s", server_id)
data = {"expected_error": expected_error}
await self.client.put_json(f"/cluster/decommission-node/{server_id}", data,

View File

@@ -477,8 +477,7 @@ class ScyllaServer:
return "DEFAULT_RACK"
def server_info(self) -> ServerInfo:
pid = self.cmd.pid if self.cmd else None
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack, pid)
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack)
def change_rpc_address(self, rpc_address: IPAddress) -> None:
"""Change RPC IP address of the current server. Pre: the server is

View File

@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
class random_content_file:
class RandomContentFile:
def __init__(self, path: str, size_in_bytes: int):
path = pathlib.Path(path)
self.filename = path if path.is_file() else path / str(uuid.uuid4())
@@ -68,7 +68,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
try:
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
except Exception:
pass
else:
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
@pytest.mark.asyncio
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
cmdline = [*global_cmdline,
"--logger-log-level", "compaction=debug"]
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
@@ -134,7 +134,7 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
s2_mark = await s2_log.mark()
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
mark = await log.mark()
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
coord_log = await manager.server_open_log(coord_serv.server_id)
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
await manager.server_restart(servers[0].server_id, wait_others=2)
@@ -397,80 +397,3 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)
await assert_resize_task_info(table_id, lambda response: len(response) == 2 and all(r.resize_task_info is None for r in response))
# Verify that new sstable produced by repair cannot be split, if disk utilization level is critical.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Repair must fail to split its new sstables while disk utilization is critical.

    Scenario:
      1. Populate a 2-tablet table on a space-limited 3-node cluster.
      2. Start a tablet repair and, while it is paused inside sstable split
         (via the maybe_split_new_sstable_wait injection), emit a split
         decision for the table.
      3. Fill the disk to a critical utilization level; compaction (and with
         it sstable splitting) is drained, so the repair attempt fails.
      4. Remove the filler file and verify repair eventually completes and
         the tablet split is detected.
    """
    cfg = {
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(servers[0].server_id)
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        logger.info("Create and populate test table")
        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 2}") as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.split('.')[-1]
                table_id = (await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'"))[0].id
                await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 64)])
                await manager.api.flush_keyspace(servers[0].ip_addr, ks)

                coord = await get_topology_coordinator(manager)
                coord_serv = await find_server_by_host_id(manager, servers, coord)
                coord_log = await manager.server_open_log(coord_serv.server_id)

                async def run_split():
                    # Keep the resize decision pending (not finalized) while repair runs.
                    await manager.api.enable_injection(coord_serv.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
                    # force split on the test table
                    await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 4}}")
                    # wait_for() is a coroutine: it must be awaited, otherwise the
                    # wait never happens and the split decision may race with the
                    # rest of the test (missing-await bug fix).
                    await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")

                async def generate_repair_work():
                    insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)")
                    insert_stmt.consistency_level = ConsistencyLevel.ONE
                    # Drop applies on servers[0] so the subsequent repair has real work to do.
                    await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
                    pks = range(256, 512)
                    await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks])
                    await manager.api.disable_injection(servers[0].ip_addr, "database_apply")

                await generate_repair_work()

                # Pause repair right before it splits its newly produced sstable.
                await manager.api.enable_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait", one_shot=True)
                token = 'all'
                repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, table, token))
                # Emit split decision during repair.
                await run_split()
                await log.wait_for("maybe_split_new_sstable_wait: waiting", from_mark=mark)
                await manager.api.disable_injection(coord_serv.ip_addr, "tablet_resize_finalization_postpone")

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                disk_info = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
                    for _ in range(2):
                        mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
                    await manager.api.message_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait")
                    # Expect repair to fail when splitting new sstables
                    await log.wait_for("Repair for tablet migration of .* failed", from_mark=mark)
                    await log.wait_for("Cannot split .* because manager has compaction disabled", from_mark=mark)
                    assert await log.grep(f"compaction .* Split {cf}", from_mark=mark) == []

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                for _ in range(2):
                    mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
                await repair_task
                mark, _ = await log.wait_for(f"Detected tablet split for table {cf}", from_mark=mark)

View File

@@ -1,10 +1,6 @@
## Copyright 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# This file is generated by cargo-toml-template. Do not edit directly.
# To make changes, edit the template and regenerate with command:
# "$ cargo-toml-template > Cargo.toml".
[workspace]
members = ["crates/*"]
default-members = ["crates/validator"]
@@ -16,16 +12,13 @@ edition = "2024"
[workspace.dependencies]
anyhow = "1.0.97"
async-backtrace = "0.2.7"
futures = "0.3.31"
scylla = { version = "1.2.0", features = ["time-03"] }
tokio = { version = "1.44.1", features = ["full"] }
tracing = "0.1.41"
uuid = "1.16.0"
httpclient = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-store = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
[patch.'https://github.com/scylladb/scylladb.git']
[patch.'https://github.com/scylladb/vector-store.git']
vector-search-validator-scylla = { path = "crates/validator-scylla" }

View File

@@ -8,8 +8,6 @@ namespace to separate it from the host environment. `vector-search-validator`
contains a DNS server and all tests in one binary. It uses external scylla and
vector-store binaries.
## Running tests
The `test_validator.py::test_validator[test-case]` is the entry point for
running the tests. It is parametrized with name of the test case. Available
test cases are taken dynamically from the `vector-search-validator` binary.
@@ -39,22 +37,6 @@ $ pytest --mode=dev test/vector_search_validator/test_validator.py --filters fil
Logs are stored in
`testlog/{mode}/vector_search_validator/{test-case}-{run_id}/` directory.
## Development of test cases
`vector-search-validator` (in short `validator`) is divided into multiple
crates:
- `validator` - the main crate that contains only the entry point
- `validator-scylla` - contains implementation of the validator tests on the
scylladb.git side. If you want to add/modify the tests implemented in the
scylladb.git, you will work in this crate.
- `vector-store.git/validator-engine` - contains the core logic of the
validator - overall test runner and implementation of actors for tests (dns
server, scylla cluster, vector store cluster)
- `vector-store.git/validator-tests` - contains the core logic of the framework
tests, provides base structures for tests and actor interfaces. In the
future we should check if it is possible to merge it with `validator-engine`
crate.
- `vector-store.git/validator-vector-store` - contains implementation of the
validator tests on the vector-store.git side. If you want to add/modify the
tests implemented in the vector-store.git, you will work in this crate.
Implementing new test cases on the Scylla repository side means adding new test
in crate `crates/validator-scylla`.

View File

@@ -1,2 +1,2 @@
VECTOR_STORE_GIT=https://github.com/scylladb/vector-store.git
VECTOR_STORE_REV=d79ee80
VECTOR_STORE_REV=3ee46a5

View File

@@ -11,10 +11,6 @@ cat << EOF
## Copyright 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# This file is generated by cargo-toml-template. Do not edit directly.
# To make changes, edit the template and regenerate with command:
# "\$ cargo-toml-template > Cargo.toml".
[workspace]
members = ["crates/*"]
default-members = ["crates/validator"]
@@ -26,17 +22,14 @@ edition = "2024"
[workspace.dependencies]
anyhow = "1.0.97"
async-backtrace = "0.2.7"
futures = "0.3.31"
scylla = { version = "1.2.0", features = ["time-03"] }
tokio = { version = "1.44.1", features = ["full"] }
tracing = "0.1.41"
uuid = "1.16.0"
httpclient = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-search-validator-engine = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-search-validator-tests = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-store = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
[patch.'https://github.com/scylladb/scylladb.git']
[patch.'$VECTOR_STORE_GIT']
vector-search-validator-scylla = { path = "crates/validator-scylla" }
EOF

View File

@@ -4,11 +4,5 @@ version = "0.1.0"
edition = "2024"
[dependencies]
async-backtrace.workspace = true
httpclient.workspace = true
scylla.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
vector-search-validator-tests.workspace = true
vector-store.workspace = true

View File

@@ -1,302 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use async_backtrace::framed;
use httpclient::HttpClient;
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::response::query_result::QueryRowsResult;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::info;
use uuid::Uuid;
use vector_search_validator_tests::DnsExt;
use vector_search_validator_tests::ScyllaClusterExt;
use vector_search_validator_tests::ScyllaNodeConfig;
use vector_search_validator_tests::TestActors;
use vector_search_validator_tests::VectorStoreClusterExt;
use vector_search_validator_tests::VectorStoreNodeConfig;
use vector_store::httproutes::IndexStatus;
use vector_store::IndexInfo;
pub(crate) const DEFAULT_TEST_TIMEOUT: Duration = Duration::from_secs(120);
pub(crate) const VS_NAMES: [&str; 3] = ["vs1", "vs2", "vs3"];
pub(crate) const VS_PORT: u16 = 6080;
pub(crate) const DB_OCTET_1: u8 = 1;
pub(crate) const DB_OCTET_2: u8 = 2;
pub(crate) const DB_OCTET_3: u8 = 3;
pub(crate) const VS_OCTET_1: u8 = 128;
pub(crate) const VS_OCTET_2: u8 = 129;
pub(crate) const VS_OCTET_3: u8 = 130;
/// Build the default HTTP base URL for every vector-store node, using the
/// DNS domain currently served by the test DNS actor.
#[framed]
pub(crate) async fn get_default_vs_urls(actors: &TestActors) -> Vec<String> {
    let domain = actors.dns.domain().await;
    let mut urls = Vec::with_capacity(VS_NAMES.len());
    for name in VS_NAMES.iter() {
        urls.push(format!("http://{name}.{domain}:{VS_PORT}"));
    }
    urls
}
/// Default IP addresses of the three vector-store nodes inside the test
/// services subnet.
pub(crate) fn get_default_vs_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
    [VS_OCTET_1, VS_OCTET_2, VS_OCTET_3]
        .iter()
        .map(|&octet| actors.services_subnet.ip(octet))
        .collect()
}
/// Default IP addresses of the three Scylla nodes inside the test services
/// subnet.
pub(crate) fn get_default_db_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
    [DB_OCTET_1, DB_OCTET_2, DB_OCTET_3]
        .iter()
        .map(|&octet| actors.services_subnet.ip(octet))
        .collect()
}
/// Build one `ScyllaNodeConfig` per default DB node.
///
/// Node `i` gets the `i`-th vector-store URL as its sole primary URI and
/// the remaining URLs as secondaries, so each node prefers a distinct VS
/// instance while still knowing about all of them.
#[framed]
pub(crate) async fn get_default_scylla_node_configs(actors: &TestActors) -> Vec<ScyllaNodeConfig> {
    let default_vs_urls = get_default_vs_urls(actors).await;
    get_default_db_ips(actors)
        .iter()
        .enumerate()
        .map(|(i, &ip)| {
            // Clone the full URL list, pull out the i-th entry as the
            // primary; everything left over becomes a secondary URI.
            let mut vs_urls = default_vs_urls.clone();
            ScyllaNodeConfig {
                db_ip: ip,
                primary_vs_uris: vec![vs_urls.remove(i)],
                secondary_vs_uris: vs_urls,
            }
        })
        .collect()
}
/// Pair each default vector-store IP with the DB node at the same position,
/// producing one `VectorStoreNodeConfig` per VS node (no extra env vars).
pub(crate) fn get_default_vs_node_configs(actors: &TestActors) -> Vec<VectorStoreNodeConfig> {
    let db_ips = get_default_db_ips(actors);
    let vs_ips = get_default_vs_ips(actors);
    let mut configs = Vec::with_capacity(vs_ips.len());
    for (vs_ip, db_ip) in vs_ips.into_iter().zip(db_ips.into_iter()) {
        configs.push(VectorStoreNodeConfig {
            vs_ip,
            db_ip,
            envs: HashMap::new(),
        });
    }
    configs
}
/// Default test initialization: start the Scylla and vector-store clusters
/// using the standard three-node topology built by the helpers above.
#[framed]
pub(crate) async fn init(actors: TestActors) {
    info!("started");
    let scylla_configs = get_default_scylla_node_configs(&actors).await;
    let vs_configs = get_default_vs_node_configs(&actors);
    init_with_config(actors, scylla_configs, vs_configs).await;
    info!("finished");
}
/// Start the clusters using caller-provided node configurations.
///
/// Ordering matters: DNS names for the vector-store nodes are registered
/// first, then the Scylla cluster is started and must report ready before
/// the vector-store cluster is brought up.
#[framed]
pub(crate) async fn init_with_config(
    actors: TestActors,
    scylla_configs: Vec<ScyllaNodeConfig>,
    vs_configs: Vec<VectorStoreNodeConfig>,
) {
    let vs_ips = get_default_vs_ips(&actors);
    for (name, ip) in VS_NAMES.iter().zip(vs_ips.iter()) {
        actors.dns.upsert(name.to_string(), *ip).await;
    }
    actors.db.start(scylla_configs, None).await;
    assert!(actors.db.wait_for_ready().await);
    actors.vs.start(vs_configs).await;
    assert!(actors.vs.wait_for_ready().await);
}
/// Default test cleanup: unregister the vector-store DNS names, then stop
/// the vector-store cluster before the Scylla cluster (reverse of init).
#[framed]
pub(crate) async fn cleanup(actors: TestActors) {
    info!("started");
    for name in VS_NAMES.iter() {
        actors.dns.remove(name.to_string()).await;
    }
    actors.vs.stop().await;
    actors.db.stop().await;
    info!("finished");
}
/// Open a CQL session to the first DB node and build one HTTP client per
/// given vector-store IP (all on `VS_PORT`).
///
/// Returns the shared session and the clients in the same order as
/// `vs_ips`. Panics if the session cannot be established.
#[framed]
pub(crate) async fn prepare_connection_with_custom_vs_ips(
    actors: &TestActors,
    vs_ips: Vec<Ipv4Addr>,
) -> (Arc<Session>, Vec<HttpClient>) {
    let session = Arc::new(
        SessionBuilder::new()
            .known_node(actors.services_subnet.ip(DB_OCTET_1).to_string())
            .build()
            .await
            .expect("failed to create session"),
    );
    let clients = vs_ips
        .iter()
        .map(|&ip| HttpClient::new((ip, VS_PORT).into()))
        .collect();
    (session, clients)
}
/// Repeatedly evaluate `condition` (polling every 100 ms) until it returns
/// `true`. Panics with `"Timeout on: {msg}"` if `timeout` elapses first.
#[framed]
pub(crate) async fn wait_for<F, Fut>(mut condition: F, msg: &str, timeout: Duration)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let polling = async {
        loop {
            if condition().await {
                break;
            }
            time::sleep(Duration::from_millis(100)).await;
        }
    };
    if time::timeout(timeout, polling).await.is_err() {
        panic!("Timeout on: {msg}");
    }
}
/// Poll `poll_fn` every 100 ms until it yields `Some(value)` and return
/// that value. Panics with `"Timeout on: {msg}"` if `timeout` elapses
/// before a value is produced.
#[framed]
pub(crate) async fn wait_for_value<F, Fut, T>(mut poll_fn: F, msg: &str, timeout: Duration) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    time::timeout(timeout, async {
        loop {
            if let Some(value) = poll_fn().await {
                return value;
            }
            time::sleep(Duration::from_millis(100)).await;
        }
    })
    .await
    .unwrap_or_else(|_| panic!("Timeout on: {msg}"))
}
/// Poll the vector store until the given index reports `Serving` status,
/// returning the full status response. Panics after 20 seconds without a
/// serving status; non-Ok / non-serving responses are simply retried.
#[framed]
pub(crate) async fn wait_for_index(
    client: &HttpClient,
    index: &IndexInfo,
) -> vector_store::httproutes::IndexStatusResponse {
    wait_for_value(
        || async {
            match client.index_status(&index.keyspace, &index.index).await {
                Ok(resp) if resp.status == IndexStatus::Serving => Some(resp),
                _ => None,
            }
        },
        "Waiting for index to be SERVING",
        Duration::from_secs(20),
    )
    .await
}
/// Run `query` unpaged on `session` and return its rows result.
/// Panics if the query fails or the response carries no rows.
#[framed]
pub(crate) async fn get_query_results(query: String, session: &Session) -> QueryRowsResult {
    let result = session
        .query_unpaged(query, ())
        .await
        .expect("failed to run query");
    result.into_rows_result().expect("failed to get rows")
}
/// Create a uniquely named keyspace (RF=3 for the 3-node cluster), switch
/// the session to it, and return its name.
#[framed]
pub(crate) async fn create_keyspace(session: &Session) -> String {
    let keyspace = format!("ks_{}", Uuid::new_v4().simple());
    let ddl = format!("CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}");
    session
        .query_unpaged(ddl, ())
        .await
        .expect("failed to create a keyspace");
    session
        .use_keyspace(&keyspace, false)
        .await
        .expect("failed to use a keyspace");
    keyspace
}
/// Create a uniquely named table with the given column definitions and
/// optional `WITH ...` options, returning the table name.
#[framed]
pub(crate) async fn create_table(
    session: &Session,
    columns: &str,
    options: Option<&str>,
) -> String {
    let table = format!("tbl_{}", Uuid::new_v4().simple());
    // Empty string when no options were supplied.
    let extra = options
        .map(|opts| format!("WITH {opts}"))
        .unwrap_or_default();
    session
        .query_unpaged(format!("CREATE TABLE {table} ({columns}) {extra}"), ())
        .await
        .expect("failed to create a table");
    table
}
/// Create a vector index on `table(column)` via CQL, wait until every
/// vector-store client lists it, then return its `IndexInfo` as reported
/// by the first client.
///
/// Panics if the index is not visible on all clients within 10 seconds,
/// or if `clients` is empty.
#[framed]
pub(crate) async fn create_index(
    session: &Session,
    clients: &[HttpClient],
    table: &str,
    column: &str,
) -> IndexInfo {
    let index = format!("idx_{}", Uuid::new_v4().simple());
    // Create index
    session
        .query_unpaged(
            format!("CREATE INDEX {index} ON {table}({column}) USING 'vector_index'"),
            (),
        )
        .await
        .expect("failed to create an index");
    // Wait for the index to be created
    wait_for(
        || async {
            // The index must be visible on every vector-store node before
            // we consider creation complete.
            for client in clients.iter() {
                if !client
                    .indexes()
                    .await
                    .iter()
                    .any(|idx| idx.index.to_string() == index)
                {
                    return false;
                }
            }
            true
        },
        "Waiting for the first index to be created",
        Duration::from_secs(10),
    )
    .await;
    // Fetch the full IndexInfo from the first client's listing.
    clients
        .first()
        .expect("No vector store clients provided")
        .indexes()
        .await
        .into_iter()
        .find(|idx| idx.index.to_string() == index)
        .expect("index not found")
}

View File

@@ -3,13 +3,12 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use crate::common;
use async_backtrace::framed;
use vector_search_validator_tests::TestCase;
use std::time::Duration;
use vector_search_validator_tests::common;
use vector_search_validator_tests::*;
#[framed]
pub(crate) async fn new() -> TestCase {
let timeout = common::DEFAULT_TEST_TIMEOUT;
let timeout = Duration::from_secs(30);
TestCase::empty()
.with_init(timeout, common::init)
.with_cleanup(timeout, common::cleanup)

View File

@@ -1,115 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use crate::common;
use async_backtrace::framed;
use tracing::info;
use vector_search_validator_tests::ScyllaClusterExt;
use vector_search_validator_tests::ScyllaNodeConfig;
use vector_search_validator_tests::TestActors;
use vector_search_validator_tests::TestCase;
use vector_search_validator_tests::VectorStoreNodeConfig;
/// Assemble the high-availability test case: default cleanup plus the
/// secondary-URI failover test, all bounded by the default timeout.
#[framed]
pub(crate) async fn new() -> TestCase {
    let timeout = common::DEFAULT_TEST_TIMEOUT;
    let case = TestCase::empty().with_cleanup(timeout, common::cleanup);
    case.with_test(
        "secondary_uri_works_correctly",
        timeout,
        test_secondary_uri_works_correctly,
    )
}
/// Verify that secondary vector-store URIs keep ANN queries working when a
/// node's primary URI becomes unavailable.
///
/// Topology: three Scylla nodes share a single vector-store instance; only
/// the first node lists it as a primary URI, the other two only as a
/// secondary. After the index is fully built, the first node is brought
/// down and an ANN query must still succeed via the secondary URIs.
#[framed]
async fn test_secondary_uri_works_correctly(actors: TestActors) {
    info!("started");
    let vs_urls = common::get_default_vs_urls(&actors).await;
    let vs_url = &vs_urls[0];
    let scylla_configs: Vec<ScyllaNodeConfig> = vec![
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
            primary_vs_uris: vec![vs_url.clone()],
            secondary_vs_uris: vec![],
        },
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_2),
            primary_vs_uris: vec![],
            secondary_vs_uris: vec![vs_url.clone()],
        },
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_3),
            primary_vs_uris: vec![],
            secondary_vs_uris: vec![vs_url.clone()],
        },
    ];
    // Single vector-store node attached to the first DB node.
    let vs_configs = vec![VectorStoreNodeConfig {
        vs_ip: actors.services_subnet.ip(common::VS_OCTET_1),
        db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
        envs: Default::default(),
    }];
    common::init_with_config(actors.clone(), scylla_configs, vs_configs).await;
    let vs_ips = vec![actors.services_subnet.ip(common::VS_OCTET_1)];
    let (session, clients) = common::prepare_connection_with_custom_vs_ips(&actors, vs_ips).await;
    let keyspace = common::create_keyspace(&session).await;
    let table =
        common::create_table(&session, "pk INT PRIMARY KEY, v VECTOR<FLOAT, 3>", None).await;
    // Insert vectors
    for i in 0..100 {
        let embedding = vec![i as f32, (i * 2) as f32, (i * 3) as f32];
        session
            .query_unpaged(
                format!("INSERT INTO {table} (pk, v) VALUES (?, ?)"),
                (i, &embedding),
            )
            .await
            .expect("failed to insert data");
    }
    let index = common::create_index(&session, &clients, &table, "v").await;
    for client in &clients {
        // `client` is already a `&HttpClient` here; the previous `&client`
        // created a needless `&&HttpClient` borrow (clippy: needless_borrow).
        let index_status = common::wait_for_index(client, &index).await;
        assert_eq!(
            index_status.count, 100,
            "Expected 100 vectors to be indexed"
        );
    }
    // Down the first node with primary URI
    let first_node_ip = actors.services_subnet.ip(common::DB_OCTET_1);
    info!("Bringing down node {first_node_ip}");
    actors.db.down_node(first_node_ip).await;
    // Should work via secondary URIs
    let results = common::get_query_results(
        format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"),
        &session,
    )
    .await;
    let rows = results
        .rows::<(i32,)>()
        .expect("failed to get rows after node down");
    assert!(
        rows.rows_remaining() <= 10,
        "Expected at most 10 results from ANN query after node down"
    );
    // Drop keyspace
    session
        .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
        .await
        .expect("failed to drop a keyspace");
    info!("finished");
}

View File

@@ -3,19 +3,12 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
mod common;
mod cql;
mod high_availability;
use async_backtrace::framed;
use vector_search_validator_tests::TestCase;
#[framed]
pub async fn test_cases() -> impl Iterator<Item = (String, TestCase)> {
vec![
("scylla_cql", cql::new().await),
("scylla_high_availability", high_availability::new().await),
]
.into_iter()
.map(|(name, test_case)| (name.to_string(), test_case))
vec![("cql", cql::new().await)]
.into_iter()
.map(|(name, test_case)| (name.to_string(), test_case))
}

View File

@@ -819,7 +819,7 @@ public:
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
virtual reader_permit make_compaction_reader_permit() const override { return _permit; }
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return do_make_sstable(); }
virtual sstables::shared_sstable make_sstable() const override { return do_make_sstable(); }
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return do_configure_writer(std::move(origin)); }
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
@@ -909,7 +909,7 @@ void scrub_operation(schema_ptr schema, reader_permit permit, const std::vector<
auto compaction_descriptor = compaction::compaction_descriptor(std::move(sstables));
compaction_descriptor.options = compaction::compaction_type_options::make_scrub(scrub_mode, compaction::compaction_type_options::scrub::quarantine_invalid_sstables::no);
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(sstables::sstable_state::normal); };
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(); };
compaction_descriptor.replacer = [] (compaction::compaction_completion_desc) { };
auto compaction_data = compaction::compaction_data{};

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-43-20251217
docker.io/scylladb/scylla-toolchain:fedora-43-20251208